Pivoting data is a very common task for reporting models in dbt. So much so that one of the oldest and most useful utility macros is for pivoting. In my experience though, pivoting doesn't play very nicely with one of dbt's most powerful features - incremental materializations.
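For reference, a plain dbt_utils pivot call looks something like this (a minimal sketch; `stg_orders` and `status` are placeholder names, not from this project):

select
    order_id,
    {{ dbt_utils.pivot(
        'status',
        dbt_utils.get_column_values(ref('stg_orders'), 'status')
    ) }}
from {{ ref('stg_orders') }}
group by order_id

By default this generates one aggregated CASE expression per distinct value of the pivoted column.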
If you are a heavy user of incremental models, then the following SQL will look familiar to you:
select
    event_at,
    count(distinct user_id) as daily_active_users
from raw_app_data.events
{% if is_incremental() %}
where event_at > (select max(event_at) from {{ this }})
{% endif %}
group by 1
This might be so common in your projects that you have abstracted the incremental logic away behind a handy macro, something like the sketch below. This simple approach will work for 80% of all the models you write. Pivots, however, fall into that other 20%.
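A minimal sketch of such a macro (hypothetical; the name `incremental_filter` is mine, not part of dbt_utils):

{% macro incremental_filter(timestamp_column) %}
    {# Only emit the filter on incremental runs; full refreshes scan everything #}
    {% if is_incremental() %}
    where {{ timestamp_column }} > (select max({{ timestamp_column }}) from {{ this }})
    {% endif %}
{% endmacro %}

The model above then collapses to `from raw_app_data.events {{ incremental_filter('event_at') }} group by 1`.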
Take, for example, this pivot model, which shows a list of form responses for an application:
SELECT new_rows.*
FROM (
    SELECT form_attr.application_id AS application_id,
        {{ pageup_dbt_utils.mapped_pivot(
            column='form_attr.attribute_key',
            mapping_table=ref('ref_application_form_fields'),
            mapping_key='map_key',
            mapping_value='mapped_value_name',
            agg='MAX',
            then_value='form_attr.attribute_val_text',
            else_value='NULL'
        ) }},
        MAX(form_attr.model_timestamp) AS aggregated_model_timestamp
    FROM {{ ref('work_application_form_attribute') }} AS form_attr
    -- other joins
    GROUP BY form_attr.application_id
) new_rows
{% if is_incremental() %}
WHERE new_rows.aggregated_model_timestamp > (select max(aggregated_model_timestamp) from {{ this }})
{% endif %}
{{
    config({
        "materialized": "incremental",
        "unique_key": "application_id"
    })
}}
First, let's look at `mapped_pivot`. This is very similar to the dbt_utils pivot macro, but uses another table to get the list of columns. `ref_application_form_fields` looks like this:

| map_key | mapped_value_name |
| --- | --- |
| 1 | APPLICATION001 |
| 2 | APPLICATION002 |
| 3 | APPLICATION003 |
| etc… | |
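For context, here is roughly what the macro call above compiles to: one aggregated CASE expression per row of the mapping table (a sketch of the compiled output; the real macro may differ in details):

MAX(CASE WHEN form_attr.attribute_key = 1 THEN form_attr.attribute_val_text ELSE NULL END) AS APPLICATION001,
MAX(CASE WHEN form_attr.attribute_key = 2 THEN form_attr.attribute_val_text ELSE NULL END) AS APPLICATION002,
MAX(CASE WHEN form_attr.attribute_key = 3 THEN form_attr.attribute_val_text ELSE NULL END) AS APPLICATION003
-- ...and so on, one column per row in ref_application_form_fields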
The end result is a model with around 100 columns and ~6 million rows, which means roughly 600 million rows of source data (with some blanks).
For a full refresh/initial load, this model is about as optimal as it can be. But for an incremental run there are some noticeable performance issues. In fact, for my data set this model takes almost as long to run for an incremental load as it does for a full load (about 800 seconds). So there is definitely room for improvement.
A solution
What we want to do to improve performance is be smarter about filtering out rows before they are pivoted. This is harder than it sounds: if even one attribute row is updated, you need every row for that application to complete the pivot.
Or do you?
What if we could pivot only the changed rows, then merge the updated values with the existing model to get the complete result? This is what I will show below.
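To make that concrete, here is a hypothetical example with just two pivoted columns. Suppose the existing model contains this row:

| application_id | APPLICATION001 | APPLICATION002 |
| --- | --- | --- |
| 42 | Jane | Sydney |

An incremental run then picks up a change to APPLICATION002 only. Pivoting just the changed rows yields a partial row:

| application_id | APPLICATION001 | APPLICATION002 |
| --- | --- | --- |
| 42 | NULL | Melbourne |

Merging the partial row over the existing one recovers the complete, updated row: 42, Jane, Melbourne. The rest of this post builds that merge step.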
First, let's move our incremental filter to where it is most effective.
...
FROM {{ ref('work_application_form_attribute') }} AS form_attr
-- other joins
{%- if is_incremental() %}
WHERE form_attr.model_timestamp > (select max(aggregated_model_timestamp) from {{ this }})
{%- endif %}
GROUP BY form_attr.application_id
...
Now incremental runs will filter down to only the changed values, resulting in partial pivot rows. Next we need to make some changes to the pivot, as we will need some additional logic to make use of this partial set of data.
...
SELECT form_attr.application_id,
    {{ pageup_dbt_utils.mapped_pivot(
        column='form_attr.attribute_key',
        mapping_table=ref('ref_application_form_fields'),
        mapping_key='map_key',
        mapping_value='mapped_value_name',
        agg='MAX',
        then_value='form_attr.attribute_val_text',
        else_value='NULL',
        add_update_flag_on_incremental=true,
        update_flag_suffix='__is_updated'
    ) }},
    MAX(form_attr.model_timestamp) AS aggregated_model_timestamp
...
There are now a few extra arguments passed into `mapped_pivot`. You can view the documentation and implementation here. But what is going on is as follows:

- `add_update_flag_on_incremental` has made the pivot include a new column for every pivoted column. This column is a flag to indicate if that value has been updated.
- The flag columns take the suffix `__is_updated`, so the list of columns created will now look like: `APPLICATION001`, `APPLICATION001__is_updated`, `APPLICATION002`, `APPLICATION002__is_updated`, … (see the sketch below).
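In compiled SQL, each flag is just another aggregate over the same CASE condition (a sketch, assuming a 0/1 flag; the real macro's output may differ):

MAX(CASE WHEN form_attr.attribute_key = 1 THEN form_attr.attribute_val_text ELSE NULL END) AS APPLICATION001,
MAX(CASE WHEN form_attr.attribute_key = 1 THEN 1 ELSE 0 END) AS APPLICATION001__is_updated,
-- ...and so on for every mapped column

On an incremental run the input rows are already filtered to changed values, so a 1 here means "this column had at least one changed attribute row for this application".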
The pivoted rows are now prepped; the next step is to merge them. The easiest way to do this is to join back to the existing model.
...
GROUP BY form_attr.application_id
) new_rows
{%- if is_incremental() %}
LEFT JOIN {{ this }} AS old_rows ON old_rows.application_id = new_rows.application_id
{% endif -%}
...
We now have everything we need to build a complete pivoted row, and we can write a macro to make the merge easy.
SELECT new_rows.application_id,
    {{ pageup_dbt_utils.merge_updated_values(
        new_table='new_rows',
        old_table='old_rows',
        unique_key='application_id',
        merge_column_names=dbt_utils.get_column_values(
            table=ref('ref_application_form_fields'),
            column='mapped_value_name'),
        update_flag_suffix='__is_updated',
        else_value='NULL'
    ) }},
    new_rows.aggregated_model_timestamp
...
You can view the documentation and implementation for this macro here. Basically, it takes a list of column names, checks each `__is_updated` flag, and picks the column from `new_rows` if it was updated, otherwise taking it from `old_rows` if it is unchanged. If the pivoted row is completely new, a default value is used.
Another key part of this macro is that it only does this work during an incremental run. Otherwise it takes the columns straight from `new_rows`.
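Per column, the compiled output of that merge looks roughly like this on an incremental run (a sketch, assuming the 0/1 flag from earlier):

CASE
    WHEN new_rows.APPLICATION001__is_updated = 1 THEN new_rows.APPLICATION001
    WHEN old_rows.application_id IS NOT NULL THEN old_rows.APPLICATION001
    ELSE NULL  -- brand new row, no old value to fall back to
END AS APPLICATION001,

On a full refresh there is no old_rows to join, so it compiles down to just new_rows.APPLICATION001 AS APPLICATION001.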
The final, complete model looks like this:
SELECT new_rows.application_id,
    {{ pageup_dbt_utils.merge_updated_values(
        new_table='new_rows',
        old_table='old_rows',
        unique_key='application_id',
        merge_column_names=dbt_utils.get_column_values(
            table=ref('ref_application_form_fields'),
            column='mapped_value_name'),
        update_flag_suffix='__is_updated',
        else_value='NULL'
    ) }},
    new_rows.aggregated_model_timestamp
FROM (
    SELECT form_attr.application_id,
        {{ pageup_dbt_utils.mapped_pivot(
            column='form_attr.attribute_key',
            mapping_table=ref('ref_application_form_fields'),
            mapping_key='map_key',
            mapping_value='mapped_value_name',
            agg='MAX',
            then_value='form_attr.attribute_val_text',
            else_value='NULL',
            add_update_flag_on_incremental=true,
            update_flag_suffix='__is_updated'
        ) }},
        MAX(form_attr.model_timestamp) AS aggregated_model_timestamp
    FROM {{ ref('work_application_form_attribute') }} AS form_attr
    -- other joins
    {%- if is_incremental() %}
    WHERE form_attr.model_timestamp > (select max(aggregated_model_timestamp) from {{ this }})
    {%- endif %}
    GROUP BY form_attr.application_id
) new_rows
{%- if is_incremental() %}
LEFT JOIN {{ this }} AS old_rows ON old_rows.application_id = new_rows.application_id
{% endif -%}
{{
    config({
        "materialized": "incremental",
        "unique_key": "application_id"
    })
}}
And no post on performance would be complete without a pretty chart:
~800 seconds down to <50 seconds. Lovely.
Caveats
This is not a perfect solution for every situation; it makes a lot of assumptions:
- This will not work if your source data contains more than one value for a given pivot key (e.g. if you are pivoting with a COUNT aggregate).
- Some database engines/models simply won't get any benefit from this. Either the system is already clever enough to optimize, or there are other blockers to performance. Your mileage may vary.
- It does not work if you have a dynamic list of columns to pivot (but incremental materialization already hates this).
- Efficiency aside, this code is nowhere near as easy to understand as the pivot we started with. That makes it harder to come back and fix, and harder for new users to understand what is going on.
Hopefully someone finds this useful.