task_flow_xgboost_modin
Build a dataframe task.
Data ProcessingData Science
Providers:
Modules:
Run this DAG
1. Install the Astronomer CLI:Skip if you already have the CLI
2. Initate the project in a local directory:
3. Copy and paste the code below into a file in thedags
directory.
4. Add the following to your requirements.txt
file:
5. Run the DAG from the local directory where the project was initiated:
from typing import Anyimport jsonfrom airflow.decorators import dag, taskfrom airflow.utils.dates import days_agofrom airflow.operators.dummy_operator import DummyOperatorimport rayfrom ray_provider.decorators.ray_decorators import ray_taskimport numpy as npimport xgboost_ray as xgbfrom datetime import datetime# These args will get passed on to each operator# You can override them on a per-task basis during operator initializationdefault_args = {"owner": "airflow",}task_args = {"ray_conn_id": "ray_cluster_connection"}SIMPLE = FalseDataFrame = Any@dag(default_args=default_args,schedule_interval=None,start_date=datetime(2021, 1, 1, 0, 0, 0),tags=["xgboost-modin"],)def xgboost_modin_breast_cancer():@ray_task(eager=True, **task_args)def load_dataframe() -> DataFrame:"""Build a dataframe task."""print("Loading CSV.")if SIMPLE:print("Loading simple")from sklearn import datasetsdata = datasets.load_breast_cancer(return_X_y=True)else:url = ("https://archive.ics.uci.edu/ml/machine-learning-databases/""00280/HIGGS.csv.gz")import modin.pandas as mpdcolnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]data = mpd.read_csv(url, names=colnames)print("loaded higgs")print("Loaded CSV.")return data@ray_task(**task_args)def create_data(data):if SIMPLE:from sklearn.model_selection import train_test_splitdata, labels = datatrain_x, test_x, train_y, test_y = train_test_split(data, labels, test_size=0.25)train_set = xgb.RayDMatrix(train_x, train_y)test_set = xgb.RayDMatrix(test_x, test_y)else:df_train = data[(data["feature-01"] < 0.4)]colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]train_set = xgb.RayDMatrix(df_train, label="label", columns=colnames)df_validation = data[(data["feature-01"] >= 0.4) & (data["feature-01"] < 0.8)]test_set = xgb.RayDMatrix(df_validation, label="label")return train_set, test_set@ray_task(**task_args)def train_model(data) -> None:dtrain, dvalidation = dataevallist = [(dvalidation, "eval")]evals_result = {}config = {"tree_method": "hist","eval_metric": ["logloss", "error"],}bst = xgb.train(params=config,dtrain=dtrain,evals_result=evals_result,ray_params=xgb.RayParams(max_actor_restarts=1, num_actors=2, cpus_per_actor=2),num_boost_round=100,evals=evallist,)return bstbuild_raw_df = load_dataframe()data = create_data(build_raw_df)trained_model = train_model(data)kickoff_dag = DummyOperator(task_id="kickoff_dag")complete_dag = DummyOperator(task_id="complete_dag")kickoff_dag >> build_raw_dftrained_model >> complete_dagxgboost_modin_breast_cancer = xgboost_modin_breast_cancer()