external_task_marker_dag
Example DAG demonstrating setting up inter-DAG dependencies using ExternalTaskSensor and ExternalTaskMarker
Airflow Fundamentals
Run this DAG
1. Install the Astronomer CLI. Skip this step if you already have the CLI.
2. Initiate the project in a local directory:
3. Copy and paste the code below into a file in the dags directory.
4. Run the DAG from the local directory where the project was initiated:
```python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating setting up inter-DAG dependencies using ExternalTaskSensor and
ExternalTaskMarker.

In this example, child_task1 in example_external_task_marker_child depends on parent_task in
example_external_task_marker_parent. When parent_task is cleared with 'Recursive' selected,
the presence of ExternalTaskMarker tells Airflow to clear child_task1 and its downstream tasks.

ExternalTaskSensor keeps poking for the status of the remote ExternalTaskMarker task at a regular
interval until one of the following happens:

ExternalTaskMarker reaches one of the states in the allowed_states list.
In this case, ExternalTaskSensor exits with a success status code.

ExternalTaskMarker reaches one of the states in the failed_states list.
In this case, ExternalTaskSensor raises an AirflowException, and the user needs to handle this
with multiple downstream tasks.

ExternalTaskSensor times out.
In this case, ExternalTaskSensor raises an AirflowSkipException or an AirflowSensorTimeout
exception.
"""

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

# Both DAGs share the same start date.
start_date = pendulum.datetime(2021, 1, 1, tz="UTC")

with DAG(
    dag_id="example_external_task_marker_parent",
    start_date=start_date,
    catchup=False,
    schedule=None,
    tags=["example2"],
) as parent_dag:
    # [START howto_operator_external_task_marker]
    # Marks child_task1 in the child DAG as dependent on this task, so that a
    # recursive clear of parent_task also clears child_task1 and its downstream tasks.
    parent_task = ExternalTaskMarker(
        task_id="parent_task",
        external_dag_id="example_external_task_marker_child",
        external_task_id="child_task1",
    )
    # [END howto_operator_external_task_marker]

with DAG(
    dag_id="example_external_task_marker_child",
    start_date=start_date,
    schedule=None,
    catchup=False,
    tags=["example2"],
) as child_dag:
    # [START howto_operator_external_task_sensor]
    # Waits for parent_task in the parent DAG to reach an allowed or failed state.
    child_task1 = ExternalTaskSensor(
        task_id="child_task1",
        external_dag_id=parent_dag.dag_id,
        external_task_id=parent_task.task_id,
        timeout=600,
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
        mode="reschedule",
    )
    # [END howto_operator_external_task_sensor]

    # [START howto_operator_external_task_sensor_with_task_group]
    # Waits on a task group in the parent DAG instead of a single task.
    # Note: the parent DAG above does not define a task group with this ID.
    child_task2 = ExternalTaskSensor(
        task_id="child_task2",
        external_dag_id=parent_dag.dag_id,
        external_task_group_id="parent_dag_task_group_id",
        timeout=600,
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
        mode="reschedule",
    )
    # [END howto_operator_external_task_sensor_with_task_group]

    child_task3 = EmptyOperator(task_id="child_task3")
    child_task1 >> child_task2 >> child_task3
```
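The three outcomes described in the module docstring can be illustrated with a small standalone model. This is a minimal sketch, not Airflow's implementation: the function `sensor_outcome`, its parameters, and the returned labels are hypothetical names introduced here for illustration. It assumes that `max_pokes` stands in for the timeout divided by the poke interval, and that a `soft_fail` flag decides whether a timeout is reported as `AirflowSkipException` or `AirflowSensorTimeout`.

```python
from __future__ import annotations

from typing import Iterable


def sensor_outcome(
    observed_states: Iterable[str | None],
    allowed_states: set[str],
    failed_states: set[str],
    max_pokes: int,
    soft_fail: bool = False,
) -> str:
    """Return a label for how a simplified sensor run would end.

    Hypothetical model only. Each item in observed_states is the external
    task's state at one poke (None means no state was found yet).
    """
    for poke_number, state in enumerate(observed_states, start=1):
        if poke_number > max_pokes:
            # The poke budget (a stand-in for the timeout) is used up.
            break
        if state in allowed_states:
            return "success"
        if state in failed_states:
            return "AirflowException"
    # No allowed or failed state was seen within the budget: treat as a timeout.
    return "AirflowSkipException" if soft_fail else "AirflowSensorTimeout"


if __name__ == "__main__":
    allowed = {"success"}
    failed = {"failed", "skipped"}
    print(sensor_outcome(["running", "success"], allowed, failed, max_pokes=5))
    print(sensor_outcome(["running", "skipped"], allowed, failed, max_pokes=5))
    print(sensor_outcome(["running"] * 10, allowed, failed, max_pokes=3))
```

With the settings used in the example DAG (`allowed_states=["success"]`, `failed_states=["failed", "skipped"]`), a parent task that ends up skipped makes the sensor fail rather than wait until the timeout, which is why the docstring suggests handling that case in downstream tasks.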