BigQuery + dbt: Incremental Changes

Summertime is over

Since dbt version 0.10.1, if you were running dbt models on BigQuery that were both incremental and partitioned on a date or timestamp column, you were living a pretty good life:

  • dbt had handy macros to conditionally switch between “all records” and “only new records”, eventually rolled into the sugary is_incremental().
  • So what if the boilerplate filter syntax (where updated_at ≥ (select max from {{this}})) didn’t play well with BQ’s penny-pinching optimizer? You could supply a static, relative, var-based, or—with the help of statement calls—still-quite-dynamic filter that, in incremental runs, significantly limited the amount of data scanned for transformation. Those model builds could be fast, an order of magnitude faster than the --full-refresh version.
  • To finish it off, dbt would handle the reconciliation of those transformed new records with the existing database table, leveraging BigQuery’s awesome atomic merge operation to ensure that the model’s unique_key really was unique. (More than that—BQ would return an error if it wasn’t.)

So it’s the summertime of 2018, and the living is easy. Summer becomes fall, and one day, at 4:48 pm, you get this message:

hey @jerco, soooo… all our efforts to make sure the queries to load GA data were incrementalized, so as to only select a few days of data have ended up not actually being effective from a cost standpoint, because the merge into part forces it to access the entire table, causing each load of intermediate.ga_web_sessions to access 8.45 TB of data. That’s a $42 query that we run 12 times a day at present. Yikes. Any ideas?

Cool, you say. You open an issue. You staunch the wound, overriding dbt’s incremental materialization for BigQuery with some janky custom logic. You start breathing again.

Over the next fourteen months, as the dbt community grew, every now and again someone new would chime into the #bigquery channel with the same question: Why does dbt do a full-table scan every time you run an incremental model, even when that model is partitioned? Even when dbt knows that model is partitioned, because dbt is the one who partitioned it?

For a long time, we didn’t have a good, generalizable answer to this. One developed slowly, with contributions and input from several dedicated members of the dbt + BigQuery community.

dbt is 0.15, going on 0.16

In version 0.16.0, we’re making serious-and-smart changes to the way that dbt knows about and uses the partition_by model config on BigQuery.

partition_by = {}

First, an across-the-board change: **partition_by is now a dictionary**. So if you have a model that you want to materialize as a table partitioned on a timestamp column created_at, previously you would write something like:

{{config(
	materialized = 'table',
	partition_by = 'date(created_at)'
)}}

Starting in 0.16.0, you’ll instead write:

{{config(
    materialized = 'table',
    partition_by = { 'field': 'created_at', 'data_type': 'timestamp' }
)}}

There are a couple of good reasons for this, the biggest of them being that BigQuery recently rolled out integer range partitioned tables:

{{config(
    materialized = 'table',
    partition_by = {
        'field': 'customer_id',
        'data_type': 'int64',
        'range': {
            'end': 100,
            'interval': 10,
            'start': 0
        }
    }
)}}

This directly maps to what you’d see if you were to run bq show:

...
  "rangePartitioning": {
    "field": "customer_id",
    "range": {
      "end": "100",
      "interval": "10",
      "start": "0"
    }
  },
...

This doesn’t mean you need to worry about all your existing models partitioned on a date or timestamp column. dbt will do its best to infer whether your partition column is a date (default) or timestamp (if you’ve wrapped the column name in some version of date()).
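As a rough illustration of the two cases, here is how a model partitioned directly on a date column might look in the new dictionary form. The model and column names (stg_orders, order_date) are hypothetical, and the mapping in the comment is only a sketch of the inference described above, not dbt's actual parsing code.

```sql
{#
  Legacy string form        ->  dictionary form (0.16.0)
  'order_date'              ->  { 'field': 'order_date', 'data_type': 'date' }
  'date(created_at)'        ->  { 'field': 'created_at', 'data_type': 'timestamp' }
#}

{{config(
    materialized = 'table',
    partition_by = { 'field': 'order_date', 'data_type': 'date' }
)}}

-- stg_orders and order_date are hypothetical names
select * from {{ref('stg_orders')}}
```

Writing the dictionary out explicitly means you no longer depend on the inference at all.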

Incremental strategies

The simple, tried-and-true way dbt runs incremental models on BigQuery will be sticking around as the merge incremental strategy, and it will remain the default. Thanks to some last-minute sleuthing and help from @clausherther, we’ve observed, tested, and validated that adding a cluster key to an incremental model improves merge performance. To my knowledge, this is relatively undocumented behavior, and possibly new within the past six months. We’re always seeking to better understand the more opaque features of BigQuery, and clustering is certainly one of them.
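A minimal sketch of a merge model with a cluster key might look like the following. Clustering on the unique_key column is an assumption made for illustration (the benchmarks only establish that some cluster key helps), and stg_sessions is a hypothetical upstream model.

```sql
{{config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'session_id',
    partition_by = { 'field': 'session_start', 'data_type': 'timestamp' },
    cluster_by = 'session_id'
)}}

-- stg_sessions is a hypothetical upstream model
select * from {{ref('stg_sessions')}}

{% if is_incremental() %}

-- only transform sessions newer than what the table already holds
where session_start >= (select max(session_start) from {{this}})

{% endif %}
```

Whether clustering helps on your own tables is worth confirming by comparing bytes billed before and after.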

For bigger datasets, we’re also rolling out a new, highly performant insert_overwrite strategy. For those familiar, it’s analogous to the Spark functionality by the same name:

  • The materialization will not use the unique_key config to match records. Instead, it asks BigQuery to drop and replace entire partitions as discrete, atomic units.
  • There is an optional partitions config and helpful datetime macros. If supplied, the materialization will use these exact static partition values for replacement in the DML it writes. If not supplied, the materialization will instead execute a multi-query script to dynamically determine which partitions to replace.
  • In both cases, the goal is to limit BigQuery’s scan of the existing table to only the partitions requiring replacement, avoiding a full-table scan of the merge target.
  • As in Spark: **“Be sure to re-select all of the relevant data for a partition when using incremental models.”**

Static

Let’s say we have an incremental model, sessions, that queries an events table. We’re going to use the partitions config:

{% set partitions_to_replace = [
    'current_date',
    'date_sub(current_date, interval 1 day)'
] %}

{{config(
    materialized = 'incremental',
    partition_by = { 'field': 'session_start', 'data_type': 'timestamp' },
    incremental_strategy = 'insert_overwrite',
    partitions = partitions_to_replace
)}}

with events as (

    select * from {{ref('events')}}

    {% if is_incremental() %}
		
    -- recalculate yesterday + today
    where date(event_timestamp) in ({{ partitions_to_replace | join(',') }})
		
    {% endif %}

),

... rest of model ...

If we wanted this to run more dynamically—let’s say, always for the past 3 days—we could leverage dbt’s baked-in datetime macros and write a few of our own.
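One way to sketch the "past 3 days" case is with a plain Jinja loop rather than any particular macro. The loop below is an assumption about how you might build the list yourself; it produces the same kind of SQL expressions as the hand-written list above and does not rely on dbt's datetime macros.

```sql
{# Build partition expressions for today and the two days before it #}
{% set partitions_to_replace = [] %}
{% for days_ago in range(3) %}
    {% do partitions_to_replace.append(
        'date_sub(current_date, interval ' ~ days_ago ~ ' day)'
    ) %}
{% endfor %}

{{config(
    materialized = 'incremental',
    partition_by = { 'field': 'session_start', 'data_type': 'timestamp' },
    incremental_strategy = 'insert_overwrite',
    partitions = partitions_to_replace
)}}

select * from {{ref('events')}}

{% if is_incremental() %}

-- recalculate the same partitions that will be replaced
where date(event_timestamp) in ({{ partitions_to_replace | join(',') }})

{% endif %}
```

Changing range(3) to another number widens or narrows the window without touching the rest of the model.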

The insert_overwrite strategy still runs a merge statement, since it’s the best way to perform atomic DML in BigQuery, though now using a constant false predicate. From BigQuery’s docs:

If the merge_condition is FALSE, the query optimizer avoids using a JOIN. This optimization is referred to as a constant false predicate. A constant false predicate is useful when you perform an atomic DELETE on the target plus an INSERT from a source (DELETE with INSERT is also known as a REPLACE operation).

merge into `my_project`.`my_dataset`.`sessions` as DBT_INTERNAL_DEST
        using (
		       ... model SQL ...
        ) as DBT_INTERNAL_SOURCE
        on FALSE
    
    when not matched by source
         and date(DBT_INTERNAL_DEST.session_start) in (
                current_date,
                date_sub(current_date, interval 1 day)
          ) 
        then delete

    when not matched then insert
        (... columns ...)
    values
        (... columns ...)

Dynamic

Or you can leverage the materialization script to execute dynamic partition replacement, taking advantage of the calculated _dbt_max_partition variable:

{{config(
    materialized = 'incremental',
    unique_key = 'session_id',
    partition_by = { 'field': 'session_start', 'data_type': 'timestamp' },
    incremental_strategy = 'insert_overwrite'
)}}

with events as (

	select * from {{ref('events')}}

	{% if is_incremental() %}

	-- recalculate latest day's data + previous
	where date(event_timestamp) >= date_sub(date(_dbt_max_partition), interval 1 day)

	{% endif %}

),

... rest of model ...

dbt will then generate an all-SQL script to run against BigQuery, comprised of four steps:

Step 0: declare the “scripting” variables dbt_partitions_for_replacement and _dbt_max_partition. These variables can be set using subsequent queries, and afterward referenced in plaintext SQL. Right away, we’ll set _dbt_max_partition equal to the maximum value of the partition column in the currently existing table.

-- generated script to merge partitions into `my_project`.`my_dataset`.`sessions`
  declare dbt_partitions_for_replacement array<date>;
  declare _dbt_max_partition timestamp;

  set _dbt_max_partition = (
      select max(session_start) from `my_project`.`my_dataset`.`sessions`
  );

Step 1: Create a temporary table of "new" records, where "new" is defined by the logic within `is_incremental()` that can leverage `_dbt_max_partition`. (BigQuery just released support for "true" `temp` tables inside of scripts. For the present moment, dbt will continue to use non-temp tables that it drops at the end of the materialization, with 12-hour expirations as a failsafe.)

-- 1. create a temp table
  

  create or replace table `my_project`.`my_dataset`.`sessions__dbt_tmp`
  partition by date(session_start)
  
  OPTIONS(
      expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
    )
  as (

		... model SQL ...

	)

Step 2: set dbt_partitions_for_replacement by selecting, as an array, all distinct values of the partition column from the table of new records.

-- 2. define partitions to update

  set (dbt_partitions_for_replacement) = (
      select as struct
          array_agg(distinct date(session_start))
      from `my_project`.`my_dataset`.`sessions__dbt_tmp`
  );

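Step 3: merge the new records (“source”) into the existing table (“dest”), filtering the existing table on only those partitions listed in dbt_partitions_for_replacement. Because insert_overwrite does not match on the unique_key, the merge condition is a constant FALSE: rows in those partitions are deleted from the existing table, and the new records are inserted.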

-- 3. run the merge statement
    	
	merge into `my_project`.`my_dataset`.`sessions` as DBT_INTERNAL_DEST
	        using (
	    select * from `my_project`.`my_dataset`.`sessions__dbt_tmp`
	  ) as DBT_INTERNAL_SOURCE
	        on FALSE
	
	when not matched by source
		and date(DBT_INTERNAL_DEST.session_start) in unnest(dbt_partitions_for_replacement) 
		then delete
	
	when not matched then insert
	        (... all columns ...)
	    values
	        (... all columns ...)

All in all, the cost of an incremental run decreases from a full scan of the existing table to a metadata query—getting the max of the partition column—and a full scan of only the partitions with new data.

Benchmarking

Lucky for you, there’s a whole other Discourse post. Here are the highlights:

  • insert_overwrite with static (user-supplied) partitions is always cheapest and usually fastest. This makes sense! The more you can know in advance, the more dbt can instruct BigQuery and the less it needs to figure out on your behalf.
  • Among other approaches:
    • merge (default) with a cluster key is faster and cheaper at small data volumes
    • insert_overwrite dynamic is always slowest, but its cost scales better than simple merge at larger data volumes (~100 GB)

tl;dr

  • At small data volumes, your best bet is a simple merge into a clustered table.
  • As data volume scales, switch to dynamic insert_overwrite. You don’t have to think about it too hard.
  • Once you know exactly how your incremental runs should be working—always reprocessing the past three days, for instance, with solid alerting if a dbt run fails a few days in a row—put on your data engineering hat, define some static partitions, and profit.

This functionality is quite new, and we’re still figuring out how the mileage varies. We’ll be very interested to hear about your experiences. To the replies!

Can _dbt_max_partition be used with the standard MERGE strategy? My use case is that I have a source table that gets truncated and loaded with new data each day to be merged into my destination table. I’m only going to have inserts, but sometimes there’s going to be records from several days ago getting inserted as well as yesterday’s data. I’d like to be able to use _dbt_max_partition so that if the model is accidentally run twice, it would update existing records. That would ensure data isn’t inserted twice while still scanning only the partitions in the destination that exist in the source table, avoiding a scan of all 10+ TB in the destination.

Currently I’m using a macro to override the MERGE behavior in 0.15.* to take into account a destination_predicate_filter where I’m specifying some interval of days back to pull from the destination table but would prefer it to be more dynamic to what partitions the source data would touch in the destination.

My override works fine for now but I’d prefer to use standard DBT behavior instead of hacking my own.

Thanks for the detailed question! As I understand it, the reason you wouldn’t want to leverage the insert_overwrite strategy is because

sometimes there’s going to be records from several days ago getting inserted as well as yesterday’s data

If you used the dynamic insert_overwrite strategy, you’d risk replacing a full partition of preexisting data with only a handful of new records.

I have two thoughts:

  1. It sounds like you may want something in between simple merge and dynamic insert_overwrite: a strategy that dynamically calculates which partitions to be merging into, and then performs the merge with a unique_key, rather than replacing the entire partition. The code for that is definitely possible, though slower than insert_overwrite. If this is a common problem that folks run into, we may want to add support for it as a new incremental strategy in a future release.

  2. How many late-arriving records do you have? Depending on the data source, we often find that the proportion of late-arriving facts (e.g. web/mobile events) is low enough that we can ignore any records which arrive more than 3 days late during incremental runs, and fold them in during weekly or monthly full refreshes. If you’ve got 10+ TB of source data, that $50+ full-refresh build feels expensive, but paying for it once a week or once a month is far cheaper than trying to manage more complex code to handle 0.001% of your edge cases. Plus, if you’re always running static insert_overwrite jobs for the prior 3 days, you save enough in both time and money to more than make up for the occasional full rebuild.
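To make the first idea a bit more concrete, here is a minimal sketch of what such an in-between script might look like. This is not something dbt generates in 0.16.0. The variable name dbt_partitions_for_merge, the table names, and the event_id / event_date / payload columns are all hypothetical, and whether BigQuery actually prunes partitions for this predicate is an assumption you should check against the bytes billed on your own tables.

```sql
-- hypothetical sketch: NOT generated by dbt 0.16.0
declare dbt_partitions_for_merge array<date>;

-- 1. find the partitions touched by the new records
set (dbt_partitions_for_merge) = (
    select as struct
        array_agg(distinct event_date)
    from `my_project`.`my_dataset`.`events__dbt_tmp`
);

-- 2. merge on the unique key, limiting the destination to those partitions
merge into `my_project`.`my_dataset`.`events` as DBT_INTERNAL_DEST
using (
    select * from `my_project`.`my_dataset`.`events__dbt_tmp`
) as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id
    and DBT_INTERNAL_DEST.event_date in unnest(dbt_partitions_for_merge)

-- rerunning the same load updates rows instead of duplicating them
when matched then update set
    event_date = DBT_INTERNAL_SOURCE.event_date,
    payload = DBT_INTERNAL_SOURCE.payload

when not matched then insert row;
```

One caveat with this shape: a record whose key already exists in a destination partition outside the calculated list would not match, and would be inserted a second time.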

Yep that’s exactly my problem. Thanks for the recommendations!

These new features sound like exactly what I need - thank you! One question remains, though: I have been following the relevant pull requests for the past few weeks and I am not quite clear on whether the cost for the old merge strategy has improved or not.
From the pull request discussions I got the impression that with 0.16 only the modified partitions of the destination table are scanned, even with the ‘merge’ strategy. But this article and the DBT 0.16 docs seem to suggest that the merge strategy still performs a full destination scan. Can you clarify this for me?

Glad to hear it, @hduden! And thanks for the question. There has been a lot of back and forth on which strategy dbt should use by default, and some significant changes among even the 0.16.0 release candidates.

To be clear, what we ended up implementing doesn’t change dbt’s default behavior at all for incremental models. The default incremental strategy is one we’re now calling merge, but it’s the same as it ever was—it will still scan the full destination table, which is a low cost for some models and a huge cost for others. What we also found (and backed up with benchmarking) is that merge will not scan the full destination table if that table is clustered on any column, for reasons that are a little bit mysterious and a little bit magical.

If you want to take advantage of the net-new functionality in 0.16.0, which only scans and replaces the destination partitions that have new records, try out the insert_overwrite incremental strategy instead.

Thank you @jerco for the concise explanation. It looks like I will be switching some models to insert_overwrite.

Interesting how partition wise incremental strategy is in phase with :