ELT with Airflow and dbt
This repo contains DAGs to demonstrate a variety of ELT patterns using Airflow along with dbt.
All DAGs can be found in the `dags/` folder, which is organized by dbt Core and dbt Cloud usage. Specific data stores need connections and may require accounts with cloud providers; further details are provided in the data-store-specific sections below.
Requirements
You need the Astronomer CLI and Docker installed locally to run all DAGs in this repo.
Getting Started
The easiest way to run these example DAGs is to use the Astronomer CLI to get an Airflow instance up and running locally:
- Install the Astronomer CLI.
- Clone this repo locally and navigate into it.
- Start Airflow locally by running `astro dev start`.
- Create all necessary connections and variables - see below for specific DAG cases.
- Navigate to localhost:8080 in your browser and you should see the tutorial DAGs there.
dbt Cloud
To create a connection to dbt Cloud, navigate to Admin -> Connections in the Airflow UI and select the "dbt Cloud" connection type. An API token is required; the Account ID is not. If you provide an Account ID, it will be used by the dbt Cloud tasks, but you can also override it or supply a specific Account ID at the task level using the `account_id` parameter.
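As a minimal sketch (the connection ID, job IDs, and Account ID below are placeholders), a task-level `account_id` takes precedence over the one stored on the connection:

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None, catchup=False)
def dbt_cloud_connection_example():
    # Uses the Account ID stored on the dbt_cloud_default connection, if any.
    trigger_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id="dbt_cloud_default",
        job_id=12345,  # placeholder job ID
        check_interval=60,
        timeout=3600,
    )

    # Overrides the connection's Account ID at the task level.
    trigger_job_other_account = DbtCloudRunJobOperator(
        task_id="trigger_job_other_account",
        dbt_cloud_conn_id="dbt_cloud_default",
        account_id=67890,  # placeholder Account ID override
        job_id=54321,  # placeholder job ID
    )


dbt_cloud_connection_example()
```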
DAGs
All dbt Cloud DAGs use the following dbt Cloud provider version:
apache-airflow-providers-dbt-cloud==1.0.1
This provider version uses dbt Cloud API v2.
Add a Simple Operational Check for dbt Cloud Jobs
This example showcases adding an operational check to ensure a dbt Cloud job is not already running prior to triggering it. The `DbtCloudHook` provides a `list_job_runs()` method which can be used to retrieve the latest triggered run for a job and check its status. If that run is not in a terminal state of 10 (Success), 20 (Error), or 30 (Cancelled), the pipeline will not try to trigger the job.
This pipeline requires a connection to dbt Cloud as well as an Airflow Variable for the `job_id` to be triggered.
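A minimal sketch of such a check, assuming a `dbt_cloud_default` connection and a `dbt_cloud_job_id` Airflow Variable (both names are placeholders), wraps `DbtCloudHook.list_job_runs()` in a `ShortCircuitOperator`:

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

DBT_CLOUD_CONN_ID = "dbt_cloud_default"  # placeholder connection ID
JOB_ID = Variable.get("dbt_cloud_job_id")  # placeholder Variable name


def _check_job_not_running(job_id):
    """Return True only if the latest run for the job is in a terminal state."""
    hook = DbtCloudHook(dbt_cloud_conn_id=DBT_CLOUD_CONN_ID)
    runs = hook.list_job_runs(job_definition_id=job_id, order_by="-id")
    latest_run = runs[0].json()["data"][0]
    # 10 = Success, 20 = Error, 30 = Cancelled
    return latest_run["status"] in (10, 20, 30)


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None, catchup=False)
def dbt_cloud_operational_check():
    check_job = ShortCircuitOperator(
        task_id="check_job_is_not_running",
        python_callable=_check_job_not_running,
        op_kwargs={"job_id": JOB_ID},
    )

    trigger_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        job_id=JOB_ID,
        check_interval=60,
        timeout=3600,
    )

    check_job >> trigger_job


dbt_cloud_operational_check()
```

If the latest run is still queued or running, the short-circuit task skips the trigger task and the rest of the pipeline.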
Salesforce ELT with Fivetran, dbt Cloud, and Census
This example showcases how a modern ELT pipeline can be created for extracting and loading Salesforce data into a data warehouse with Fivetran, performing data transformations via a dbt Cloud job, and finally reverse-syncing the transformed data back to a marketing platform using Census.
This pipeline requires connections to Fivetran, dbt Cloud, and Census as well as an Airflow Variable for the `connector_id` to be used in the Fivetran extract.
To create a connection to Fivetran, navigate to Admin -> Connections in the Airflow UI and select the "Fivetran" connection type. Provide your Fivetran API key and secret.
To create a connection to Census, navigate to Admin -> Connections in the Airflow UI and select the "Census" connection type. Provide your Census secret token.
Additional Airflow provider packages are required for this DAG:
airflow-provider-census==1.1.1
airflow-provider-fivetran-async==1.0.0a4
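The rough shape of the DAG is sketched below. This is only an illustration: the import paths for the third-party `FivetranOperator` and `CensusOperator`, the connection IDs, the Variable name, and the job/sync IDs are all assumptions or placeholders; check the pinned provider packages for the exact modules and parameters.

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.models import Variable
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Assumed import paths for the pinned third-party providers
# (airflow-provider-fivetran-async, airflow-provider-census).
from fivetran_provider_async.operators import FivetranOperator
from airflow.providers.census.operators.census import CensusOperator


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None, catchup=False)
def salesforce_elt():
    # Extract and load Salesforce data into the warehouse with Fivetran.
    fivetran_sync = FivetranOperator(
        task_id="fivetran_salesforce_sync",
        fivetran_conn_id="fivetran_default",
        connector_id=Variable.get("fivetran_connector_id"),  # placeholder Variable name
    )

    # Transform the loaded data with a dbt Cloud job.
    dbt_cloud_job = DbtCloudRunJobOperator(
        task_id="run_dbt_cloud_job",
        dbt_cloud_conn_id="dbt_cloud_default",
        job_id=12345,  # placeholder dbt Cloud job ID
    )

    # Reverse-sync the transformed models to the marketing platform with Census.
    census_sync = CensusOperator(
        task_id="census_sync",
        census_conn_id="census_default",
        sync_id=67890,  # placeholder Census sync ID
    )

    fivetran_sync >> dbt_cloud_job >> census_sync


salesforce_elt()
```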
dbt Core
DAGs
Rerun dbt Models from Failure
This example shows how you can use the new `dbt build` command to rerun models from the point of failure. By defining this command as a downstream dependency of the `dbt run` command, you can rerun your dbt models from the point of failure if the initial run fails.
Furthermore, you can use trigger rules to avoid redundant task runs and ensure that your downstream tasks run as intended. In this case, we suppose that after the dbt models are successfully built, the data is sent to Salesforce using Hightouch.
This pipeline assumes that you are using dbt Core with the `BashOperator`.
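A minimal sketch of the pattern is shown below. The project path is a placeholder, the Hightouch step is stood in by an echo command, and the exact dbt selection flags for rerunning from the failure point are left as a comment, since they depend on your setup:

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

DBT_PROJECT_DIR = "/usr/local/airflow/dbt"  # placeholder project path


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None, catchup=False)
def dbt_rerun_from_failure():
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"dbt run --project-dir {DBT_PROJECT_DIR} --profiles-dir {DBT_PROJECT_DIR}",
    )

    # Only runs if dbt_run fails; add your rerun-from-failure selection flags here.
    dbt_rerun = BashOperator(
        task_id="dbt_build_rerun",
        bash_command=f"dbt build --project-dir {DBT_PROJECT_DIR} --profiles-dir {DBT_PROJECT_DIR}",
        trigger_rule=TriggerRule.ALL_FAILED,
    )

    # Stands in for the Hightouch sync; runs once either the initial run or the
    # rerun has succeeded, and is skipped otherwise.
    hightouch_sync = BashOperator(
        task_id="hightouch_sync_placeholder",
        bash_command="echo 'trigger Hightouch sync here'",
        trigger_rule=TriggerRule.ONE_SUCCESS,
    )

    dbt_run >> dbt_rerun
    [dbt_run, dbt_rerun] >> hightouch_sync


dbt_rerun_from_failure()
```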
Project Setup
We are currently using the jaffle_shop sample dbt project.
The only files required for the Airflow DAGs to run are `dbt_project.yml`, `profiles.yml`, and `target/manifest.json`, but we included the models for completeness. If you would like to try these DAGs with your own dbt workflow, feel free to drop in your own project files.
Additional Notes
- If you make changes to the dbt project, you will need to run `dbt compile` in order to update the `manifest.json` file. This may be done manually during development, as part of a CI/CD pipeline, or as a separate step in a production pipeline run before the Airflow DAG is triggered.
- The sample dbt project contains the `profiles.yml`, which is configured to use environment variables. The database credentials from an Airflow connection are passed as environment variables to the `BashOperator` tasks running the dbt commands (see the sketch after this list).
- Each DAG runs a `dbt_seed` task at the beginning that loads sample data into the database. This is simply for the purpose of this demo.
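A minimal sketch of that credential hand-off is below. The connection ID, project path, and `DBT_*` environment variable names are all placeholders and must match the `env_var()` references in your `profiles.yml`:

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.hooks.base import BaseHook
from airflow.operators.bash import BashOperator

DBT_PROJECT_DIR = "/usr/local/airflow/dbt"  # placeholder project path


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None, catchup=False)
def dbt_env_var_example():
    # Placeholder connection ID; the environment variable names must match the
    # env_var() references in your profiles.yml.
    conn = BaseHook.get_connection("dbt_postgres_conn")

    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"dbt run --project-dir {DBT_PROJECT_DIR} --profiles-dir {DBT_PROJECT_DIR}",
        env={
            "DBT_HOST": conn.host or "",
            "DBT_USER": conn.login or "",
            "DBT_PASSWORD": conn.password or "",
            "DBT_SCHEMA": conn.schema or "",
            "DBT_PORT": str(conn.port or 5432),
        },
    )


dbt_env_var_example()
```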