Run this DAG
1. Install the Astronomer CLI:Skip if you already have the CLI
2. Initate the project in a local directory:
3. Copy and paste the code below into a file in the
dagsdirectory.
4. Add the following to your
requirements.txtfile:
5. Run the DAG from the local directory where the project was initiated:
#! /usr/bin/env python# -*- coding: utf-8 -*-## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.### Adapted from example dags in airflow-provider-ray, see also## https://github.com/anyscale/airflow-provider-ray/blob/main/ray_provider/example_dags/ray_pandas_example_dag.py#import jsonfrom airflow.decorators import dagfrom airflow.decorators import taskfrom airflow.utils.dates import days_agodefault_args = {'owner': 'airflow',}@dag(default_args=default_args,schedule_interval=None,start_date=days_ago(2),tags=['example'],)def taskflow_etl():@task()def extract():data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'order_data_dict = json.loads(data_string)return order_data_dict@task(multiple_outputs=True)def transform(order_data_dict: dict):total_order_value = 0for value in order_data_dict.values():total_order_value += valuereturn {"total_order_value": total_order_value}@task()def load(total_order_value: float):print(f"Total order value is: {total_order_value:.2f}")order_data = extract()order_summary = transform(order_data)load(order_summary["total_order_value"])taskflow_etl_dag = taskflow_etl()