Skip to content

Commit

Permalink
Remove bad advice from Kafka system tests (apache#34470)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis authored Sep 25, 2023
1 parent 5a133e8 commit 404666d
Showing 1 changed file with 3 additions and 26 deletions.
29 changes: 3 additions & 26 deletions tests/system/providers/apache/kafka/example_dag_event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from __future__ import annotations

import json
import random
import string

from pendulum import datetime

Expand All @@ -32,7 +30,6 @@
# This is just for setting up connections in the demo - you should use standard
# methods for setting these connections in production
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
from airflow.utils import db
Expand Down Expand Up @@ -68,10 +65,6 @@ def _producer_function():
yield (json.dumps(i), json.dumps(i + 1))


def _generate_uuid():
return "".join(random.choices(string.ascii_lowercase, k=6))


with DAG(
dag_id="fizzbuzz-load-topic",
description="Load Data to fizz_buzz topic",
Expand Down Expand Up @@ -105,12 +98,9 @@ def await_function(message):
if val % 5 == 0:
return val

def pick_downstream_dag(message, **context):
def wait_for_event(message, **context):
if message % 15 == 0:
print(f"encountered {message} - executing external dag!")
TriggerDagRunOperator(trigger_dag_id="fizz-buzz", task_id=f"{message}{_generate_uuid()}").execute(
context
)
return f"encountered {message}!"
else:
if message % 3 == 0:
print(f"encountered {message} FIZZ !")
Expand All @@ -123,25 +113,12 @@ def pick_downstream_dag(message, **context):
task_id="listen_for_message",
topics=["fizz_buzz"],
apply_function="example_dag_event_listener.await_function",
event_triggered_function=pick_downstream_dag,
event_triggered_function=wait_for_event,
)
# [END howto_sensor_await_message_trigger_function]

t0 >> t1

with DAG(
dag_id="fizz-buzz",
description="Triggered when mod 15 is 0.",
start_date=datetime(2022, 11, 1),
catchup=False,
tags=["fizz-buzz"],
):

def _fizz_buzz():
print("FIZZ BUZZ")

fizz_buzz_task = PythonOperator(task_id="fizz_buzz", python_callable=_fizz_buzz)


from tests.system.utils import get_test_run # noqa: E402

Expand Down

0 comments on commit 404666d

Please sign in to comment.