EventTriggersFunctionOperator

Apache Kafka

EventTriggersFunctionOperator An Airflow operator that defers until a specific message is published to Kafka, then triggers a registered function

Access Instructions

Install the Apache Kafka provider package into your Airflow environment.

Import the module into your DAG file and instantiate it with your desired params.

Parameters

topicsSequence[str]Topics (or topic regex) to use for reading from
apply_functionThe function to apply to messages to determine if an event occurred. As a dot

Documentation

EventTriggersFunctionOperator An Airflow operator that defers until a specific message is published to Kafka, then triggers a registered function

The behavior of the consumer for this trigger is as follows: - poll the Kafka topics for a message - if no message returned, sleep - process the message with provided callable and commit the message offset - if callable returns any data, raise a TriggerEvent with the return data - else continue to next message - return event (as default xcom or specific xcom key)

notation string. :type apply_function: str :param event_triggered_function: The callable to trigger once the apply_function encounters a positive event. :type event_triggered_function: callable :param apply_function_args: Arguments to be applied to the processing function, defaults to None :type apply_function_args: Optional[Sequence[Any]], optional :param apply_function_kwargs: Key word arguments to be applied to the processing function,, defaults to None :type apply_function_kwargs: Optional[Dict[Any, Any]], optional :param kafka_conn_id: The airflow connection storing the Kafka broker address, defaults to None :type kafka_conn_id: Optional[str], optional :param kafka_config: the config dictionary for the kafka client (additional information available on the confluent-python-kafka documentation), defaults to None :type kafka_config: Optional[Dict[Any, Any]], optional :param poll_timeout: How long the kafka consumer should wait for a message to arrive from the kafka cluster,

defaults to 1

type poll_timeout

float, optional

param poll_interval

How long the kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5

type poll_interval

float, optional

param xcom_push_key

the name of a key to push the returned message to, defaults to None

type xcom_push_key

_type_, optional

Was this page helpful?