I’ve been doing a lot of work recently on projects with large data volumes (1TB+), and that has forced me to think harder about incremental models than I ever have before. I wanted to catalog some of this knowledge for others, and I’d love to learn whether others have developed approaches that I haven’t.
The Big Easy
The easiest case for an incremental model is when your underlying data comes in as an immutable event stream and your transformation on top of it requires no windowing and has no late-arriving facts. Imagine an events table that you want to aggregate into a daily_user_events table with one record per user per day. This is a simple group by with a count(*).
In this case, you simply select the new events whose timestamps are greater than or equal to the max day already in the table. Here’s what the code looks like:
with events as (
select * from {{ref('events')}}
{% if is_incremental() %}
where event_timestamp >= (select max(event_timestamp)::date from {{this}})
{% endif %}
)
--rest of model...
This will grab new events and transform them. Assuming you’ve set up the underlying table correctly, with a sort key or its equivalent on your warehouse of choice, this will be very fast. In practice I have seen models using this strategy take 20 seconds to process where a full refresh takes 1,500 seconds, a runtime reduction of over 98%. YMMV, obviously, but my point is that this incrementality scenario is the best possible case if your underlying data and required transformation support it.
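For reference, here’s a rough sketch of what the full model might look like. The aggregation columns and the Redshift-style sort/dist configs are my assumptions rather than part of the snippet above:
-- assumed Redshift-specific configs: the sort key keeps the incremental scan narrow;
-- the dist key is a guess, pick whatever suits your downstream joins
{{
    config(
        materialized='incremental',
        sort='event_timestamp',
        dist='user_id'
    )
}}
with events as (
    select * from {{ref('events')}}
    {% if is_incremental() %}
    where event_timestamp >= (select max(event_timestamp)::date from {{this}})
    {% endif %}
)
select
    user_id,
    event_timestamp::date as event_date,
    count(*) as event_count
from events
group by 1, 2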
Late-Arriving Facts
Extend the above scenario to late-arriving facts—records that arrive in our source data table out of order. This is a common occurrence with event data, especially when that event data is collected on a mobile device. When mobile devices cannot connect to the internet they typically save events that they can’t send in a queue, and then batch send them all later when they reconnect.
This scenario causes problems for our above approach, because the event_timestamp will be in the past and therefore we will never pick these records up to be transformed. There are two ways to deal with this: close enough & performant, or always correct & slow.
Close Enough & Performant
with events as (
select * from {{ref('events')}}
{% if is_incremental() %}
where event_timestamp >= (
select dateadd(day, -3, max(event_timestamp)::date) from {{this}}
)
{% endif %}
)
--rest of model...
This approach establishes a “window” of recent data to always re-transform every time the model runs. In the code above, that window is three days. Essentially, late-arriving facts have three days to arrive, after which point they are no longer eligible to be incorporated into our metrics (until, at least, we do a full-refresh).
This strategy is just what it seems: close enough. If you require perfect accuracy on every transformation, this isn’t a good strategy. But in many cases the performance-for-accuracy tradeoff makes sense given that we’re often talking about behavioral data that needs to be directionally correct, not 100% correct.
This strategy performs almost as well as the base case. It’s still an extremely limited table scan because of the where condition built on top of the sort key (or equivalent). If you can get away with this in your scenario, I highly recommend it.
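One detail the snippet above glosses over: because the lookback window re-selects rows that have already been transformed, the model needs a way to replace those rows rather than append duplicates. In dbt that usually means configuring a unique_key. A minimal sketch, assuming a surrogate key that isn’t shown in the code above:
-- user_day_id is a hypothetical surrogate key (e.g. user_id || '-' || event_date)
-- built in the model itself, so dbt can replace re-processed rows instead of duplicating them
{{
    config(
        materialized='incremental',
        unique_key='user_day_id'
    )
}}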
Always Correct & Slow
with events as (
select * from {{ref('events')}}
{% if is_incremental() %}
where session_id in (
select session_id from {{ref('events')}}
where event_timestamp >=
(select dateadd(day, -3, max(event_timestamp)::date) from {{this}})
)
{% endif %}
)
--rest of model...
Using this approach we’re grabbing all events associated with a given session if we’ve seen any events associated with that session since our most recent run. This will produce a completely accurate result every time the transformation is run because there’s no possibility of “missing” any events: if any event associated with a session arrives late—even if it’s a year late!—all of the events from that session will be selected and re-transformed.
However, it’s much slower. The innermost subquery is still quite fast, and so is pulling the list of session_ids, but grabbing all events from {{ref('events')}} that have a particular session_id is very slow because it isn’t using a sort key, and, really, cannot. Whereas the earlier approach could result in a 95%+ performance improvement vs. a full refresh, this approach is often (in my experience) closer to only 50%. It’s still better than doing a full refresh, but it’s not fast. Use this approach sparingly.
Slowly-Changing Dimensions
If your transformation includes slowly changing dimensions, this has meaningful implications for your incrementality strategy. Here’s the scenario: imagine you have a table called streams that represents users streaming songs on Spotify, and one of the fields on that table is artist_name. From time to time, artists change their names (think Prince). When an artist changes their name, all of their prior streams look like they’re associated with a completely different artist unless you explicitly handle this in your modeling. There are two ways to handle it: dimension tables and custom incrementality logic.
Dimension tables
This is the best argument I’ve ever seen for creating star schemas instead of wide fact tables. If your fact table has an artist_id column instead of an artist_name column, this entire problem goes away. You move some of the performance cost to analysis time, because you’re doing the fact >> dimension join at analysis time, but as long as the cardinality of your dimensions is less than a million rows you’re generally going to be OK. On Redshift, make sure to set the dist style to all for your dimension tables.
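As a rough sketch of what the dimension side might look like in dbt (the model and column names here are hypothetical, and the dist config is Redshift-specific):
-- dim_artists: hypothetical dimension with one current row per artist;
-- dist='all' replicates the small table to every node so the analysis-time join stays cheap
{{
    config(
        materialized='table',
        dist='all'
    )
}}
select
    artist_id,
    artist_name,
    updated_at
from {{ref('artists')}}
Analysis-time queries then join the fact table to this dimension on artist_id, so a renamed artist shows up correctly everywhere without reprocessing the fact table.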
Custom incrementality logic
with streams as (
select * from {{ref('streams')}}
{% if is_incremental() %}
where event_timestamp >= (select max(event_timestamp)::date from {{this}})
or artist_name in (
--subquery that gets distinct artist names
--that have been updated since last run
)
{% endif %}
)
--rest of model...
This approach has the same performance drawbacks as the always correct & slow approach above: we’re no longer exclusively using a sort key (or equivalent) to grab new records, so the table scan will take much longer. The only reason to prefer this approach over building dimension tables is if the cardinality of your dimensions is high enough that the analysis-time join would take untenably long. That does happen, but it’s rare.
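For concreteness, here’s one shape the elided subquery might take. It assumes an artist dimension (or name-change log) with an updated_at column, which is not part of the original example:
-- hypothetical: distinct artist names whose dimension records changed since the last run
select distinct artist_name
from {{ref('dim_artists')}}
where updated_at >= (select max(event_timestamp)::date from {{this}})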
Final Thoughts
Incrementality is hard. It’s easy to screw up, there are a surprising number of situations that arise requiring different design decisions, and it’s fairly hard to test. I’m hopeful that this download gives you a bit of a head start (or a sanity check) as you’re thinking through these issues on your own.
Also: I freely admit that I am not an expert on this topic—there are real data engineers out there who manage many-TB datasets who have far more experience than I do. As a result I’m sure that there’s plenty that I’m missing in this discussion. One of the patterns that I realize is common in the Airflow / Hive world that is not idiomatic to dbt is building tables explicitly partition-by-partition and running jobs explicitly for a given day. I do not fully understand if there are benefits to this approach that dbt does not take into account, or whether this approach is simply an artifact of those particular technologies and not necessarily desirable.
Would love any and all thoughts. Thanks for sticking with me!