🧪 Experimental Version
This provider is an experimental alpha containing the necessary components to orchestrate and schedule Ray tasks using Airflow. It is actively maintained and being developed to bring production-ready workflows to Ray using Airflow. This release contains everything needed to begin building these workflows using the Airflow TaskFlow API.
Current Release: 0.2.1
Requirements
Visit the Ray Project page for more info on Ray.
⚠️ The server version and client version (build) of Ray MUST be the same.
- Python Version >= 3.7
- Airflow Version >= 2.0.0
- Ray Version == 1.3.0
- Filelock >= 3.0.0
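For convenience, the package requirements above can be pinned in a `requirements.txt`; the sketch below simply mirrors the list, using the standard PyPI distribution names:

```
apache-airflow>=2.0.0
ray==1.3.0
filelock>=3.0.0
```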
Modules
- Ray XCom Backend: Custom XCom backend to assist operators in moving data between tasks using the Ray API with its internal Plasma store, thereby allowing for in-memory distributed processing and handling of large data objects.
- Ray Hook: Extension of the `Http` hook that uses the Ray client to provide connections to the Ray Server.
- Ray Decorator: Task decorator to be used with the TaskFlow API, wrapping the existing Airflow `@task` decorator with `ray.remote` functionality, thereby executing each task on the Ray cluster (see the sketch after this list).
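To illustrate the wrapping idea, here is a minimal sketch of the general pattern; it is not the provider's actual implementation, just what pushing a callable onto a Ray cluster via `ray.remote` looks like:

```python
import functools

import ray


def ray_task_sketch(func):
    """Minimal sketch: run the wrapped callable as a Ray task.
    Assumes a Ray connection (ray.init / ray.util.connect) already exists."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        remote_func = ray.remote(func)                    # register the callable as a Ray task
        object_ref = remote_func.remote(*args, **kwargs)  # submit it to the cluster
        return ray.get(object_ref)                        # block until the result is ready
    return wrapper
```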
Configuration and Usage
Add the provider package wheel file to the root directory of your Airflow project.
In your Airflow `Dockerfile`, you will need to add an environment variable to specify your custom backend, along with the provider wheel install. Add the following:
```dockerfile
FROM quay.io/astronomer/ap-airflow:2.0.2-1-buster-onbuild
USER root
RUN pip uninstall astronomer-airflow-version-check -y
USER astro
```

> Check your ap-airflow version; if unsure, change to `ap-airflow:latest-onbuild`
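The custom XCom backend itself is enabled through Airflow's `AIRFLOW__CORE__XCOM_BACKEND` environment variable. The class path below is an assumption based on this provider's module layout, so verify it against the installed package:

```dockerfile
# Assumed class path for the Ray XCom backend -- confirm against the installed provider
ENV AIRFLOW__CORE__XCOM_BACKEND=ray_provider.xcom.ray_backend.RayBackend
```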
- We are using Ray 1.3.0 and Python version 3.7. To get a bleeding-edge version of Ray, you can follow this format to build the wheel URL in your `requirements.txt` file:
```bash
pip install airflow-provider-ray
```
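The exact wheel URL format is not shown above; purely as an illustration, Ray's nightly wheels have historically followed a pattern like the one below. Treat the host, commit placeholder, and filename as hypothetical and confirm against the Ray installation docs:

```
https://s3-us-west-2.amazonaws.com/ray-wheels/master/<COMMIT_SHA>/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl
```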
- Configure Ray locally. To run Ray locally, you'll need a minimum of 6GB of free memory. To start, in your environment with Ray installed, run:

```bash
(venv)$ ray start --num-cpus=8 --object-store-memory=7000000000 --head
```

If you have extra resources, you can bump the memory up.

You should now be able to open the Ray dashboard at [http://127.0.0.1:8265/](http://127.0.0.1:8265/).
Start your Airflow environment and open the UI.
In the Airflow UI, add an Airflow Pool with the following:

```
Pool (name): ray_worker_pool
Slots: 25
```

In the Airflow UI, add an Airflow Connection with the following:

```
Conn Id: ray_cluster_connection
Conn Type: HTTP
Host: Cluster IP Address, with basic Auth params if needed
Port: 10001
```
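Port 10001 is Ray's default client server port. Under the hood, a connection like this ultimately maps to Ray 1.3's client API, roughly as sketched below; the hook manages this for you, and the address shown is a placeholder:

```python
import ray.util

# Placeholder address built from the connection's Host and Port fields
ray.util.connect("192.0.2.10:10001")
```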
Using the TaskFlow API, your Airflow task should now use the `@ray_task` decorator for any Ray task and add the `ray_conn_id` parameter as `task_args`, like:
```python
import pandas as pd

from airflow.decorators import dag
from ray_provider.decorators import ray_task

default_args = {"owner": "airflow"}
task_args = {"ray_conn_id": "ray_cluster_connection"}

...

@dag(default_args=default_args, ...)
def ray_example_dag():

    @ray_task(**task_args)
    def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame(df.sum()).T
```
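For context, here is a hedged sketch of how such a file is typically completed. The `load_dataframe` task is hypothetical, and the chaining and module-level instantiation are standard TaskFlow patterns rather than anything specific to this provider:

```python
from datetime import datetime

import pandas as pd
from airflow.decorators import dag
from ray_provider.decorators import ray_task

default_args = {"owner": "airflow"}
task_args = {"ray_conn_id": "ray_cluster_connection"}


@dag(default_args=default_args, schedule_interval=None, start_date=datetime(2021, 1, 1))
def ray_example_dag():

    @ray_task(**task_args)
    def load_dataframe() -> pd.DataFrame:
        # Hypothetical upstream task producing a small DataFrame
        return pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

    @ray_task(**task_args)
    def sum_cols(df: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame(df.sum()).T

    # Chain the tasks; the intermediate DataFrame moves through the Ray XCom backend
    sum_cols(load_dataframe())


# Instantiate at module level so the Airflow scheduler discovers the DAG
example_dag = ray_example_dag()
```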
Project Contributors and Maintainers
This project is built in collaboration between Astronomer and Anyscale, with active contributions from both teams.
This project is formatted via `black`:

```bash
pip install black
black .
```
Connections
TBD - [Info on building a connection to Ray]