On the limits of incrementality

I’ve been doing a lot of work recently on projects with large data volumes (1TB+), and this has forced me to think a lot harder about incremental models than I ever have before. I wanted to catalog some of this knowledge for others, and would love to learn if others have developed approaches that I have not.

The Big Easy

The easiest case for an incremental model is when your underlying data comes in as an immutable event stream and your transformation on top of it requires no windowing and has no late-arriving facts. Imagine an events table that you want to aggregate into a table daily_user_events that has one record per user per day. This is a simple group by with a count(*).

In this case, you simply select new events that are greater than or equal to the max day already in the table. Here’s what the code looks like:

with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
    where event_timestamp >= (select max(event_timestamp)::date from {{this}})
  {% endif %}
)
--rest of model...

This will grab new events and transform them. Assuming you set your underlying table up correctly, using a sort key or equivalent on your warehouse of choice, this will be very fast. In practice I have seen models using this strategy take 20 seconds to process where a full refresh takes 1500 seconds, a performance improvement of 98%+. YMMV, obviously, but my point is that this incrementality scenario is the best possible case if your underlying data and required transformation support it.
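For reference, a complete version of this kind of model might look roughly like the sketch below. The daily_user_events column names, the surrogate user_day_id used as the unique_key, and the Redshift-style sort/dist configs are all illustrative assumptions, not details from the post.

{{
    config(
        materialized = 'incremental',
        unique_key = 'user_day_id',   -- lets dbt replace the boundary day instead of duplicating it
        sort = 'event_day',           -- Redshift-specific; use clustering/partitioning elsewhere
        dist = 'user_id'
    )
}}

with events as (

  select * from {{ ref('events') }}

  {% if is_incremental() %}
    -- only scan events on or after the most recent day already in this table
    where event_timestamp >= (select max(event_day) from {{ this }})
  {% endif %}

)

select
  cast(user_id as varchar) || '-' || cast(event_timestamp::date as varchar) as user_day_id,
  user_id,
  event_timestamp::date as event_day,
  count(*) as event_count
from events
group by 1, 2, 3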

Late-Arriving Facts

Extend the above scenario to late-arriving facts—records that arrive in our source data table out of order. This is a common occurrence with event data, especially when that event data is collected on a mobile device. When mobile devices cannot connect to the internet they typically save events that they can’t send in a queue, and then batch send them all later when they reconnect.

This scenario causes problems for our above approach, because the event_timestamp will be in the past and therefore we will never pick these records up to be transformed. There are two ways to deal with this: close enough & performant or always correct & slow.

Close Enough & Performant

with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
    where event_timestamp >= (
      select dateadd(day, -3, max(event_timestamp)::date) from {{this}}
      )
  {% endif %}
)
--rest of model...

This approach establishes a “window” of recent data to always re-transform every time the model runs. In the code above, that window is three days. Essentially, late-arriving facts have three days to arrive, after which point they are no longer eligible to be incorporated into our metrics (until, at least, we do a full-refresh).

This strategy is just what it seems: close enough. If you require perfect accuracy on every transformation, this isn’t a good strategy. But in many cases the performance-for-accuracy tradeoff makes sense given that we’re often talking about behavioral data that needs to be directionally correct, not 100% correct.

This strategy performs almost as well as the base case. It’s still an extremely limited table scan because of the where condition built on top of the sort key (or equivalent). If you can get away with this in your scenario, I highly recommend it.

Always Correct & Slow

with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
    where session_id in (
      select session_id from {{ref('events')}}
      where event_timestamp >= 
        (select dateadd(day, -3, max(event_timestamp)::date) from {{this}})
      )
  {% endif %}
)
--rest of model...

Using this approach we’re grabbing all events associated with a given session if we’ve seen any events associated with that session since our most recent run. This will produce a completely accurate result every time the transformation is run because there’s no possibility of “missing” any events: if any event associated with a session arrives late—even if it’s a year late!—all of the events from that session will be selected and re-transformed.

However, it’s much slower. The innermost subquery is still quite fast, and pulling all session_ids is quite fast, but grabbing all events from {{ref('events')}} that have a particular session_id is very slow because it’s not using a sort key, and, really, cannot. Whereas the earlier approach could result in a 95% performance increase vs. a full-refresh, this approach is often (in my experience) closer to only 50%. It’s still better than doing a full-refresh, but it’s not fast. Use this approach sparingly.

Slowly-Changing Dimensions

If your transformation includes slowly changing dimensions, this will have meaningful implications for your incrementalization strategy. Here’s the scenario. Imagine you have a table called streams that represents users streaming songs on Spotify. And one of the fields on that table is artist_name. Well, from time to time, artists change their name (think Prince). When a name change for an artist happens, all prior streams look like they’re associated with a completely different artist, unless you explicitly handle this in your modeling. There are two ways to handle this: dimension tables and custom incrementality logic.

Dimension tables

This is the best argument I’ve ever seen for creating star schemas instead of wide fact tables. If your fact table has an artist_id instead of an artist_name column, this entire problem goes away. You move some of the performance cost to analysis time because you’re doing the fact >> dimension join at analysis time, but as long as the cardinality of your dimensions is less than a million rows you’re generally going to be OK. On Redshift, make sure to set your dist style to all for the dimensions.
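To make the dimension-table option concrete, the analysis-time query might look roughly like this (fct_streams, dim_artists, and the analytics schema are hypothetical names):

-- fct_streams stores the immutable artist_id; artist_name lives only in the dimension
select
  d.artist_name,
  count(*) as stream_count
from analytics.fct_streams as f
join analytics.dim_artists as d
  on f.artist_id = d.artist_id
group by 1

-- on Redshift, building dim_artists with diststyle all keeps this join cheap:
-- {{ config(materialized = 'table', dist = 'all') }}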

Custom incrementality logic

with streams as (
  select * from {{ref('streams')}}

  {% if is_incremental() %}

    where event_timestamp >= (select max(event_timestamp)::date from {{this}})

    or artist_name in (
      --subquery that gets distinct artist names 
      --that have been updated since last run
    )
  {% endif %}
)
--rest of model...

This approach has the same performance drawbacks as the always correct & slow approach above: we’re no longer exclusively using a sort key (or equivalent) to grab new records, so the table scan will take much longer. The only reason you would generally prefer this approach over building dimension tables is if the cardinality of your dimension tables is high enough that the analysis-time join would take untenably long. This does happen, but it’s rare.
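For illustration only, the commented-out subquery in the code above might look something like this, assuming an upstream artists model with an updated_at column (both of those names are hypothetical):

    or artist_name in (

      -- artists whose attributes have changed since this model last ran,
      -- approximated here by the max event_timestamp already in the table
      select artist_name
      from {{ ref('artists') }}
      where updated_at >= (select max(event_timestamp) from {{ this }})

    )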

Final Thoughts

Incrementality is hard. It’s easy to screw up, there are a surprising number of situations that arise requiring different design decisions, and it’s fairly hard to test. I’m hopeful that this download gives you a bit of a head start (or a sanity check) as you’re thinking through these issues on your own.

Also: I freely admit that I am not an expert on this topic—there are real data engineers out there who manage many-TB datasets who have far more experience than I do. As a result I’m sure that there’s plenty that I’m missing in this discussion. One of the patterns that I realize is common in the Airflow / Hive world that is not idiomatic to dbt is building tables explicitly partition-by-partition and running jobs explicitly for a given day. I do not fully understand if there are benefits to this approach that dbt does not take into account, or whether this approach is simply an artifact of those particular technologies and not necessarily desirable.

Would love any and all thoughts. Thanks for sticking with me :slight_smile:


Couple of thoughts to add to this great post!

Even the Big Easy can get Pretty Hairy if you’re combining multiple staged models into a fact table where each staged model could be quite large and potentially benefit from incrementality.
For example, your fact represents shipped orders (fct_shipments) based on a staged or base model of shipment data and you’d like to include several fields from a separate staged or base model for orders.
Your first instinct might be to select from both staged models incrementally.
However, this will quickly lead to out of sync fact tables if you’re not careful.

Let’s say orders can ship one or more (sometimes several) days after the order has been placed. Let’s further say that, to keep things simple and performant, we want to build incremental logic based on dates (to support partition pruning etc) and not order ids.
So, you might start with something like this:

{{
    config(
        materialized = 'incremental',
        unique_key = 'ship_date'
    )
}}
with shipments as (
  select * from {{ ref('shipments') }}
  {% if is_incremental() %}
    where ship_date >= (
      select dateadd(day, -3, current_date)
      )
  {% endif %}
),
orders as (
  select * from {{ ref('orders') }}
  {% if is_incremental() %}
    where order_date >= (
      select dateadd(day, -3, current_date)
      )
  {% endif %}
),
shipped_orders as (
-- inner join the two models
...
)

In this case, you’d only be selecting orders that were placed and shipped in the last 3 days.
Also, because we’re going to be deleting target data based on ship date, we’d be wiping out all shipments for a particular ship date, only to replace them with potentially partial shipment data for that date.

So, depending on your business model, you may have to either make the window for your incremental date logic much wider, or forgo that filter altogether and select all orders regardless of date, and then hope the inner join to shipments from the last 3 days gets you some benefit from the query optimizer.
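For example, a sketch of the second option (no date filter on orders, relying on the join to the already-pruned shipments; order_id and order_date are assumed column names):

orders as (

  -- no incremental filter here: scan all orders so that orders shipping
  -- long after they were placed still join successfully
  select * from {{ ref('orders') }}

),

shipped_orders as (

  select
    shipments.*,
    orders.order_date
  from shipments
  inner join orders
    on orders.order_id = shipments.order_id   -- order_id is an assumed join key

)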

Hope that made sense…!


Thank you @tristan, it’s a great post!

There’s something I don’t quite follow. I’ve implemented Close Enough & Performant in the past, but I had to delete rows that are being re-transformed. Getting that code to work in dbt is definitely not pretty.

In the example you’re showing here, can you clarify how you effectively overwrite rows without having to scan the whole destination table?


This is a great question. I can tell you what I know. Curious to hear your thoughts.

dbt handles the deletion for you automatically, and it does it using a query that scans on the unique ID that you specify in your model config. That requires a scan on the ID field, you’re absolutely right. In practice, this ends up working quite well on Redshift and Snowflake–it performs well in most cases. On BigQuery we’ve seen this scan end up costing a lot–for some reason it seems to trigger a scan of the entire destination table rather than just the single ID column. I don’t totally understand why that ends up being the case.
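For what it’s worth, the generated SQL is conceptually a delete-then-insert keyed on the unique_key, roughly like the simplified sketch below. The table and column names are made up, and the exact statements dbt generates vary by adapter and version.

-- 1. stage the new records (the compiled model SQL, with the is_incremental() filter applied)
create temporary table dbt_incremental_tmp as (
  select * from analytics.stg_events
  where event_timestamp >= (select max(event_timestamp) from analytics.fct_events)
);

-- 2. delete target rows that are about to be replaced; this is the scan on the unique ID
delete from analytics.fct_events
where event_id in (select event_id from dbt_incremental_tmp);

-- 3. append the staged rows
insert into analytics.fct_events
select * from dbt_incremental_tmp;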

I wouldn’t recommend writing custom logic to implement this functionality in dbt–if you’re attempting to write hooks or something similar that do this stuff for you, you’re really going to be fighting the framework instead of working with it. If you want dbt to handle the delete part of the materialization differently, you should override the materialization in your local project and change the implementation.


@tristan – Great post, thank you.

One additional thing that I would like to see further discussed here. You mentioned that incrementality is good except in cases that involve windowing. However, a classic case involving windowing is the need to deduplicate incoming data. I’ll usually write a window function to do this, partitioning over the common fields in the incoming data that would have the same values.

When calling APIs or getting any sort of event stream data, normally an ETL architect has two choices.

  1. At-least-once: If there is any problem with an API call, batch, workflow, or anything along the way, the given data or set of data will be reprocessed, and in some cases may be re-sent to the target (which is likely your raw/staging area in the database, which dbt will then consult as Sources). This means incoming data may be duplicated on import, but that is an acceptable price to pay for ensuring that incoming data will, for sure (for some practical and limited definition of sure), arrive even if there is a problem along the way. Usually used in distributed systems where fully reliable delivery of incoming data is a higher priority than the performance overhead of extra validation and deduplication of data at the target.
  2. At-most-once: Meaning that incoming data may be dropped along the way if there is a problem somewhere. Hopefully this happens rarely in practice, but it’s still possible. Usually used in systems where directional accuracy is more important than 100% reliability and the team wants to avoid the extra complexity, delay, and performance overhead involved in issue detection and deduplication.

More info on at-least-once versus at-most once here: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

In practice, I have almost always used “at-least-once” semantics. Getting users to agree to “at-most-once” is hard because their very logical question in response will be “Tell me how much data I am going to lose and when?” to which there is no great response. You can’t know that until you actually get all of the data, validate it, and thoroughly verify against the source where the data is coming from, and by the time you have done that you have probably already built an at-least-once system.

So, if we are often building systems that are at-least-once, and therefore requiring incoming data to be deduplicated, and the easiest way to do this is windowing, doesn’t that mean that in most cases this whole discussion about performant incrementality is moot because we have to do windowing?

If others are mostly building at-most-once systems where windowing and/or deduplication is not required then I would be interested to hear about it. My experience is generally “at-least-once” and requiring windowing.
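For context, the deduplication windowing described here typically looks something like the following in a staging model (the tracking source, event_id, and loaded_at are hypothetical names):

with source as (

  select * from {{ source('tracking', 'raw_events') }}

),

deduplicated as (

  select
    *,
    row_number() over (
      partition by event_id        -- the fields that identify a duplicate
      order by loaded_at desc      -- keep the most recently loaded copy
    ) as row_num
  from source

)

select * from deduplicated
where row_num = 1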

I’ve always thought that partitioning should be a feature of dbt. The nice thing about it is that you can compose models that are idempotent, which is preferable to an incremental strategy imho (for reasons explained below).

We use the following pattern with a lot of models at my job (it’s the main reason why I put some time into adding the alias feature). It directly violates the dbt maxim of “one model, one table” but I’ll try and explain how it addresses the issues you outline in your post.

events_monthly.sql

-- set runtime variables
{% set year = var('year', run_started_at.strftime('%Y')) %}
{% set month = var('month', run_started_at.strftime('%m')) %}

{{
  config({
    'alias': '_tracking_' + year|string + '_' + month|string,
    'schema': 'events'
  })
}}

SELECT 
  * 
FROM events
WHERE
    date_part('month', "time") = {{ month }}
    AND date_part('year', "time") = {{ year }} 

events.sql

{{
  config({
    'alias': 'tracking',
    'schema':'events',
    'materialized':'view'
  })
}}

-- set upstream dependency on monthly model
--  {{ ref('events_monthly') }}
{{ union_by_prefix('events', '_tracking_') }}

Here, we have two models: events_monthly.sql which builds a table for a discrete monthly partition and events.sql, which builds a view of all tables that match a common prefix. The heavy lifting is done by a macro, union_by_prefix, which queries the database to get a list of all tables with a common pattern (in this case, _tracking_) and UNIONS them together.
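The union_by_prefix macro isn’t shown here, but a minimal version might look roughly like the sketch below, which queries information_schema via run_query. The real macro may well differ.

{% macro union_by_prefix(schema, prefix) %}

  {%- if execute -%}

    {#- find all tables in the schema whose names start with the prefix -#}
    {%- set get_tables_query -%}
      select table_name
      from information_schema.tables
      where table_schema = '{{ schema }}'
        and table_name like '{{ prefix }}%'
    {%- endset -%}

    {%- set table_names = run_query(get_tables_query).columns[0].values() -%}

    {#- union all of the partition tables together -#}
    {%- for table_name in table_names %}
      select * from {{ schema }}.{{ table_name }}
      {%- if not loop.last %} union all {% endif %}
    {%- endfor -%}

  {%- endif -%}

{% endmacro %}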

By default, the monthly model will build the current month’s table, but you can also pass in vars at runtime to build a previous month’s table. This is nice because you can address the challenge of “Late-Arriving Facts” by also regularly building models from previous months. This is an “eventually consistent” approach. Performance of the monthly models can be tuned by increasing / decreasing the size of the partitions (years, quarters, weeks, days, hours, minutes, etc). Previous partitions can be programmatically generated using this script I wrote: Script for backfilling DBT models which use partitions · GitHub
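For example, backfilling the November 2018 partition might look something like this (assuming the model name above):

# rebuild a previous month's partition table by overriding the runtime vars
dbt run --models events_monthly --vars '{"year": "2018", "month": "11"}'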

Downstream, other models only reference events.sql such that, to the analysts that consume these models, there’s really only one events model. However, the challenge with this approach is isolating production and development environments. Ideally, an analyst shouldn’t need to build all monthly partitions in order to work with the events model. For that reason, we hardcode the production schema in events.sql. This means that when you call {{ ref('events') }} it will only build a view of the monthly partitions in your development schema. Obviously, this is not ideal, but we’ve thoroughly explained this behavior to all dbt users, so they’re aware of what’s going on.

How could we bring this strategy into the dbt paradigm without breaking some of its core tenets?


Thanks for the quick reply @tristan.
As you guessed, instead of declaring a unique_key on the model, I used a prehook with a DELETE statement in order to get a “truly” incremental update where the table is not scanned.
Maybe this scan isn’t as expensive as I thought (we’re on Redshift), so I’ll try removing that custom logic to see how well it does.
And writing a custom materialization strategy sounds a bit scary but I’ll try to give it a shot next time I have to do something like this.

:exploding_head::exploding_head::exploding_head:

@abelsonlive Thanks for your detailed explanation here! I really like this a lot–it’s really the first time I’ve gotten my head around the benefits here. Things I’m curious about:

  • if you’re locating all of this data into separate tables, how are you performing windowing across all of the tables? for instance, how would you create a user_session_number field?
  • how does the optimizer treat the unioned view? does it understand how to only look at some of the underlying tables depending on the date range? assuming that all the underlying tables are partitioned appropriately, my guess is that the optimizer would be OK here but I’m curious what you’ve experienced.
  • is the mechanism you’re describing just the best way to do this thing on Redshift or do you think it might be the best way to do the thing you’re describing across each warehouse?

I don’t have any answers for you in terms of how to make this more dbt-idiomatic. I could imagine ways of achieving that, but honestly this is currently more of a personal curiosity than a burning need from within the community. We just aren’t hearing from a lot of people that this kind of functionality is a priority for them right now. If there are others out there for whom this is a big problem, I’d love to hear from you!

Thanks again for taking the time to lay all this out. I may want to follow up with you offline at some point to discuss.


I concur with @abelsonlive re partitions.

This article lays out the benefits of this whole notion of partitions and idempotent pipelines: “Functional Data Engineering — a modern paradigm for batch data processing” by Maxime Beauchemin https://link.medium.com/q4dWH2pFdY


My apologies for reviving an old topic, but I wanted to share my learnings about optimizing BigQuery incremental partitioned tables - specifically about @tristan’s comment where BigQuery ends up costing a lot since it scans the entire destination table. For us, this resulted in incremental runs still being faster, but costing more than generating the table from scratch, since we were billed not only for reading the incremental data, but also for re-reading the entire destination table.

While there’s an idiomatic pattern to add predicate filters on the partitioned column of the tables you’re reading from in dbt, there is no such way to do the same for the destination table you’re merging into. When dbt writes the merge statement, it follows the typical pattern:

merge {target}
using {source}
on {target}.id = {source}.id
when matched then update ...
when not matched then insert ...

The issue is BigQuery cannot perform partition pruning on the destination table since it needs to scan the entire table for matching ids. If for example you partition on created_at, and can guarantee that you only need to update matching data from the past 3 days, you can change the statement into this:

merge {target}
using {source}
on {target}.created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY)
  and {target}.id = {source}.id
when matched then update ...
when not matched then insert ...

This will give you the runtime and cost savings you would expect. For us, this was the difference between a single table being billed at 34GB vs 600MB. As we applied this strategy to dozens of our incremental tables, the total cost savings were much greater.

You can have dbt do this by patching your merge function in your project. Create a merge.sql file in your macros folder with the following contents:

{#
  Fork of common_get_merge_sql from the dbt source code that adds "destination_predicate_filter" config support.
  This optimizes billing for incremental tables so we can limit the upsert searching logic to a few recent partitions.
  This is the difference of doing a fully billed table scan of the destination table, vs only being billed for reading
  a few days of the destination table.
#}
{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns) %}
    {%- set dest_cols_csv =  get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set destination_predicate_filter = config.get('destination_predicate_filter') -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE

    {% if unique_key %}
      on
        {% if destination_predicate_filter %} DBT_INTERNAL_DEST.{{destination_predicate_filter}} and {% endif %}
        DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
    {% else %}
        on FALSE
    {% endif %}

    {% if unique_key %}
    when matched then update set
        {% for column in dest_columns -%}
            {{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
    {% endif %}

    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({{ dest_cols_csv }})

{%- endmacro %}

And then in your incremental models, you can update your config accordingly:

{{
  config(
    materialized = "incremental",
    partition_by = "date(created_at)",
    unique_key = "id",
    destination_predicate_filter = "created_at >= timestamp_sub(current_timestamp(), interval 3 day)"
  )
}}

I feel like the API I proposed above leaves something to be desired so I didn’t open a pull request, but if there’s interest I can submit one. Either way though, hopefully this helps other teams trying to optimize their BigQuery bill with incremental tables.


@joe this is so great!! Thanks for contributing this back to the community.

@joe Very nice! Your proposal is similar to an initial approach we developed internally when we ran our first big incremental models on BigQuery. It’s simple, elegant, and definitely cost-effective—runs that had been billing us $50-a-pop fell to less than $1.

There’s been some great back-and-forth from members of the community over the past several months on how to generalize this solution. It’s culminated in a big proposed change to BigQuery’s incremental materialization and partition_by config, and it’s forthcoming in the next minor version of dbt. Check out the PR.

In summary: dbt will use BigQuery’s new scripting feature to create a temp table of new records, grab all partition values from the temp table, and merge into the existing table on those partitions only. There’s support for all partition column types (date, timestamp, and the new integer range), and we don’t need to add any configs or hard-code the merge window.
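To make that concrete, the generated script is conceptually along the lines of the sketch below. This is illustrative only, not dbt’s exact output; the dataset, table, and column names are stand-ins.

-- declare a variable to hold the partitions touched by the new records
declare dbt_partitions_for_replacement array<date>;

-- 1. stage the new records (the model's own incremental filter goes here)
create temp table dbt_tmp as
select * from my_dataset.stg_events
where created_at >= timestamp_sub(current_timestamp(), interval 3 day);

-- 2. grab the distinct partition values present in the staged records
set dbt_partitions_for_replacement = (
  select array_agg(distinct date(created_at)) from dbt_tmp
);

-- 3. merge, pruning the destination scan to just those partitions
merge into my_dataset.fct_events as dest
using dbt_tmp as src
on date(dest.created_at) in unnest(dbt_partitions_for_replacement)
  and dest.id = src.id
when matched then update set event_name = src.event_name
when not matched then insert (id, created_at, event_name)
  values (src.id, src.created_at, src.event_name);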

I plan to write more about this when we release it in dbt!


Hi @tristan
Just a small contribution on how we do it.

We have Snowflake and we load events using Snowpipe. Apart from maintaining event_timestamp (the time the event occurred), we also preserve event_load_timestamp (the time the event was loaded into the DW) and base our incremental models on event_load_timestamp instead.
So this covers any late-arriving events as well, without needing any additional window.

Hope it makes sense.

/Manish


You could also use a modelling approach that tracks records and their position, which can be used to help correct timelines when data does arrive out of sequence.

Regarding the Always Correct & Slow approach for late-arriving facts, I found a contradiction between the code above and “Using this approach we’re grabbing all events associated with a given session if we’ve seen any events associated with that session since our most recent run.”
From my understanding, the code grabs all session_ids from the 3 days before the max event_timestamp, not since the most recent run.

This is very interesting, I also think partitions should be part of dbt core. When looking at multiple ways to view data, partitions play an important role… e.g., in Kafka you could partition the same topic multiple ways depending on how you join your topics downstream… the interesting part is that partitions define scaling, giving power to the end user… if partitioning by year makes query runs slow, then I partition by month and re-run. If I partition by any entity other than time, the scaling is different.


Hello,
Sorry for reviving this old interesting thread.

Why rely on a business timestamp to manage incrementality?
If we rely on a mechanical timestamp instead, like the loading timestamp, then even late-arriving events will be picked up and applied correctly during the incremental transformation.

{{
    config(
        materialized = 'incremental',
        unique_key = 'event_id'
    )
}}
with events as (
 -- event_timestamp will always be < loading_timestamp
  select event_timestamp, loading_timestamp, * from {{ref('events')}}
  {% if is_incremental() %}
    where loading_timestamp >= (select max(loading_timestamp)::date from {{this}})
  {% endif %}
),

{{ deduplicate("partition by event_id", "order by event_timestamp desc, loading_timestamp desc") }},

--rest of model...
