
Apache Kafka

AwaitMessageTrigger A trigger that waits for a message matching specific criteria to arrive in Kafka

Access Instructions

Install the Apache Kafka provider package into your Airflow environment.

Import the module into your DAG file and instantiate it with your desired params.


topicsSequence[str]The topic (or topic regex) that should be searched for messages
apply_functionstrthe location of the function to apply to messages for determination of matching criteria. (In python dot notation as a string)
apply_function_argsOptional[Sequence[Any]], optionalA set of arguments to apply to the callable, defaults to None
apply_function_kwargsOptional[Dict[Any, Any]], optionalA set of key word arguments to apply to the callable, defaults to None, defaults to None
kafka_conn_idOptional[str], optionalThe airflow connection id to fetch kafka brokers from, defaults to None
kafka_configOptional[Dict[Any, Any]], optionalthe config dictionary for the kafka client (additional information available on the confluent-python-kafka documentation), defaults to None
poll_timeoutfloat, optionalHow long the Kafka client should wait before returning from a poll request to Kafka (seconds), defaults to 1
poll_intervalfloat, optionalHow long the the trigger should sleep after reaching the end of the Kafka log (seconds) , defaults to 5


AwaitMessageTrigger A trigger that waits for a message matching specific criteria to arrive in Kafka

The behavior of the consumer of this trigger is as follows: - poll the Kafka topics for a message

  • if no message returned, sleep

param topics

The topic (or topic regex) that should be searched for messages

type topics


param apply_function

the location of the function to apply to messages for determination of matching criteria. (In python dot notation as a string)

type apply_function


param apply_function_args

A set of arguments to apply to the callable, defaults to None

type apply_function_args

Optional[Sequence[Any]], optional

param apply_function_kwargs

A set of key word arguments to apply to the callable, defaults to None, defaults to None

type apply_function_kwargs

Optional[Dict[Any, Any]], optional

param kafka_conn_id

The airflow connection id to fetch kafka brokers from, defaults to None

type kafka_conn_id

Optional[str], optional

param kafka_config

the config dictionary for the kafka client (additional information available on the confluent-python-kafka documentation), defaults to None

type kafka_config

Optional[Dict[Any, Any]], optional

param poll_timeout

How long the Kafka client should wait before returning from a poll request to Kafka (seconds), defaults to 1

type poll_timeout

float, optional

param poll_interval

How long the the trigger should sleep after reaching the end of the Kafka log (seconds) , defaults to 5

type poll_interval

float, optional

Was this page helpful?