AwaitKafkaMessageOperator

Apache Kafka

An Airflow operator that defers until a specific message is published to Kafka.

Access Instructions

Install the Apache Kafka provider package into your Airflow environment.

Import the module into your DAG file and instantiate it with your desired params.
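A minimal sketch of those two steps follows. It assumes the class can be imported from `airflow_provider_kafka.operators.await_message`; that module path is an assumption and should be checked against the package version you installed. The topic name, broker address, consumer group, XCom key, and the dotted path `my_dags.kafka_events.detect_event` are hypothetical placeholders. The import sits inside a helper function only so the snippet can be loaded without Airflow present; in a real DAG file a top-level import is more usual.

```python
# Keyword arguments for the operator. The names match the documented
# parameters; every value is a hypothetical placeholder.
AWAIT_MESSAGE_KWARGS = {
    "task_id": "await_kafka_message",
    "topics": ["orders"],
    # Dot-notation string naming an importable callable (hypothetical path).
    "apply_function": "my_dags.kafka_events.detect_event",
    "kafka_config": {
        "bootstrap.servers": "localhost:9092",
        "group.id": "airflow-await-example",
        "auto.offset.reset": "earliest",
    },
    "poll_timeout": 1,   # documented default
    "poll_interval": 5,  # documented default
    "xcom_push_key": "matched_order",
}


def build_await_task(dag):
    """Instantiate the operator inside an existing DAG object."""
    # Assumed module path; adjust it to the provider package you installed.
    from airflow_provider_kafka.operators.await_message import (
        AwaitKafkaMessageOperator,
    )

    return AwaitKafkaMessageOperator(dag=dag, **AWAIT_MESSAGE_KWARGS)
```

Keeping the arguments in a plain dictionary makes it easy to see which documented parameters are being set before the task is attached to a DAG.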

Parameters

topics (Sequence[str]): Topics (or topic regex) to read from.
apply_function (str): The function to apply to messages to determine if an event occurred, given as a dot-notation string.

Documentation

The operator creates a consumer that reads the Kafka log until it encounters a positive event.

The behavior of the consumer for this trigger is as follows:

- poll the Kafka topics for a message
- if no message is returned, sleep
- process the message with the provided callable and commit the message offset
- if the callable returns any data, raise a TriggerEvent with the return data
- else continue to the next message
- return the event (as the default XCom or under a specific XCom key)
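The loop described above can be sketched in plain Python. This is an illustration of the described behavior, not the provider's code: `FakeMessage`, `FakeConsumer`, `await_message`, and `max_idle_polls` are hypothetical names, the in-memory consumer models only `poll` and `commit`, and the sketch assumes that "returns any data" means any truthy return value. The real trigger keeps waiting indefinitely; `max_idle_polls` exists only so the example terminates.

```python
import time
from collections import deque


class FakeMessage:
    """Stand-in for a Kafka message; only value() is modeled."""

    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class FakeConsumer:
    """In-memory stand-in for a Kafka consumer (poll and commit only)."""

    def __init__(self, values):
        self._queue = deque(FakeMessage(v) for v in values)
        self.committed = []

    def poll(self, timeout):
        # Return the next queued message, or None when the log is exhausted.
        return self._queue.popleft() if self._queue else None

    def commit(self, message):
        self.committed.append(message.value())


def await_message(consumer, apply_function, poll_timeout=1, poll_interval=5,
                  max_idle_polls=3):
    """Read messages until apply_function returns data, then return it."""
    idle_polls = 0
    while idle_polls < max_idle_polls:
        message = consumer.poll(poll_timeout)
        if message is None:
            # No message returned: sleep, then poll again.
            idle_polls += 1
            time.sleep(poll_interval)
            continue
        idle_polls = 0
        result = apply_function(message)
        consumer.commit(message)  # commit the offset after processing
        if result:
            return result  # the positive event
    return None


def is_error(message):
    """Hypothetical callable: return the text of error messages only."""
    text = message.value().decode("utf-8")
    return text if text.startswith("ERROR") else None


consumer = FakeConsumer([b"INFO start", b"ERROR disk full", b"INFO done"])
event = await_message(consumer, is_error, poll_interval=0)
print(event)               # ERROR disk full
print(consumer.committed)  # [b'INFO start', b'ERROR disk full']
```

The third message is never read: the loop stops at the first message for which the callable returns data.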

:param apply_function: The function to apply to messages to determine if an event occurred, as a dot-notation string
:type apply_function: str

:param apply_function_args: Arguments to be applied to the processing function, defaults to None
:type apply_function_args: Optional[Sequence[Any]], optional

:param apply_function_kwargs: Keyword arguments to be applied to the processing function, defaults to None
:type apply_function_kwargs: Optional[Dict[Any, Any]], optional

:param kafka_conn_id: The Airflow connection storing the Kafka broker address, defaults to None
:type kafka_conn_id: Optional[str], optional

:param kafka_config: The config dictionary for the Kafka client (additional information is available in the confluent-kafka-python documentation), defaults to None
:type kafka_config: Optional[Dict[Any, Any]], optional

:param poll_timeout: How long the Kafka consumer should wait for a message to arrive from the Kafka cluster, defaults to 1
:type poll_timeout: float, optional

:param poll_interval: How long the Kafka consumer should sleep after reaching the end of the Kafka log, defaults to 5
:type poll_interval: float, optional

:param xcom_push_key: The name of a key to push the returned message to, defaults to None
:type xcom_push_key: _type_, optional
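As a rough illustration of `apply_function` and `apply_function_kwargs`, the sketch below shows what a dot-notation string refers to and how extra keyword arguments could reach the callable. Everything in it is hypothetical: `resolve_dotted_path` is a plain `importlib` helper written for this example (not the provider's loader), and `detect_large_order`, `threshold`, and `FakeMessage` are invented for the demonstration. Only keyword arguments are shown, because the position in which `apply_function_args` are passed relative to the message is not stated here.

```python
import importlib
import json
from functools import partial


def resolve_dotted_path(path):
    """Import 'package.module.attr' and return the attribute it names."""
    module_name, _, attr_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr_name)


class FakeMessage:
    """Stand-in for a Kafka message; only value() is modeled."""

    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


def detect_large_order(message, threshold=100):
    """Hypothetical apply_function.

    Returns the decoded order when its amount exceeds the threshold.
    Returning None means no event occurred, so the consumer keeps reading.
    """
    order = json.loads(message.value())
    return order if order["amount"] > threshold else None


# A dot-notation string names an importable callable.
loader = resolve_dotted_path("json.loads")
print(loader is json.loads)  # True

# apply_function_kwargs supplies extra keyword arguments to the callable.
apply_function_kwargs = {"threshold": 500}
check = partial(detect_large_order, **apply_function_kwargs)

small = check(FakeMessage(b'{"id": 1, "amount": 20}'))
large = check(FakeMessage(b'{"id": 2, "amount": 900}'))
print(small)  # None
print(large)  # {'id': 2, 'amount': 900}
```

In a DAG, the callable would live in an importable module and be referenced by its dotted path, for example a string of the form `package.module.function_name`.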
