Hello dbt Community,
I’m working on optimizing our data warehouse updates in Snowflake by implementing an incremental model that leverages unions and conditional checks against a reference model to only process new or updated records. Despite following the dbt documentation and examples, I’m encountering challenges in correctly filtering data during the incremental load process.
The problem I’m having: My goal is to update the data warehouse efficiently with an incremental model that combines data from multiple sources and filters each one by its last update timestamp. However, I’m unsure whether the conditional checks I apply around the union operation correctly identify and process only the new or updated records.
The context of why I’m trying to do this: Due to the increasing volume of data, full refreshes are no longer practical. By applying an incremental model that can intelligently update only changed records, I aim to reduce processing time and resource consumption, thereby increasing efficiency and managing costs more effectively.
Where it doesn’t work as expected: when the model runs incrementally, the is_incremental() filters on the source models don’t seem to restrict the data to new or updated records.
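One thing worth ruling out first, since the model code further down doesn’t show its configuration: in dbt, is_incremental() only returns true when the model is materialized as incremental, its target table already exists in the warehouse, and the run was not started with --full-refresh. If the materialization isn’t set to incremental, every is_incremental() block is skipped and the filters are never applied. A minimal sketch of the config line at the top of the model file (the materialization could equally be set in dbt_project.yml):

```sql
-- Top of the model file (the file name isn't shown in the post).
-- is_incremental() evaluates to true only when:
--   * the model is materialized as 'incremental' (here or in dbt_project.yml)
--   * the model's target table already exists in the warehouse
--   * dbt was not invoked with --full-refresh
{{ config(materialized='incremental') }}
```

The first run creates the table, so the filters only take effect from the second run onward. Compiling the model and checking the compiled SQL under target/compiled/ should show whether the where clauses were actually rendered.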
What I’ve already tried: I’ve structured my dbt model to first reference the individual source tables, applying a conditional check that filters records by their last update timestamp. These filtered sources are then combined with a union and processed further:
-- Defining source combinations with conditional data checks
with source_combination as (
    select * from {{ ref('flatten_data_combination') }} as c
    {% if is_incremental() %}
    where c.last_time_updated > (select max(last_time_updated) from {{ this }})
    {% endif %}
),
source_label as (
    select * from {{ ref('flatten_data_label') }} as l
    {% if is_incremental() %}
    where l.last_time_updated > (select max(last_time_updated) from {{ this }})
    {% endif %}
),
-- Preparing union of sources
-- Both branches of a UNION must return the same number of columns, in the same order
prep_union_an as (
    select
        'val1'::string as col1,
        null as coords,  -- assumption: flatten_data_combination has no coords column
        image_md5,
        last_time_updated
    from source_combination
    union
    select
        'val2'::string as col1,
        coords,
        image_md5,
        last_time_updated
    from source_label
),
mart_union as (
    select
        UUID_STRING() as id,
        coords,
        image_md5,
        last_time_updated
    from prep_union_an a
    {% if is_incremental() %}
    -- same condition as the source filters above, so it removes no further rows
    where a.last_time_updated > (select max(last_time_updated) from {{ this }})
    {% endif %}
)
select * from mart_union
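A possible refinement, offered as a sketch rather than a confirmed fix: every filter above compares against a single max(last_time_updated) taken over the whole target table. If one source lags behind the other, its newer rows can still carry timestamps below that shared maximum and would be skipped. Also, if the target exists but holds no rows, max() returns null, the comparison evaluates to null, and nothing is selected. The version below keeps col1 in the output so each source is compared against its own latest loaded timestamp, and wraps max() in coalesce() with a fallback date. The null coords placeholder, the fallback date, and the assumption that last_time_updated is a timestamp column are assumptions, not taken from the original model.

```sql
-- Sketch only: assumes last_time_updated is a timestamp and that col1
-- ('val1' / 'val2', taken from the post) is stored in the target table
-- so each source can be filtered against its own high-water mark.
{{ config(materialized='incremental') }}

with source_combination as (
    select *
    from {{ ref('flatten_data_combination') }}
    {% if is_incremental() %}
    where last_time_updated > (
        -- latest timestamp already loaded from this source only;
        -- coalesce keeps the filter usable when no rows exist for it yet
        select coalesce(max(last_time_updated), '1900-01-01'::timestamp)
        from {{ this }}
        where col1 = 'val1'
    )
    {% endif %}
),

source_label as (
    select *
    from {{ ref('flatten_data_label') }}
    {% if is_incremental() %}
    where last_time_updated > (
        select coalesce(max(last_time_updated), '1900-01-01'::timestamp)
        from {{ this }}
        where col1 = 'val2'
    )
    {% endif %}
),

prep_union_an as (
    select
        'val1'::string as col1,
        null as coords,          -- assumption: no coords column in this source
        image_md5,
        last_time_updated
    from source_combination
    union all                    -- col1 differs, so branches cannot duplicate each other
    select
        'val2'::string as col1,
        coords,
        image_md5,
        last_time_updated
    from source_label
)

select
    uuid_string() as id,
    col1,                        -- kept so the per-source filters above can use it
    coords,
    image_md5,
    last_time_updated
from prep_union_an
```

Because the existing target table has no col1 column, switching to this version would need one full rebuild (dbt run --full-refresh with a --select on the model). union all skips the de-duplication that union performs; keep union if duplicate rows within a single source need removing. Finally, since UUID_STRING() produces a new id on every run, updated records are appended as new rows rather than replacing earlier ones; if replacement is the goal, the model would need a unique_key built from stable columns (for example image_md5 together with col1, assuming that pair identifies a record).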