In the dbt community, a common question is how to sync dbt runs with one’s extract-and-load tool. Fivetran connectors and dbt jobs are a particularly common pairing, and Analytics Engineers typically face a choice in how to orchestrate the two. On the one hand, the load and transform steps can be scheduled and run independently. Alternatively, they can be linked into an event-driven workflow, and Airflow is a great tool for creating and running this type of workflow. Setting up a system to trigger and manage events adds complexity at first; however, the benefits make the results worth the time spent. In particular, we’ve observed several concerns raised in the dbt Slack community:
- There are a large number of related load and transform jobs, making it difficult to manage their separate schedules. This can be particularly painful when a transformation job depends on multiple upstream data loaders. See this thread.
- You want to pass parameters to the DAG run based on the outcome of the data loader(s). See this thread.
- It’s valuable to reduce the overall latency of the load + transform process because of downstream operational dependencies, such as supplying fresh data for a machine learning model to consume or meeting reporting SLAs.
If these factors are not a high priority, then it’s completely valid to use scheduled processes. However, many of the larger organizations (2,000+ employees) that @amy and I work with as members of the Solutions Architecture team at Fishtown view these as high-priority items. In that case, event-driven architectures like the one we describe in this article are key.
Summary of Approach
Our goal is to present a simplified, linear workflow and illustrate how to coordinate tasks using the Fivetran and dbt Cloud APIs with Airflow XComs. Note that an organization using dbt Core could accomplish a very similar workflow by using the Airflow BashOperator to trigger a dbt run. Our friends at Astronomer have a great series of blog posts on orchestrating dbt Core with Airflow; see part 1 and part 2. That said, dbt Cloud offers some additional benefits, including the ability to run pull request checks with dbt test independently of Airflow.
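For the dbt Core variant, the BashOperator only needs a shell command string. Here is a minimal sketch of a hypothetical helper, `build_dbt_command`, that assembles one; the project path, the `prod` target name, and the selector are illustrative assumptions, not values from our repository.

```python
import shlex
from typing import Optional


def build_dbt_command(
    project_dir: str,
    target: str = "prod",
    select: Optional[str] = None,
) -> str:
    """Build a `dbt run` shell command for Airflow's BashOperator.

    Hypothetical helper: the project path and target name are illustrative.
    """
    parts = ["dbt", "run", "--project-dir", project_dir, "--target", target]
    if select:
        # dbt's node selection syntax, e.g. "agg_pokedex_summary+" or "tag:daily"
        parts += ["--select", select]
    # Quote each argument so paths containing spaces survive the shell
    return " ".join(shlex.quote(p) for p in parts)
```

In a DAG, the returned string would be passed as `bash_command` to `BashOperator`; quoting each argument with `shlex.quote` keeps unusual paths intact.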
With this setup we have:
- Isolation between the data loading (Fivetran), transformation (dbt) and orchestration (Airflow) functions in the stack
- Minimal re-work on the Airflow side as the dbt transformation and Fivetran processes grow in complexity over time
- Full use of dbt’s native ability to parallelize job runs
What you need to get started
You can find the codebase we implemented in writing this article here. To be clear, we didn’t seek to implement a complete production-ready system for this task, but rather to create a starting point.
To implement this yourself, at a minimum you will need access to the following:
- Fivetran API Key and API Secret. These are combined in the format api-key:api-secret and base64-encoded into an API token. This link documents how to create this value yourself.
- dbt Cloud API key. Navigate to Profile > API Access from the dbt Cloud console to generate this.
- A Snowflake database to receive the data loads and run the transforms.
- The dbt Cloud Account ID, which is used in the API calls to dbt Cloud.
- Access to a Google Cloud Platform account to host the Airflow server.
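The two API credentials above end up in HTTP `Authorization` headers. A minimal sketch, assuming Fivetran's Basic auth scheme (the base64-encoded `api-key:api-secret` pair) and dbt Cloud's `Token` prefix for API keys; check both against each vendor's current API reference.

```python
import base64


def fivetran_auth_header(api_key: str, api_secret: str) -> dict:
    """Fivetran uses HTTP Basic auth: base64("api-key:api-secret")."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def dbt_cloud_auth_header(api_token: str) -> dict:
    """dbt Cloud's v2 API accepts the API key with a "Token" prefix."""
    return {"Authorization": f"Token {api_token}"}
```

Building the headers in one place keeps secrets out of individual task functions; in practice the key, secret, and token would come from Airflow Connections or Variables rather than literals.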
Dataset and use case
We used a publicly available Pokemon dataset for this exercise, saved in a Google Sheet. Our use case is to use dbt to compute the average HP score across Pokemon catches. Your dbt transformation process could evolve substantially while keeping this same architecture in place.
Hosting the Airflow server
We recommend using a tool like Astronomer, a managed Airflow service, if you want the convenience of a user-friendly interface like the ones dbt Cloud and Fivetran provide.
Version controlling Airflow and dbt
We used two git repositories: one for the Airflow code and server setup and one for the dbt project. Although it would be possible to combine them into a single repository, keeping separate codebases for dbt and Airflow reflects how each system is functionally separate in our architecture.
Setting up data loads
For this use case, we used Fivetran’s Google Sheets connector to run the data loads. If you aren’t using Fivetran for data loading, that element of the solution would change to either 1) use the API of a different data loading tool or 2) code the data loading function into the Python Operators running in the Airflow DAG.
Data transformation process in dbt
Our dbt models are illustrated below. The final table we want to surface in the analytics layer of the warehouse is agg_pokedex_summary.
This is clearly a simplistic DAG. In our case, we are focused on the relationship between the data loader and the transformation more so than the complexity of the transformation itself.
Orchestration process in Airflow
The Airflow DAG is illustrated below.
The input to this Airflow DAG is a simple mapping of the Fivetran connector_id to the matching dbt job name.
These values are stored in the dag_run configuration, which is available within each Python Operator in the Airflow process. They can be accessed in code like this:
connector_id = kwargs['dag_run'].conf['fivetran_connector_id']
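Here is a sketch of a callable that reads both halves of the mapping. With Airflow 2’s CLI, `airflow dags trigger --conf '<json>' <dag_id>` supplies the JSON that becomes `dag_run.conf`. The `dbt_job_name` key is hypothetical, since only `fivetran_connector_id` is named above, and `SimpleNamespace` stands in for Airflow’s DagRun so the function can be exercised without an Airflow install.

```python
from types import SimpleNamespace


def read_run_config(**kwargs):
    """Python callable body: pull the loader/transform mapping from dag_run.conf."""
    conf = kwargs["dag_run"].conf or {}
    try:
        connector_id = conf["fivetran_connector_id"]
        job_name = conf["dbt_job_name"]  # hypothetical key name
    except KeyError as missing:
        # Fail fast with a readable message instead of a bare KeyError mid-task
        raise ValueError(f"dag_run.conf is missing {missing}") from None
    return connector_id, job_name


if __name__ == "__main__":
    fake_run = SimpleNamespace(conf={"fivetran_connector_id": "abc123", "dbt_job_name": "pokemon_daily"})
    print(read_run_config(dag_run=fake_run))
```

Validating the configuration in one place means every downstream task can assume both values are present.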
Communicating between Airflow tasks
Communication of state across Airflow tasks can be accomplished using XComs. By default, tasks in Airflow execute in separate runtimes and don’t share variables, so we can’t declare a variable in one task and pass it to a downstream task without writing that data to a common storage location. XComs, which Airflow stores in its metadata database, are a convenient way to accomplish this. As your use cases grow more complex, XComs are especially useful for developing conditional logic in your pipeline. For example, you could trigger a subset of the dbt DAG using dbt’s model selection syntax, based on the results of prior Airflow tasks.
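As a sketch of that conditional pattern, the function below builds the JSON body for dbt Cloud’s v2 "trigger job run" endpoint. That endpoint requires a `cause` string and, as we understand the API, accepts an optional `steps_override` list of commands that replaces the job’s configured steps for that run. Passing freshly loaded source names from upstream tasks, and the `source:<name>+` selector built from them, are assumptions for illustration rather than part of our repository.

```python
from typing import Iterable


def build_trigger_payload(loaded_sources: Iterable[str], cause: str = "Triggered by Airflow") -> dict:
    """Body for POST /api/v2/accounts/{account_id}/jobs/{job_id}/run/.

    `loaded_sources` is assumed to be the dbt source names that upstream
    tasks reported (e.g. via XCom) as freshly loaded.
    """
    names = sorted(set(loaded_sources))
    if not names:
        # Nothing new arrived: skip the dbt task rather than run it for nothing
        raise ValueError("no sources loaded; skip the dbt run")
    # "source:<name>+" selects a source and every model downstream of it;
    # space-separated selectors are combined as a union
    selector = " ".join(f"source:{name}+" for name in names)
    return {"cause": cause, "steps_override": [f"dbt run --select {selector}"]}
```

Sorting and de-duplicating the names keeps the generated command stable across runs, which makes dbt Cloud run history easier to compare.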
In our DAG, the ‘dbt_job’ task pushes the dbt Cloud run ID to XCom like so:
run_id = trigger_resp['id']
kwargs['ti'].xcom_push(key='dbt_run_id', value=run_id)
Downstream tasks such as ‘get_dbt_job_run_status’ can then pull it back out:
ti = kwargs['ti']
run_id = ti.xcom_pull(key='dbt_run_id', task_ids='dbt_job')
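To see how the key and task ID tie the push and the pull together, here is a sketch that runs both callables outside Airflow. `FakeTaskInstance` is a hypothetical stand-in that mimics only the `xcom_push`/`xcom_pull` addressing; real XComs are written to Airflow’s metadata database, and the trigger response is a placeholder.

```python
from typing import Any, Dict, Tuple


class FakeTaskInstance:
    """Testing stand-in for the XCom methods of Airflow's TaskInstance."""

    # One store shared by all instances, playing the role of the metadata DB
    _store: Dict[Tuple[str, str], Any] = {}

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def xcom_push(self, key: str, value: Any) -> None:
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, key: str, task_ids: str) -> Any:
        return self._store.get((task_ids, key))


def dbt_job(**kwargs):
    trigger_resp = {"id": 4242}  # placeholder for the dbt Cloud trigger response
    run_id = trigger_resp["id"]
    kwargs["ti"].xcom_push(key="dbt_run_id", value=run_id)


def get_dbt_job_run_status(**kwargs):
    ti = kwargs["ti"]
    run_id = ti.xcom_pull(key="dbt_run_id", task_ids="dbt_job")
    return run_id  # the real task would now query the dbt Cloud API


dbt_job(ti=FakeTaskInstance("dbt_job"))
print(get_dbt_job_run_status(ti=FakeTaskInstance("get_dbt_job_run_status")))  # 4242
```

A value returned from a PythonOperator callable is also pushed automatically under the key `return_value`, which can be a simpler alternative to an explicit `xcom_push`.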
With these elements in place, we have what’s needed to, for example, fetch the dbt job run status from the dbt Cloud API: the dbt run_id and account_id become path parameters in a GET request to the dbt Cloud runs endpoint.
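Here is a minimal polling sketch for that GET request. The base URL `https://cloud.getdbt.com` applies to multi-tenant dbt Cloud accounts, and the terminal status codes (10 success, 20 error, 30 cancelled) follow the v2 API documentation as we understand it; both are assumptions worth checking against the current API reference. The HTTP call is injected through `fetch_json`, a hypothetical parameter, so the loop can be tested without network access.

```python
import time
from typing import Callable, Dict

DBT_CLOUD_API = "https://cloud.getdbt.com/api/v2"

# dbt Cloud v2 run status codes that end polling (1-3 are queued/starting/running)
TERMINAL = {10: "success", 20: "error", 30: "cancelled"}


def run_status_url(account_id: int, run_id: int) -> str:
    return f"{DBT_CLOUD_API}/accounts/{account_id}/runs/{run_id}/"


def wait_for_run(
    account_id: int,
    run_id: int,
    fetch_json: Callable[[str], Dict],
    poll_seconds: float = 30,
    timeout_seconds: float = 3600,
) -> str:
    """Poll until the run reaches a terminal state and return its name.

    `fetch_json` should perform an authenticated GET and return parsed JSON.
    """
    url = run_status_url(account_id, run_id)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = fetch_json(url)["data"]["status"]
        if status in TERMINAL:
            return TERMINAL[status]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"dbt Cloud run {run_id} did not finish in time")
        time.sleep(poll_seconds)
```

Inside the ‘get_dbt_job_run_status’ task, raising an exception when the result isn’t `"success"` would mark the Airflow task as failed, so retries and downstream tasks can react to it.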
Comparison to other approaches
It’s important to note that the solution we present here isn’t the only way to organize this type of process. We’ve spoken with a few dbt users who use Airflow to extract and rebuild the dbt DAG, then run each dbt model using the extracted SQL. This lets them take advantage of Airflow’s retry logic, which is more sophisticated than dbt’s. In our opinion, isolating the system that handles orchestration (Airflow) from the transformer (dbt) makes the overall process easier to understand and maintain. Plus, as the dbt DAG grows, using Airflow to extract and rebuild it adds maintenance overhead: you have to ensure the DAG reconstruction goes smoothly and that the runs are parallelized intelligently.
How to build this yourself
For further details on how we implemented this, see the README file, Airflow code, and environment setup shell scripts in this repository. A real deployment would very likely include additional logic for retries and conditional execution in the Airflow DAG. Use this as a guide to coordinating data loading and transformation functions.
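As a starting point for that extra logic, here is a sketch: Airflow operators accept `retries`, `retry_delay`, `retry_exponential_backoff`, and `max_retry_delay` arguments, which can be set once through a DAG’s `default_args`. The branch callable is written for a `BranchPythonOperator`, which follows whichever task ID its callable returns; the task IDs and the `fivetran_sync_status` XCom key are hypothetical.

```python
from datetime import timedelta

# Passed as DAG(..., default_args=default_args) so every task inherits these settings
default_args = {
    "owner": "analytics",                 # illustrative owner name
    "retries": 3,                         # retry a failed task up to 3 times
    "retry_delay": timedelta(minutes=5),  # initial wait between attempts
    "retry_exponential_backoff": True,    # lengthen the wait after each failure
    "max_retry_delay": timedelta(minutes=30),
}


def choose_next_task(**kwargs) -> str:
    """Callable for a BranchPythonOperator: return the task_id to run next."""
    sync_status = kwargs["ti"].xcom_pull(key="fivetran_sync_status", task_ids="fivetran_sync")
    # Only transform when the load succeeded; otherwise route to an alerting task
    return "dbt_job" if sync_status == "succeeded" else "notify_failure"
```

Retries handle transient API errors, while the branch handles a load that genuinely failed, so the two mechanisms complement each other.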
What do you think?
Our opinion is that this simple setup plays to the unique strengths of both Airflow and dbt:
- Airflow focuses on scheduling and process flow
- dbt (deployed as dbt Cloud in our example) handles the meat of the data transformation
What aren’t we thinking about that you think should be considered here? We would love to continue the discussion with you!