Summertime is over
Since dbt version 0.10.1, if you were running dbt models on BigQuery that were both incremental and partitioned on a date or timestamp column, you were living a pretty good life:
- dbt had handy macros to conditionally switch between “all records” and “only new records”, eventually rolled into the sugary `is_incremental()`.
- So what if the boilerplate filter syntax (`where updated_at >= (select max(updated_at) from {{ this }})`) didn’t play well with BQ’s penny-pinching optimizer? You could supply a static, relative, `var`-based, or—with the help of `statement` calls—still-quite-dynamic filter that, in incremental runs, significantly limited the amount of data scanned for transformation. (A sketch of this setup follows the list.) Those model builds could be fast, an order of magnitude faster than the `--full-refresh` version.
- To finish it off, dbt would handle the reconciliation of those transformed new records with the existing database table, leveraging BigQuery’s awesome atomic `merge` operation to ensure that the model’s `unique_key` really was unique. (More than that—BQ would return an error if it wasn’t.)
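For concreteness, here is a minimal sketch of what one of those pre-0.16 incremental models might have looked like on BigQuery. The `events` source, the `updated_at` column, and the three-day lookback are illustrative choices, not prescriptions:

```sql
{{ config(
    materialized = 'incremental',
    unique_key = 'event_id',
    partition_by = 'date(updated_at)'
) }}

select
    event_id,
    user_id,
    updated_at
from {{ ref('events') }}

{% if is_incremental() %}
-- a relative filter: only rescan the last few days of source data
where updated_at >= timestamp_sub(current_timestamp(), interval 3 day)
{% endif %}
```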
So it’s the summertime of 2018, and the living is easy. Summer becomes fall, and one day, at 4:48 pm, you get this message:
hey @jerco, soooo… all our efforts to make sure the queries to load GA data were incrementalized, so as to only select a few days of data have ended up not actually being effective from a cost standpoint, because the merge into part forces it to access the entire table, causing each load of intermediate.ga_web_sessions to access 8.45 TB of data. That’s a $42 query that we run 12 times a day at present. Yikes. Any ideas?
Cool, you say. You open an issue. You staunch the wound, overriding dbt’s `incremental` materialization for BigQuery with some janky custom logic. You start breathing again.
Over the next fourteen months, as the dbt community grew, every now and again someone new would chime into the bigquery channel with the same question: Why does dbt do a full-table scan every time you run an incremental model, even when that model is partitioned? Even when dbt knows that model is partitioned, because dbt is the one who partitioned it?
For a long time, we didn’t have a good, generalizable answer to this. One developed slowly, with contributions and input from several dedicated members of the dbt + BigQuery community.
dbt is 0.15, going on 0.16
In version `0.16.0`, we’re making serious-and-smart changes to the way that dbt knows about and uses the `partition_by` model config on BigQuery.
partition_by = {}
First, an across-the-board change: **`partition_by` is now a dictionary**. So if you have a model that you want to materialize as a table partitioned on a timestamp column `created_at`, previously you would write something like:
{{config(
materialized = 'table',
partition_by = 'date(created_at)'
)}}
Starting in `0.16.0`, you’ll instead write:
{{config(
materialized = 'table',
partition_by = { 'field': 'created_at', 'data_type': 'timestamp' }
)}}
There are a couple of good reasons for this, the biggest of them being that BigQuery recently rolled out integer range partitioned tables:
{{config(
materialized = 'table',
partition_by = {
'field': 'customer_id',
'data_type': 'int64',
'range': {
'end': 100,
'interval': 10,
'start': 0
}
}
)}}
This directly maps to what you’d see if you were to run `bq show`:
...
"rangePartitioning": {
"field": "customer_id",
"range": {
"end": "100",
"interval": "10",
"start": "0"
}
},
...
This doesn’t mean you need to worry about all your existing models partitioned on a date or timestamp column. dbt will do its best to infer whether your partition column is a `date` (default) or a `timestamp` (if you’ve wrapped the column name in some version of `date()`).
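As a rough sketch of that inference (the column names here are made up, and it’s worth spot-checking the compiled DDL for your own models):

```sql
-- partition_by = 'created_date'      is read as  { 'field': 'created_date', 'data_type': 'date' }
-- partition_by = 'date(created_at)'  is read as  { 'field': 'created_at', 'data_type': 'timestamp' }
```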
Incremental strategies
The simple, tried-and-true way dbt runs incremental models on BigQuery will be sticking around as the `merge` incremental strategy, and it will remain the default. Thanks to some last-minute sleuthing and help from @clausherther, we’ve observed, tested, and validated that adding a cluster key to an incremental model improves `merge` performance. To my knowledge, this is relatively undocumented behavior, and possibly new within the past six months. We’re always seeking to better understand the more opaque features of BigQuery, and clustering is certainly one of them.
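If you want to take advantage of that, clustering an incremental model is one more line of config. A minimal sketch, with made-up model and column names; clustering on the `unique_key` is just one reasonable choice:

```sql
-- clustering on the unique_key: per the observation above, a cluster key can make the merge cheaper
{{ config(
    materialized = 'incremental',
    unique_key = 'session_id',
    partition_by = { 'field': 'session_date', 'data_type': 'date' },
    cluster_by = 'session_id'
) }}

select * from {{ ref('stg_sessions') }}

{% if is_incremental() %}
-- the three-day lookback is illustrative
where session_date >= date_sub(current_date, interval 3 day)
{% endif %}
```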
For bigger datasets, we’re also rolling out a new, highly performant `insert_overwrite` strategy. For those familiar, it’s analogous to the Spark functionality by the same name:
- The materialization will not use the `unique_key` config to match records. Instead, it asks BigQuery to drop and replace entire partitions as discrete, atomic units.
- There is an optional `partitions` config and helpful datetime macros. If supplied, the materialization will use these exact static partition values for replacement in the DML it writes. If not supplied, the materialization will instead execute a multi-query script to dynamically determine which partitions to replace.
- In both cases, the goal is to limit BigQuery’s scan of the existing table to only the partitions requiring replacement, avoiding a full-table scan of the merge target.
- As in Spark: **“Be sure to re-select all of the relevant data for a partition when using incremental models.”**
Static
Let’s say we have an incremental model, `sessions`, that queries an `events` table. We’re going to use the `partitions` config:
{% set partitions_to_replace = [
'current_date',
'date_sub(current_date, interval 1 day)'
] %}
{{config(
materialized = 'incremental',
partition_by = { 'field': 'session_date', 'data_type': 'date' },
incremental_strategy = 'insert_overwrite',
partitions = partitions_to_replace
)}}
with events as (
select * from {{ref('events')}}
{% if is_incremental() %}
-- recalculate yesterday + today
where date(event_timestamp) in ({{ partitions_to_replace | join(',') }})
{% endif %}
),
... rest of model ...
If we wanted this to run more dynamically—let’s say, always for the past 3 days—we could leverage dbt’s baked-in datetime macros and write a few of our own.
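Here is one rough sketch of that approach, using plain Jinja rather than any particular macro. The `days_back` variable is a made-up name, and the model body (including the `is_incremental()` block) stays the same as above:

```sql
{% set days_back = 3 %}
{% set partitions_to_replace = [] %}
{% for i in range(days_back) %}
    {% do partitions_to_replace.append('date_sub(current_date, interval ' ~ i ~ ' day)') %}
{% endfor %}

{{ config(
    materialized = 'incremental',
    partition_by = { 'field': 'session_date', 'data_type': 'date' },
    incremental_strategy = 'insert_overwrite',
    partitions = partitions_to_replace
) }}
```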
The `insert_overwrite` strategy still runs a `merge` statement, since it’s the best way to perform atomic DML in BigQuery, though now using a constant false predicate. From BigQuery’s docs:
If the merge_condition is FALSE, the query optimizer avoids using a JOIN. This optimization is referred to as a constant false predicate. A constant false predicate is useful when you perform an atomic DELETE on the target plus an INSERT from a source (DELETE with INSERT is also known as a REPLACE operation).
merge into `my_project`.`my_dataset`.`sessions` as DBT_INTERNAL_DEST
using (
... model SQL ...
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and DBT_INTERNAL_DEST.session_date in (
current_date,
date_sub(current_date, interval 1 day)
)
then delete
when not matched then insert
(... columns ...)
values
(... columns ...)
Dynamic
Or you can leverage the materialization script to execute dynamic partition replacement, taking advantage of the calculated `_dbt_max_partition` variable:
{{config(
materialized = 'incremental',
unique_key = 'session_id',
partition_by = { 'field': 'session_date', 'data_type': 'date' },
incremental_strategy = 'insert_overwrite'
)}}
with events as (
select * from {{ref('events')}}
{% if is_incremental() %}
-- recalculate latest day's data + previous
where date(event_timestamp) >= date_sub(_dbt_max_partition, interval 1 day)
{% endif %}
),
... rest of model ...
dbt will then generate an all-SQL script to run against BigQuery, comprising four steps:
Step 0: `declare` the “scripting” variables `dbt_partitions_for_replacement` and `_dbt_max_partition`. These variables can be `set` using subsequent queries, and afterward referenced in plaintext SQL. Right away, we’ll set `_dbt_max_partition` equal to the maximum value of the partition column in the currently existing table.
-- generated script to merge partitions into `my_project`.`my_dataset`.`sessions`
declare dbt_partitions_for_replacement array<date>;
declare _dbt_max_partition date;
set _dbt_max_partition = (
select max(session_date) from `my_project`.`my_dataset`.`sessions`
);
Step 1: Create a temporary table of "new" records, where "new" is defined by the logic within `is_incremental()` that can leverage `_dbt_max_partition`. (BigQuery just released support for "true" `temp` tables inside of scripts. For the present moment, dbt will continue to use non-temp tables that it drops at the end of the materialization, with 12-hour expirations as a failsafe.)
-- 1. create a temp table
create or replace table `my_project`.`my_dataset`.`sessions__dbt_tmp`
partition by session_date
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
... model SQL ...
)
Step 2: `set` `dbt_partitions_for_replacement` by selecting, as an array, all distinct values of the partition column from the table of new records.
-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct session_date)
from `my_project`.`my_dataset`.`sessions__dbt_tmp`
);
Step 3: `merge` the new records (“source”) into the existing table (“dest”), using the constant false predicate, and filtering on `dbt_partitions_for_replacement`. The upshot: we only overwrite partitions that have new data, determined dynamically from our model query. Those partitions are replaced, atomically and in their entirety.
-- 3. run the merge statement
merge into `my_project`.`my_dataset`.`sessions` as DBT_INTERNAL_DEST
using (
select * from `my_project`.`my_dataset`.`sessions__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and DBT_INTERNAL_DEST.session_date in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert
(... all columns ...)
values
(... all columns ...)
All in all, the cost of an incremental run decreases from a full scan of the existing table to a metadata query—getting the max of the partition column—and a full scan of only the partitions with new data.
Benchmarking
Lucky for you, there’s a whole other Discourse post. Here are the highlights:
- `insert_overwrite` with static (user-supplied) partitions is always cheapest and usually fastest. This makes sense! The more you can know in advance, the more precisely dbt can instruct BigQuery and the less it needs to figure out on your behalf.
- Among the other approaches:
  - `merge` (default) with a cluster key is faster and cheaper at small data volumes
  - dynamic `insert_overwrite` is always slowest, but its cost scales better than simple `merge` at larger data volumes (~100 GB)
tl;dr
- At small data volumes, your best bet is a simple `merge` into a clustered table.
- As data volume scales, switch to dynamic `insert_overwrite`. You don’t have to think about it too hard.
- Once you know exactly how your incremental runs should be working—always reprocessing the past three days, for instance, with solid alerting if a dbt run fails a few days in a row—put on your data engineering hat, define some static partitions, and profit.
This functionality is quite new, and we’re still figuring out how the mileage varies. We’ll be very interested to hear about your experiences. To the replies!