python_operator
Example DAG demonstrating the usage of the PythonOperator.
Airflow Fundamentals
Providers:
Run this DAG
1. Install the Astronomer CLI (skip this step if you already have the CLI):
2. Initiate the project in a local directory:
3. Copy and paste the code below into a file in the `dags` directory.
4. Run the DAG from the local directory where the project was initiated:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating the usage of the classic Python operators to execute Python functions natively and
within a virtual environment.
"""
from __future__ import annotations

import logging
import sys
import time
from pprint import pprint

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator,
    PythonVirtualenvOperator,
    is_venv_installed,
)

log = logging.getLogger(__name__)

# The ExternalPythonOperator example below reuses the interpreter that is running this module.
PATH_TO_PYTHON_BINARY = sys.executable


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
):
    # [START howto_operator_python]
    def print_context(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(ds)
        return "Whatever you return gets printed in the logs"

    run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
    # [END howto_operator_python]

    # [START howto_operator_python_render_sql]
    def log_sql(**kwargs):
        """Log the ``query`` entry of the task's ``templates_dict``.

        Because the operator sets ``templates_exts=[".sql"]``, the ``"sql/sample.sql"`` value is
        presumably rendered from that template file before this callable runs.
        """
        log.info("Python task decorator query: %s", kwargs["templates_dict"]["query"])

    log_the_sql = PythonOperator(
        task_id="log_sql_query",
        python_callable=log_sql,
        templates_dict={"query": "sql/sample.sql"},
        templates_exts=[".sql"],
    )
    # [END howto_operator_python_render_sql]

    # [START howto_operator_python_kwargs]
    # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
    def my_sleeping_function(random_base):
        """Sleep for ``random_base`` seconds; runs as a task during DAG execution."""
        time.sleep(random_base)

    for i in range(5):
        sleeping_task = PythonOperator(
            task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
        )

        run_this >> log_the_sql >> sleeping_task
    # [END howto_operator_python_kwargs]

    if not is_venv_installed():
        log.warning("The virtualenv_python example task requires virtualenv, please install it.")
    else:
        # [START howto_operator_python_venv]
        def callable_virtualenv():
            """
            Example function that will be performed in a virtual environment.

            Importing at the function level ensures that it will not attempt to import the
            library before it is installed.
            """
            from time import sleep

            from colorama import Back, Fore, Style

            print(Fore.RED + "some red text")
            print(Back.GREEN + "and with a green background")
            print(Style.DIM + "and in dim text")
            print(Style.RESET_ALL)
            for _ in range(4):
                print(Style.DIM + "Please wait...", flush=True)
                sleep(1)
            print("Finished")

        virtualenv_task = PythonVirtualenvOperator(
            task_id="virtualenv_python",
            python_callable=callable_virtualenv,
            requirements=["colorama==0.4.0"],
            system_site_packages=False,
        )
        # [END howto_operator_python_venv]

        # ``sleeping_task`` here is the last task created by the loop above (``sleep_for_4``).
        sleeping_task >> virtualenv_task

        # [START howto_operator_external_python]
        def callable_external_python():
            """
            Example function that will be performed in an external Python environment.

            Importing at the function level ensures that it will not attempt to import the
            library before it is available in that environment.
            """
            import sys
            from time import sleep

            print(f"Running task via {sys.executable}")
            print("Sleeping")
            for _ in range(4):
                print("Please wait...", flush=True)
                sleep(1)
            print("Finished")

        external_python_task = ExternalPythonOperator(
            task_id="external_python",
            python_callable=callable_external_python,
            python=PATH_TO_PYTHON_BINARY,
        )
        # [END howto_operator_external_python]

        # Kept inside ``else``: this edge depends on ``virtualenv_task``, which only exists here.
        run_this >> external_python_task >> virtualenv_task