Handling merge statements in incremental models

I have a table that I need to update incrementally in Snowflake. Today I use the following query:

MERGE INTO incremental_table t 
    using (
      with co as (
          select distinct * from (
            select
               COL_A,
               COL_B,
               COL_C,
               COL_D,
               COL_E,
               COL_F
            from
                source_table 
            where
                COL_A is not null
         )      
      )
      select
        *
      from 
        co
    ) s
    ON
        s.COL_A = t.COL_A
        AND s.COL_B = t.COL_B
        AND s.COL_C = t.COL_C
    WHEN MATCHED AND s.COL_D < t.COL_D THEN
        UPDATE SET
            t.COL_D = s.COL_D
    WHEN MATCHED AND s.COL_E is not null AND t.COL_E is null THEN
        UPDATE SET
            t.COL_E = s.COL_E
    WHEN MATCHED AND s.COL_F is not null AND t.COL_F is null THEN
        UPDATE SET
            t.COL_F = s.COL_F
    WHEN NOT MATCHED THEN
        INSERT (
          col_a,
          col_b, 
          col_c,
          col_d, 
          col_e,
          col_f
        ) VALUES (
          s.col_a,
          s.col_b, 
          s.col_c,
          s.col_d, 
          s.col_e,
          s.col_f
        )  
;

I'm trying to figure out how best to model this in dbt. As you can see, different columns get updated depending on various non-unique key fields. Is there a way to add this statement to dbt and have it increment the model, or do I need to break it up into a set of selects, one for each WHEN MATCHED and WHEN NOT MATCHED clause?

Any help or docs explaining this would be useful. How have others solved it?

Thanks in advance


Hey @axdahl - we tend to avoid merge statements like this for the same reason that we avoid writing insert/update statements on other databases: this type of DML mixes data modeling logic with table-building logic. To be sure, I think it's totally reasonable to run a query like this; I just want to give some insight into why this exact merge statement isn't directly representable in dbt.

It’s a little tricky for me to wrap my head around the particular logic you’re outlining here. I think you’ll need to use an incremental model, possibly joining back to the destination table.

I'd frame it like this: write the select statement that generates the records you want to upsert into your destination table. If the key you're upserting on turns out to be non-unique, you can set dbt's incremental_strategy config to delete+insert instead of a merge.

Last thing: I'd recommend generating a surrogate key for the fields that you're joining on (e.g. md5(col_a || col_b || col_c)) as it should make your join logic a whole lot cleaner here. More info on that here and here.

Here's a quick example that should hopefully get you started! Let me know how it goes 🙂

with source_data as (

    select
        'abc' as id,
        1 as col_d,
        2 as col_e,
        3 as col_f

),

destination_table as (

    -- in the real model, use {{ this }} to select from the destination
    -- table, and wrap this CTE in {% if is_incremental() %} so the model
    -- still compiles on the first run and in full-refresh builds

    select
        'abc' as id,
        10 as col_d,
        null::int as col_e,
        12 as col_f

)

select
    id,
    -- keep the smaller value, mirroring "when matched and s.col_d < t.col_d"
    least(s.col_d, coalesce(t.col_d, s.col_d)) as col_d,
    -- only take the source value when the destination value is null
    coalesce(t.col_e, s.col_e) as col_e,
    coalesce(t.col_f, s.col_f) as col_f

from source_data as s
-- a left join keeps brand-new ids, covering the "when not matched" insert
left join destination_table as t using (id)
