I have an incremental model but I need a column that will track when each row was first added to the table (created_at) and a column that will track when any value from the row was last updated (updated_at). Is there a way to do this with incremental models?
If you are using the merge incremental strategy, you could use merge_exclude_columns to leave the created_at column untouched. Something like:
{{
    config(
        materialized = 'incremental',
        unique_key = 'id',
        merge_exclude_columns = ['created_at'],
        ...
    )
}}

select
    id
    , name
    , current_date() as updated_at
    , current_date() as created_at
from {{ ref('my_ref') }}
{% if is_incremental() %}
where <some condition>
{% endif %}
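To see why this keeps created_at stable, here is a rough sketch of the merge this config produces (not the exact SQL dbt generates; table names and quoting vary by adapter):

```
-- hypothetical rendering of the merge for the config above
merge into analytics.my_model as dest
using my_model__dbt_tmp as src
    on src.id = dest.id
when matched then update set
    -- created_at is excluded here, so the existing value is kept
    dest.name = src.name,
    dest.updated_at = src.updated_at
when not matched then insert (id, name, updated_at, created_at)
    values (src.id, src.name, src.updated_at, src.created_at)
```

Note that created_at is only excluded from the update clause; it is still written on insert, which is what stamps it for brand-new rows.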
@brunoszdl thanks for the answer! Won’t this make every row updated on each run, even when there is no change in any other columns?
For example, if I have a daily run that updates the model with any changes found in the last 3 days - then adding “current_date() as updated_at” would make every row the “updated” row with a new “updated_at” value, even the ones where nothing has changed.
Maybe I wasn’t specific enough in my question - the “updated_at” needs to be updated with a new value only when any other column for that row has changed during the current run.
@brunoszdl I tried to implement this in dbt Core 1.6.5 in combination with Snowflake, and it seems to ignore the merge_exclude_columns parameter. I cannot get it to work. Do you have a working example, and with which version & DB? Thanks!
@brunoszdl Related to @Dr_Chris 's question, I would like to use an equivalent pattern with the “append” strategy. I tried specifying the merge_exclude_columns argument (and also tried append_exclude_columns), but it did not affect the executed query. Is there a way to do that?
In particular, what I would like to happen is that the incremental load uses the unique_key argument to identify matching entries, and then uses a subset of the remaining columns as value fields that determine whether an entry has changed. The specific use case is that I would like to add a load_timestamp column (and possibly some other metadata) on each update, but do not want this to trigger an append for that key if the entry is otherwise identical.
We had to implement a custom materialization to do this.
In addition to merge_exclude_columns, we added diff_exclude_columns (because we never want to compare the updated_at column, for example, since it will always be different). Otherwise, all the columns between the source & target are compared by default, and only if differences are detected will the row be updated (with the new updated_at value).
@cbruun I finally solved my problem. The first thing I learned is that merge_exclude_columns only acts on the merge (update) part, not on the insert part. To overcome this, I use a view over the new data joined with the existing data: certain fields are kept as-is, others (like the changed timestamp) are updated, and for new rows certain fields (e.g. the created timestamp) are prefilled. Using a view over new and existing data might also be a solution for your requirement.
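A rough sketch of what that view-on-top approach could look like (all table and column names here are made up for illustration):

```
-- hypothetical staging view: join new rows to the existing table,
-- preserving created_at for rows that already exist
create or replace view my_model_staging as
select
    n.id,
    n.name,
    -- keep the original created_at when the row already exists,
    -- otherwise stamp the current time for brand-new rows
    coalesce(e.created_at, current_timestamp()) as created_at,
    current_timestamp() as updated_at
from new_data as n
left join existing_table as e
    on n.id = e.id
```

The incremental model then selects from this view, so the merge never sees a regressed created_at value for existing rows.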
@mroy / @Dr_Chris I also needed to implement a custom incremental materialization to handle my use cases. It sounds like what I produced is similar to what @mroy created. I looked at the implementations of the current merge incremental and append incremental materializations, and they included parts that could be reused, but did not implement quite what I needed to make this work.
Hi,
I solved it like this:
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='sync_all_columns'
    )
}}

with
mymodel as (
    select ...
)
{% if is_incremental() %}
, diff as (
    select * from mymodel
    except
    select * exclude (_last_modified)
    from {{ this }}
)
{% endif %}

select
    *,
    current_timestamp() as _last_modified
from mymodel
{% if is_incremental() %}
where exists (
    select 1
    from diff
    where diff.id = mymodel.id
)
{% endif %}
Hi @mroy
Any chance you could share the code for this custom materialization? Is it on github available or can share thru email or anything else?
We have the exact same requirement.
Thank you
It is a bit hard to share as it is deeply embedded into our customizations of dbt.
The materialization basically modifies the SQL of the model by wrapping the MERGE around it, and then executing the result.
Here’s a simplified version:
{%- set params = model.meta.merge_query -%}
MERGE INTO {{ model.relation_name }} AS target
USING (
{{ sql }}
) AS source
ON (
{%- for col in params.join %}
{% if not loop.first %}AND {% endif %}source.{{ col }} = target.{{ col }}
{%- endfor %}
)
WHEN MATCHED AND (1 = 2
{%- for col in params.diff_cols %}
OR target.{{ col }} IS DISTINCT FROM source.{{ col }}
{%- endfor %}
) THEN UPDATE SET
{%- for col in params.update_cols %}
target.{{ col }} = source.{{ col }}
{%- if not loop.last %},{% endif %}
{%- endfor %}
WHEN NOT MATCHED
THEN INSERT (
{{ params.insert_cols | join(",\n ") }}
) VALUES (
source.{{ params.insert_cols | join(",\n source.") }}
);
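For a concrete sense of what the template renders to, here is a hypothetical result for a model whose meta declares id as the join column, name as the only diff/update value column, plus updated_at in update_cols (the table, column names, and inner select are invented for illustration):

```
MERGE INTO analytics.my_model AS target
USING (
    select id, name, current_timestamp() as updated_at from staging
) AS source
ON (
    source.id = target.id
)
WHEN MATCHED AND (1 = 2
    OR target.name IS DISTINCT FROM source.name
) THEN UPDATE SET
    target.name = source.name,
    target.updated_at = source.updated_at
WHEN NOT MATCHED
THEN INSERT (
    id,
    name,
    updated_at
) VALUES (
    source.id,
    source.name,
    source.updated_at
);
```

Note that updated_at appears in update_cols but not in diff_cols, matching the earlier point that the timestamp itself should never participate in the change comparison. The `1 = 2` seed just lets the `OR` clauses be generated uniformly in a loop.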
Thank you very much!
I had a model I had built with some CTEs and no natural updated_at or created_at, so this is how I solved it. I just have the incremental model update the fields that change, and only update updated_at if one of them changes. Some example code:
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='sync_all_columns',
        merge_update_columns = ['changing_field_a', 'changing_field_b', 'updated_at'],
    )
}}

with cte1 as (...)
, cte2 as (...)
, result as (...)

select
    r.id
    , r.unchanging_field_1
    , r.changing_field_a
    , r.changing_field_b
    , current_timestamp() as created_at
    , case
        {% if is_incremental() %}
        when r.changing_field_a != t.changing_field_a
            or r.changing_field_b != t.changing_field_b
            then current_timestamp()
        {% endif %}
        when created_at is not null then created_at
      end as updated_at
from result as r
{% if is_incremental() %}
left join {{ this }} as t on r.id = t.id
{% endif %}
Is this too hacky or a reasonable pattern? I know that my created_at and updated_at will all get updated with a full refresh, but for my downstream purposes that is okay. Feedback welcome!
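One small caveat worth flagging on patterns like the above: plain `!=` comparisons return NULL (not true) when either side is NULL, so a change from NULL to a value would not bump updated_at. On warehouses that support it (Snowflake does), a null-safe variant of the change check could look like:

```
-- null-safe change detection: IS DISTINCT FROM treats NULL as a
-- comparable value, so NULL -> 'x' counts as a change
when r.changing_field_a is distinct from t.changing_field_a
    or r.changing_field_b is distinct from t.changing_field_b
    then current_timestamp()
```

This is the same trick the custom MERGE template earlier in the thread uses in its WHEN MATCHED condition.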