Apache Airflow Provider for Vineyard
The apache airflow provider for vineyard contains components to share intermediate data among tasks in Airflow workflows using vineyard.
Vineyard works as the XCom backend for airflow workers to allow transferring large-scale data objects between tasks that cannot be fit into the Airflow's database backend without involving external storage systems like HDFS. The Vineyard XCom backend handles object migration as well when the required inputs are not located where the task is scheduled to execute.
Table of Contents
Requirements
The following packages are needed to run Airflow on Vineyard,
- airflow >= 2.1.0
- vineyard >= 0.2.12
Configuration
Install required packages:
pip3 install airflow-provider-vineyardConfigure Vineyard locally
The vineyard server can be easier launched locally with the following command:
python3 -m vineyard --socket=/tmp/vineyard.sockSee also our documentation about launching vineyard.
Configure Airflow to use the vineyard XCom backend by specifying the environment variable
export AIRFLOW__CORE__XCOM_BACKEND=vineyard.contrib.airflow.xcom.VineyardXComand configure the location of the UNIX-domain IPC socket for vineyard client by
export AIRFLOW__VINEYARD__IPC_SOCKET=/tmp/vineyard.sockor
export VINEYARD_IPC_SOCKET=/tmp/vineyard.sockIf you have deployed a distributed vineyard cluster, you can also specify the
persist
environment to enable the vineyard client to persist the data to the vineyard cluster.export AIRFLOW__VINEYARD__PERSIST=true
Usage
After installing the dependencies and preparing the vineyard server, you can launch your airflow scheduler and workers, and run the following DAG as an example,
import numpy as npimport pandas as pdfrom airflow.decorators import dag, taskfrom airflow.utils.dates import days_agodefault_args = {'owner': 'airflow',}@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])def taskflow_etl_pandas():@task()def extract():order_data_dict = pd.DataFrame({'a': np.random.rand(100000),'b': np.random.rand(100000)})return order_data_dict@task(multiple_outputs=True)def transform(order_data_dict: dict):return {"total_order_value": order_data_dict["a"].sum()}@task()def load(total_order_value: float):print(f"Total order value is: {total_order_value:.2f}")order_data = extract()order_summary = transform(order_data)load(order_summary["total_order_value"])taskflow_etl_pandas_dag = taskflow_etl_pandas()
In the above example, task extract
and task transform
shares a
pandas.DataFrame
as the intermediate data, which is impossible as
it cannot be pickled and when the data is large, it cannot be fit into the
table in backend databases of Airflow.
The example is adapted from the documentation of Airflow, see also Tutorial on the Taskflow API.
Run the tests
Start your vineyardd with the following command,
python3 -m vineyardSet airflow to use the vineyard XCom backend, and run tests with pytest,
export AIRFLOW__CORE__XCOM_BACKEND=vineyard.contrib.airflow.xcom.VineyardXCompytest -s -vvv python/vineyard/contrib/airflow/tests/test_python_dag.pypytest -s -vvv python/vineyard/contrib/airflow/tests/test_pandas_dag.py
The pandas test suite is not possible to run with the default XCom backend, vineyard enables airflow to exchange complex and big data without modify the DAG and tasks!
Deploy using Docker Compose
We provide a reference docker-compose settings (see docker-compose.yaml) for deploying airflow with vineyard as the XCom backend on Docker Compose. For more details, please refer to the official documentation.
Before deploying the docker-compose, you need to initialize the environment as follows.
$ mkdir -p ./dags ./logs ./plugins ./config$ echo -e "AIRFLOW_UID=$(id -u)" > .env
Then, copy the example dags to the ./dags
directory.
$ cp examples_dags/v6d*.py ./dags
After that, the docker-compose containers could be deployed as
$ cd docker/$ docker build . -f Dockerfile -t vineyardcloudnative/vineyard-airflow:2.6.3$ docker compose up
You can see the added DAGs and run them via web ui or cli.
We have also included a diff file docker-compose.yaml.diff that shows the changed pieces that can be introduced into your own docker compose deployment.
Deploy on Kubernetes
We provide a reference settings (see values.yaml) for deploying airflow with vineyard as the XCom backend on Kubernetes, based on the official helm charts.
Next, we will show how to deploy airflow with vineyard on Kubernetes using the helm chart.
You are supposed to deploy the vineyard cluster on the kubernetes cluster first. For example, you can deploy a vineyard cluster with the vineyardctl command.
$ vineyardctl deploy vineyard-deployment --create-namespace
The values.yaml mainly tweak the following settings:
- Installing vineyard dependency to the containers using pip before starting workers
- Adding a vineyardd container to the airflow pods
- Mounting the vineyardd's UNIX-domain socket and shared memory to the airflow worker pods
Note that the values.yaml
may doesn't work in your environment, as airflow requires
other settings like postgresql database, persistence volumes, etc. You can combine
the reference values.yaml
with your own specific Airflow settings.
The values.yaml for Airflow's helm chart can be used as
# add airflow helm stable repo$ helm repo add apache-airflow https://airflow.apache.org$ helm repo update# deploy airflow$ helm install -f values.yaml airflow apache-airflow/airflow --namespace airflow --create-namespace
If you want to put the vineyard example DAGs into the airflow scheduler pod and the worker pod, you can use the following command.
$ kubectl cp ./example_dags/v6d_etl.py $(kubectl get pod -lcomponent=scheduler -n airflow -o jsonpath='{.items[0].metadata.name}'):/opt/airflow/dags -c scheduler -n airflow$ kubectl cp ./example_dags/v6d_etl.py $(kubectl get pod -lcomponent=worker -n airflow -o jsonpath='{.items[0].metadata.name}'):/opt/airflow/dags -c worker -n airflow