On the limits of incrementality

#1

I’ve been doing a lot of work recently on projects that have large data volumes (1TB++), and this has caused me to think a lot harder about incremental models than I ever have had to previously. I wanted to catalog some of this knowledge for others, and would love to learn if others have developed approaches that I have not.

The Big Easy

The easiest case for an incremental model is when your underlying data comes in as an immutable event stream, your transformation on top of it does not require any windowing, and there are no late-arriving facts. Imagine an events table that you want to aggregate into a table daily_user_events that has one record per user per day. This is a simple group by with a count(*).

In this case, you simply select new events that are greater than or equal to the max day already in the table. Here’s what the code looks like:

with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
    where event_timestamp >= (select max(event_timestamp)::date from {{this}})
  {% endif %}
)
--rest of model...

This will grab new events and transform them. Assuming you set your underlying table up correctly using a sort key or equivalent on your warehouse of choice, this will be very fast. In practice I have seen models using this strategy take 20 seconds to process where a full refresh takes 1500 seconds, a performance improvement of 98%+. YMMV, obviously, but my point is that this incrementality scenario is the best possible case if your underlying data and required transformation support it.
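For concreteness, here’s a minimal sketch of what the full daily_user_events model might look like. The config block, the user_day_id surrogate key, and the event_date column are illustrative assumptions on my part, and the incremental filter is adjusted to compare against the date column that survives the aggregation:

{{
  config(
    materialized = 'incremental',
    unique_key = 'user_day_id'  -- hypothetical surrogate key: one row per user per day
  )
}}

with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
    -- re-process the boundary day; the unique_key above prevents duplicate rows
    where event_timestamp >= (select max(event_date) from {{this}})
  {% endif %}
)

select
  user_id::varchar || '-' || event_timestamp::date::varchar as user_day_id,
  user_id,
  event_timestamp::date as event_date,
  count(*) as event_count
from events
group by 1, 2, 3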

Late-Arriving Facts

Extend the above scenario to late-arriving facts: records that arrive in our source data table out of order. This is a common occurrence with event data, especially when that event data is collected on a mobile device. When mobile devices cannot connect to the internet, they typically queue up the events they can’t send and then batch-send them all later when they reconnect.

This scenario causes problems for our above approach, because the event_timestamp will be in the past and therefore we will never pick these records up to be transformed. There are two ways to deal with this: close enough & performant or always correct & slow.

Close Enough & Performant

with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
    where event_timestamp >= (
      select dateadd(day, -3, max(event_timestamp)::date) from {{this}}
      )
  {% endif %}
)
--rest of model...

This approach establishes a “window” of recent data to always re-transform every time the model runs. In the code above, that window is three days. Essentially, late-arriving facts have three days to arrive, after which point they are no longer eligible to be incorporated into our metrics (until, at least, we do a full-refresh).

This strategy is just what it seems: close enough. If you require perfect accuracy on every transformation, this isn’t a good strategy. But in many cases the performance-for-accuracy tradeoff makes sense given that we’re often talking about behavioral data that needs to be directionally correct, not 100% correct.

This strategy performs almost as well as the base case. It’s still an extremely limited table scan because of the where condition built on top of the sort key (or equivalent). If you can get away with this in your scenario, I highly recommend it.

Always Correct & Slow

with events as (
  select * from {{ref('events')}}
  {% if is_incremental() %}
    where session_id in (
      select session_id from {{ref('events')}}
      where event_timestamp >= 
        (select dateadd(day, -3, max(event_timestamp)::date) from {{this}})
      )
  {% endif %}
)
--rest of model...

Using this approach we’re grabbing all events associated with a given session if we’ve seen any events associated with that session since our most recent run. This will produce a completely accurate result every time the transformation is run because there’s no possibility of “missing” any events: if any event associated with a session arrives late—even if it’s a year late!—all of the events from that session will be selected and re-transformed.

However, it’s much slower. The innermost subquery is still quite fast, and pulling all session_ids is quite fast, but grabbing all events from {{ref('events')}} that have a particular session_id is very slow because it’s not using a sort key, and, really, cannot. Whereas the earlier approach could result in a 95% performance increase vs. a full-refresh, this approach is often (in my experience) closer to only 50%. It’s still better than doing a full-refresh, but it’s not fast. Use this approach sparingly.

Slowly-Changing Dimensions

If your transformation includes slowly changing dimensions, this will have meaningful implications for your incrementalization strategy. Here’s the scenario. Imagine you have a table called streams that represents users streaming songs on Spotify, and one of the fields on that table is artist_name. Well, from time to time, artists change their name (think Prince). When a name change happens, all prior streams look like they’re associated with a completely different artist, unless you explicitly handle this in your modeling. There are two ways to handle this: dimension tables and custom incrementality logic.

Dimension tables

This is the best argument I’ve ever seen for creating star schemas instead of wide fact tables. If your fact table has an artist_id instead of an artist_name column, this entire problem goes away. You move some of the performance cost to analysis time because you’re doing the fact >> dimension join at analysis time, but as long as the cardinality of your dimensions is less than a million rows you’re generally going to be OK. On Redshift, make sure to set your dist style to all for the dimensions.
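As a rough sketch of this pattern (dim_artists, fct_streams, and the artists source used below are my own placeholder names): keep a small dimension model that always carries the artist’s current name, and defer the join to analysis time.

-- dim_artists.sql: small dimension, replicated to every node on Redshift
{{
  config(
    materialized = 'table',
    dist = 'all'
  )
}}

select
  artist_id,
  artist_name  -- always reflects the artist's current name
from {{ref('artists')}}

The join that would otherwise be baked into the fact table then happens at query time, so renamed artists are picked up automatically:

select
  dim_artists.artist_name,
  count(*) as stream_count
from analytics.fct_streams as fct_streams
join analytics.dim_artists as dim_artists
  on dim_artists.artist_id = fct_streams.artist_id
group by 1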

Custom incrementality logic

with streams as (
  select * from {{ref('streams')}}

  {% if is_incremental() %}

    where event_timestamp >= (select max(event_timestamp)::date from {{this}})

    or artist_name in (
      --subquery that gets distinct artist names 
      --that have been updated since last run
    )
  {% endif %}
)
--rest of model...

This approach has the same performance drawbacks as the always correct & slow approach above: we’re no longer exclusively using a sort key (or equivalent) to grab new records, so the table scan will take much longer. The only reason you would generally prefer this approach over building dimension tables is if the cardinality of your dimension tables is high enough that the analysis-time join would take untenably long. This does happen, but it’s rare.
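For illustration only, the elided subquery could look something like the following, assuming (and this is my assumption, not part of the original example) that there is an artists source table with an updated_at column, and using the model’s max event_timestamp as a proxy for the last run:

    or artist_name in (
      -- hypothetical: the artists table and updated_at column are assumed
      select distinct artist_name
      from {{ref('artists')}}
      where updated_at >= (select max(event_timestamp) from {{this}})
    )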

Final Thoughts

Incrementality is hard. It’s easy to screw up, there are a surprising number of situations that arise requiring different design decisions, and it’s fairly hard to test. I’m hopeful that this download gives you a bit of a head start (or a sanity check) as you’re thinking through these issues on your own.

Also: I freely admit that I am not an expert on this topic—there are real data engineers out there who manage many-TB datasets who have far more experience than I do. As a result I’m sure that there’s plenty that I’m missing in this discussion. One of the patterns that I realize is common in the Airflow / Hive world that is not idiomatic to dbt is building tables explicitly partition-by-partition and running jobs explicitly for a given day. I do not fully understand if there are benefits to this approach that dbt does not take into account, or whether this approach is simply an artifact of those particular technologies and not necessarily desirable.

Would love any and all thoughts. Thanks for sticking with me :slight_smile:

#2

Couple of thoughts to add to this great post!

Even the Big Easy can get Pretty Hairy if you’re combining multiple staged models into a fact table where each staged model could be quite large and potentially benefit from incrementality.
For example, your fact represents shipped orders (fct_shipments) based on a staged or base model of shipment data and you’d like to include several fields from a separate staged or base model for orders.
Your first instinct might be to select from both staged models incrementally.
However, this will quickly lead to out of sync fact tables if you’re not careful.

Let’s say orders can ship one or more days, sometimes several, after the order has been placed. Let’s further say that, to keep things simple and performant, we want to build incremental logic based on dates (to support partition pruning, etc.) and not order ids.
So, you might start with something like this:

{{
    config(
        materialized = 'incremental',
        unique_key = 'ship_date'
    )
}}
with shipments as (
  select * from {{ ref('shipments') }}
  {% if is_incremental() %}
    where ship_date >= (
      select dateadd(day, -3, current_date)
      )
  {% endif %}
),
orders as (
  select * from {{ ref('orders') }}
  {% if is_incremental() %}
    where order_date >= (
      select dateadd(day, -3, current_date)
      )
  {% endif %}
),
shipped_orders as (
-- inner join the two models
...
)

In this case, you’d only be selecting orders that were placed and shipped in the last 3 days.
Also, because we’re going to be deleting target data based on ship date, we’d be wiping out all shipments for a particular ship date, only to replace them with potentially partial shipment data for that date.

So, depending on your business model, you may have to either make the window for your incremental date logic much wider, or forgo that part altogether and get all orders regardless of date, and then hope the inner join to shipments for the last 3 days will get you some benefit from the query optimizer.
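As a rough sketch of that second option (order_id and the selected columns are placeholders of mine): the incremental filter stays on shipments, while orders is scanned in full so that older orders shipping recently are still matched.

{{
    config(
        materialized = 'incremental',
        unique_key = 'ship_date'
    )
}}
with shipments as (
  select * from {{ ref('shipments') }}
  {% if is_incremental() %}
    where ship_date >= dateadd(day, -3, current_date)
  {% endif %}
),
orders as (
  -- no incremental filter: older orders that shipped recently are still picked up
  select * from {{ ref('orders') }}
),
shipped_orders as (
  select
    shipments.ship_date,
    shipments.order_id,  -- placeholder join key
    orders.order_date
  from shipments
  inner join orders on orders.order_id = shipments.order_id
)
select * from shipped_orders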

Hope that made sense…!

#3

Thank you @tristan, it’s a great post!

There’s something I don’t quite follow. I’ve implemented Close Enough & Performant in the past but I had to delete rows that are being re-transformed. Getting that code to work in DBT is definitely not pretty.

In the example you’re showing here, can you clarify how you effectively overwrite rows without having to scan the whole destination table?

#4

This is a great question. I can tell you what I know. Curious to hear your thoughts.

dbt handles the deletion for you automatically, and it does it using a query that scans on the unique ID that you specify in your model config. That requires a scan on the ID field, you’re absolutely right. In practice, this ends up working quite well on Redshift and Snowflake; it performs well in most cases. On BigQuery we’ve seen this scan end up costing a lot, because for some reason it seems to trigger a scan of the entire destination table rather than just the single ID column. I don’t totally understand why that is.
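Schematically, the generated delete-then-insert looks something like this (table, temp table, and column names are purely illustrative; the exact SQL depends on your adapter and dbt version):

-- new/changed rows are first built into a temp table from the model's SQL
delete from analytics.daily_user_events
where user_day_id in (
  select user_day_id from daily_user_events__dbt_tmp
);

insert into analytics.daily_user_events
select * from daily_user_events__dbt_tmp;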

I wouldn’t recommend writing custom logic to implement this functionality in dbt; if you’re attempting to write hooks or something similar that do this stuff for you, you’re really going to be fighting the framework instead of working with it. If you want dbt to handle the delete part of the materialization differently, you should override the materialization in your local project and change the implementation.

#5

@tristan – Great post, thank you.

One additional thing that I would like to see further discussed here: you mentioned that incrementality works well except in cases that involve windowing. However, a classic case involving windowing is the need to deduplicate incoming data. I’ll usually write a window function to do this, partitioning over the fields in the incoming data that would share the same values across duplicates.
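For reference, a minimal sketch of that kind of dedup (the partition columns, the _loaded_at tiebreaker, and the raw_events model name are assumptions of mine):

with ranked as (
  select
    *,
    row_number() over (
      -- partition over the fields that identify a duplicate
      partition by event_id, user_id, event_timestamp
      order by _loaded_at desc
    ) as row_num
  from {{ ref('raw_events') }}
)

select * from ranked
where row_num = 1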

When calling APIs or getting any sort of event stream data, normally an ETL architect has two choices.

  1. At-least-once: If there is any problem with an API call, batch, workflow, or anything else along the way, the given data will be reprocessed and in some cases may be re-sent to the target (which is likely your raw/staging area in the database, which dbt will then consult as sources). This means incoming data may be duplicated on import, but that is an acceptable price to pay for ensuring that incoming data is for sure (for some practical and limited definition of sure) going to arrive even if there is a problem along the way. Usually used in distributed systems where fully reliable incoming data is more of a priority than the performance and overhead of extra validation and deduplication at the target.
  2. At-most-once: Incoming data may be dropped along the way if there is a problem somewhere. Hopefully this happens rarely in practice, but it is still possible. Usually used in systems where directional accuracy is more important than 100% reliability and the team wants to avoid the extra complexity, delay, and performance overhead involved in issue detection and deduplication.

More info on at-least-once versus at-most once here: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

In practice, I have almost always used “at-least-once” semantics. Getting users to agree to “at-most-once” is hard because their very logical question in response will be “Tell me how much data I am going to lose and when?” to which there is no great response. You can’t know that until you actually get all of the data, validate it, and thoroughly verify against the source where the data is coming from, and by the time you have done that you have probably already built an at-least-once system.

So, if we are often building systems that are at-least-once, and therefore requiring incoming data to be deduplicated, and the easiest way to do this is windowing, doesn’t that mean that in most cases this whole discussion about performant incrementality is moot because we have to do windowing?

If others are mostly building at-most-once systems where windowing and/or deduplication is not required then I would be interested to hear about it. My experience is generally “at-least-once” and requiring windowing.

#6

I’ve always thought that partitioning should be a feature of dbt. The nice thing about it is that you can compose models that are idempotent, which is preferable to an incremental strategy imho (for reasons explained below).

We use the following pattern with a lot of models at my job (it’s the main reason why I put some time into adding the alias feature). It directly violates the dbt maxim of “one model, one table” but I’ll try and explain how it addresses the issues you outline in your post.

events_monthly.sql

-- set runtime variables
{% set year = var('year', run_started_at.strftime('%Y')) %}
{% set month = var('month', run_started_at.strftime('%m')) %}

{{
  config({
    'alias': '_tracking_' + year|string + '_' + month|string,
    'schema': 'events'
  })
}}

SELECT 
  * 
FROM events
WHERE
    date_part('month', "time") = {{ month }}
    AND date_part('year', "time") = {{ year }} 

events.sql

{{
  config({
    'alias': 'tracking',
    'schema':'events',
    'materialized':'view'
  })
}}

-- set upstream dependency on monthly model
--  {{ ref('events_monthly') }}
{{ union_by_prefix('events', '_tracking_') }}

Here, we have two models: events_monthly.sql which builds a table for a discrete monthly partition and events.sql, which builds a view of all tables that match a common prefix. The heavy lifting is done by a macro, union_by_prefix, which queries the database to get a list of all tables with a common pattern (in this case, _tracking_) and UNIONS them together.
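In spirit, union_by_prefix amounts to something along these lines; get_relations_by_prefix from dbt_utils is used here as one way to fetch the matching tables, though the actual macro may be implemented differently:

{% macro union_by_prefix(schema, prefix) %}
  {#- find every relation in the schema whose name starts with the prefix -#}
  {% set relations = dbt_utils.get_relations_by_prefix(schema, prefix) %}
  {% for relation in relations %}
    select * from {{ relation }}
    {% if not loop.last %} union all {% endif %}
  {% endfor %}
{% endmacro %}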

By default, the monthly model will build the current month’s table, but you can also pass in vars at runtime to build a previous month’s table. This is nice because you can address the challenge of “Late-Arriving Facts” by also regularly building models from previous months. This is an “eventually consistent” approach. Performance of the monthly models can be tuned by increasing / decreasing the size of the partitions (years, quarters, weeks, days, hours, minutes, etc). Previous partitions can be programmatically generated using this script I wrote: https://gist.github.com/abelsonlive/16611a745cace973a0c9a6f3b2b6000b
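For example, backfilling a specific month’s partition might look something like this (the values are placeholders; note the zero-padded month string so the alias matches strftime('%m')):

dbt run --models events_monthly --vars '{"year": "2018", "month": "03"}'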

Downstream, other models only reference events.sql such that, to the analysts that consume these models, there’s really only one events model. However, the challenge with this approach is isolating production and development environments. Ideally, an analyst shouldn’t need to build all monthly partitions in order to work with the events model. For that reason, we hardcode the production schema in events.sql. This means that when you call {{ ref('events') }} it will only build a view of the monthly partitions in your development schema. Obviously, this is not ideal, but we’ve thoroughly explained this behavior to all dbt users, so they’re aware of what’s going on.

How could we bring this strategy into the dbt paradigm without breaking some of its core tenets?

#7

Thanks for the quick reply @tristan.
As you guessed, instead of declaring a unique_key on the model, I used a prehook with a DELETE statement in order to get a “truly” incremental update where the table is not scanned.
Maybe this scan isn’t as expensive as I thought (we’re on Redshift), so I’ll try removing that custom logic to see how well it does.
And writing a custom materialization strategy sounds a bit scary but I’ll try to give it a shot next time I have to do something like this.