Execute ML Pipelines in Databricks

Run an end-to-end pipeline, from BigQuery data ingest through feature engineering and model training to model publishing, all within a Databricks cluster.

Run this DAG

1. Install the Astronomer CLI (skip if you already have our CLI):

2. Clone the repository:

3. Navigate to where the repository was cloned and start the DAG:

Databricks Data Science and Machine Learning Examples with Airflow

DAGs

These DAGs are basic examples of how to use Airflow to orchestrate ML tasks in Databricks. The Databricks code for each task lives in a Databricks notebook; the notebooks are described below.

  1. databricks-ml-example.py - Runs an end-to-end pipeline from data ingest to model publishing (see the orchestration sketch after this list) with the following tasks:

    • ingest: Pulls data from BigQuery, does some basic cleaning and transformations, then saves the result to Delta Lake.
    • feature engineering: Extracts features for the model and saves the output to the Feature Store.
    • train: Trains the model with a Databricks notebook.
    • register: Registers the model in MLflow.
  2. databricks-automl-example.py - Runs an experimental pipeline from ingest to model training with Databricks AutoML (the AutoML call is sketched after this list), with the following tasks:

    • ingest: Pulls data from BigQuery, does some basic cleaning and transformations, then saves the result to Delta Lake.
    • feature engineering: Extracts features for the model and saves the output to the Feature Store.
    • train: Trains models using AutoML with a notebook.
  3. databricks-ml-retrain-example.py - Runs a pipeline that retrains a model, registers it, and submits a transition-to-Staging request for it, then sends a Slack notification, with the following tasks:

    • retrain: Retrains the model with a notebook.
    • register: Registers the new model version in MLflow.
    • submit transition request: Submits an approval request in MLflow to transition the model to Staging.
    • notify: Sends a Slack notification with relevant details about the model.

    Note: In this DAG we use the Databricks REST API in many places for requests to MLflow, because a Python API is not yet available for those endpoints (a sketch of such a call appears after this list).

  4. databricks-model-serve-sagemaker-example.py - Deploys an MLflow model to SageMaker (sketched after this list) with the following tasks:

    • check model info for Staging: Checks whether there is a model version marked for Staging and gets its information.
    • new model version confirmation: ShortCircuitOperator that determines whether the model has already been deployed and whether to proceed.
    • deploy model: Uses the mlflow.sagemaker API to deploy the model and an endpoint in AWS SageMaker.
    • test model endpoint: Uses the SageMaker API to send a request with sample data and get predictions.
    • mark as deployed: Tags the model version in the MLflow Model Registry as deployed.

    Note: For this DAG, AWS credentials are set as environment variables rather than as an Airflow connection. This simply avoids storing them in two places, since the SageMaker and MLflow API calls that don't go through an Airflow operator cannot read those credentials from connections.
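
The first two DAGs share the same orchestration pattern: each task submits a Databricks notebook run against an existing cluster. The sketch below is a minimal, hypothetical version of that pattern; the notebook paths, connection ID, and DAG id are placeholders rather than the exact values used in databricks-ml-example.py.

```python
# Minimal sketch of the ingest -> feature engineering -> train -> register pattern.
# Notebook paths, the connection ID, and the DAG id are illustrative placeholders.
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

with DAG(
    dag_id="databricks_ml_example_sketch",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    def notebook_task(task_id, notebook_path):
        """Submit a one-off run of a Databricks notebook on an existing cluster."""
        return DatabricksSubmitRunOperator(
            task_id=task_id,
            databricks_conn_id="databricks_default",
            existing_cluster_id=Variable.get("databricks_cluster_id"),
            notebook_task={"notebook_path": notebook_path},
        )

    ingest = notebook_task("ingest", "/Shared/example_notebooks/ingest")
    feature_engineering = notebook_task("feature_engineering", "/Shared/example_notebooks/feature-eng")
    train = notebook_task("train", "/Shared/example_notebooks/train")
    register = notebook_task("register", "/Shared/example_notebooks/register")

    ingest >> feature_engineering >> train >> register
```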
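
In the AutoML example, the train task's notebook typically calls the Databricks AutoML Python API. The cell below is a hypothetical illustration; the table name, target column, and problem type (classification here) are assumptions, not the repo's actual notebook code.

```python
# Hypothetical Databricks notebook cell for the AutoML train task.
# The Delta table name and target column are placeholders.
from databricks import automl

# Read the features written by the feature engineering task
# (`spark` is provided by the Databricks notebook runtime).
features_df = spark.table("default.example_features")

# Launch an AutoML experiment; AutoML trains and evaluates several candidate models.
summary = automl.classify(
    dataset=features_df,
    target_col="label",
    timeout_minutes=30,
)

# The best trial and its MLflow run can be inspected from the returned summary.
print(summary.best_trial.mlflow_run_id)
```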
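As the note on the retrain DAG mentions, some Model Registry operations go through the Databricks REST API directly. Below is a minimal sketch of such a call, assuming the transition-requests endpoint and a placeholder model name and version; check the Databricks Model Registry API docs for the exact payload.

```python
# Sketch of submitting a stage transition request through the Databricks REST API.
# The model name and version are placeholders; DATABRICKS_HOST and DATABRICKS_TOKEN
# come from the environment variables listed under Requirements below.
import os
import requests

host = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
token = os.environ["DATABRICKS_TOKEN"]  # personal access token

response = requests.post(
    f"{host}/api/2.0/mlflow/transition-requests/create",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "name": "example-model",   # registered model name (placeholder)
        "version": "3",            # model version to transition (placeholder)
        "stage": "Staging",
        "comment": "Requesting transition after scheduled retraining",
    },
)
response.raise_for_status()
print(response.json())
```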
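For the SageMaker example, the deploy task relies on the mlflow.sagemaker API. The sketch below assumes MLflow's 1.x mlflow.sagemaker.deploy interface (newer releases route this through mlflow.deployments); the endpoint name, model URI, image URL, region, and execution role ARN are all placeholders that map to the Airflow Variables and environment variables listed under Requirements.

```python
# Sketch of deploying a registered MLflow model version to a SageMaker endpoint.
# All names, URIs, and ARNs below are placeholders.
import mlflow.sagemaker

mlflow.sagemaker.deploy(
    app_name="example-model-staging",            # SageMaker endpoint name
    model_uri="models:/example-model/Staging",   # model version in the MLflow Model Registry
    execution_role_arn="arn:aws:iam::123456789012:role/sagemaker-execution-role",
    image_url="123456789012.dkr.ecr.us-east-1.amazonaws.com/mlflow-pyfunc:1.26.0",
    region_name="us-east-1",
    mode="create",                               # use "replace" to update an existing endpoint
)
```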

Requirements

BigQuery

Databricks

  • Authentication token (if you don't want to use a username and password to authenticate from Airflow)
  • An existing cluster set up with GCP credentials (you can use an on-demand cluster, but you will need to supply the GCP credentials to it accordingly)
  • Notebooks for each task.
    • You can use the notebooks in the example_notebooks folder of this repo to get started.

Airflow

  • Databricks connection
  • Airflow Variables (how these and the .env values below are consumed is sketched at the end of this section)
    • databricks_user
    • databricks_cluster_id
    • databricks_instance
    • mlflow_pyfunc_image_url - The location of your mlflow-pyfunc image (see the documentation for more info)
    • sagemaker_execution_arn - Execution role ARN for SageMaker so that it can deploy the model and endpoint
  • MLflow environment variables in your .env
    • MLFLOW_TRACKING_URI=databricks
    • DATABRICKS_HOST=your_databricks_host
    • DATABRICKS_TOKEN=your_PAT
  • AWS environment variables in your .env
    • AWS_ACCESS_KEY_ID
    • AWS_SECRET_ACCESS_KEY
    • AWS_SESSION_TOKEN
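
As a rough illustration of how these pieces fit together, the sketch below shows how a task can read the Variables and environment variables listed above. It is not part of the repo's code; the values and variable names follow the Requirements list.

```python
# Sketch of how the Airflow Variables and .env values above are typically consumed.
import os

from airflow.models import Variable

# Airflow Variables, set in the UI or with `airflow variables set <key> <value>`.
databricks_instance = Variable.get("databricks_instance")
databricks_cluster_id = Variable.get("databricks_cluster_id")
mlflow_pyfunc_image_url = Variable.get("mlflow_pyfunc_image_url")
sagemaker_execution_arn = Variable.get("sagemaker_execution_arn")

# Environment variables from .env: the MLflow client picks up MLFLOW_TRACKING_URI,
# DATABRICKS_HOST, and DATABRICKS_TOKEN automatically, and boto3/SageMaker read
# the AWS_* variables, so no Airflow connection is needed for those calls.
assert os.environ.get("MLFLOW_TRACKING_URI") == "databricks"
assert "DATABRICKS_HOST" in os.environ and "DATABRICKS_TOKEN" in os.environ
assert "AWS_ACCESS_KEY_ID" in os.environ and "AWS_SECRET_ACCESS_KEY" in os.environ
```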