Incremental model: use the DB cluster column in the merge

Hi. We are writing an incremental script against a very large table on Snowflake. This table needs to be clustered by date for downstream consumption. The unique key for the merge is an ID. The merge statement performs poorly unless we can include a date range. Here is a simplified example of the manual SQL we want to emulate:

merge into tgt
using src
	on tgt.order_id = src.order_id
	and tgt.order_dt between '1992-01-01' and '1992-01-03'  -- THIS IS THE FILTER WE WANT TO ADD
when matched then update set tgt.order_status_cd = src.order_status_cd
when not matched then
	insert (order_id, order_status_cd, order_dt)
	values (src.order_id, src.order_status_cd, src.order_dt);

The incremental materialization doesn’t seem to support this. Is the only option a custom materialization? Any suggestions?

Thank you!


Hi,
I have no idea how to add such date logic to the generated dbt code; writing a custom materialization should indeed be possible.
But I would say that for big tables you need to cluster the data for the right purpose, and if you have two purposes in mind for the data set, split it into multiple tables, one per purpose. In this case I could imagine the following approach:
have a main table containing the time-invariant fields, maintained with inserts only and clustered by your date field (for downstream consumption), then a secondary table clustered by order_id where you maintain your order updates. Downstream consumers would then of course need join logic, but that join logic makes use of the clustering key of the secondary table.
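A minimal sketch of this two-table split, using Python's built-in sqlite3 module as a stand-in for Snowflake. SQLite has no clustering, so this only shows the table layout and the downstream join, not the performance effect. The table names `orders_main` and `order_status` are hypothetical, and the upsert syntax needs SQLite 3.24 or newer.

```python
import sqlite3

# SQLite stands in for Snowflake here; clustering itself is not modelled.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Main table: time-invariant fields, insert-only (would be clustered by order_dt).
    create table orders_main (order_id integer, order_dt text);
    -- Secondary table: mutable fields keyed by order_id (would be clustered by order_id).
    create table order_status (order_id integer primary key, order_status_cd text);
""")

# New orders only ever get inserted into the main table.
conn.executemany("insert into orders_main values (?, ?)",
                 [(1, "1992-01-01"), (2, "1992-01-02"), (3, "1992-01-05")])

# Status changes are upserted by order_id; this is the merge that benefits
# from the secondary table being clustered on its key.
upsert = """
    insert into order_status (order_id, order_status_cd) values (?, ?)
    on conflict (order_id) do update set order_status_cd = excluded.order_status_cd
"""
conn.executemany(upsert, [(1, "O"), (2, "O"), (3, "P")])
conn.executemany(upsert, [(1, "F")])  # order 1 later changes status

# Downstream: filter on the date (main table's cluster key), then join on order_id.
rows = conn.execute("""
    select m.order_id, m.order_dt, s.order_status_cd
    from orders_main m
    join order_status s on s.order_id = m.order_id
    where m.order_dt between '1992-01-01' and '1992-01-03'
    order by m.order_id
""").fetchall()
print(rows)  # [(1, '1992-01-01', 'F'), (2, '1992-01-02', 'O')]
```

The main table never sees an update, so it never needs a merge at all; only the small, key-clustered status table does.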

And now that I am writing down my first idea, I can think of another one that might work. I assume that your clustering key is currently order_dt. What if you introduce a replacement unique key based on a combination of order_dt and order_id, such as a concatenation of the two (date field first)? Your filter key for downstream consumption would still be order_dt, and because the new key starts with the date, order_dt would be just as well clustered as the new cluster key itself.
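A small Python sketch of why the date goes first. It assumes the date is rendered as a fixed-width `YYYY-MM-DD` string and the order ID is zero-padded; the `-` separator and the padding width of 10 are assumptions, not part of the suggestion above. With that layout, sorting the concatenated key sorts by date first, so the new key is ordered the same way as `order_dt`.

```python
from datetime import date


def composite_key(order_dt: date, order_id: int, id_width: int = 10) -> str:
    """Build a date-first key; the separator and zero-padding width are assumptions."""
    # ISO dates are fixed-width, so string order matches date order.
    # Zero-padding keeps IDs of different lengths in numeric order within a day.
    return f"{order_dt.isoformat()}-{order_id:0{id_width}d}"


orders = [
    (date(1992, 1, 3), 7),
    (date(1992, 1, 1), 42),
    (date(1992, 1, 1), 5),
    (date(1992, 1, 2), 1000),
]
keys = sorted(composite_key(d, i) for d, i in orders)
print(keys)
# ['1992-01-01-0000000005', '1992-01-01-0000000042', '1992-01-02-0000001000', '1992-01-03-0000000007']
```

Without the padding, `'1992-01-01-42'` would sort before `'1992-01-01-5'`; the date prefix would still keep days together, but IDs within a day would not be in numeric order.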

Well, I can't say which one would work better. The second one, I believe, comes closest to the date filter logic you intended to add.

As a last comment, I personally favour an 'insert-only' DWH principle. So I would investigate solutions such as separating the time-variant fields from the time-invariant fields (as Data Vault does, with all tables insert-only) and joining them during downstream consumption.

Regards,
Hugo.

Hi there,
We use BigQuery, but I don't think the basic idea would change.
Our solution was to get a list of partition values to add to the MERGE statement.

  • We didn’t want to hardcode the values but rather just use what was necessary.
  • We needed the code to be dynamic, easily used by any incremental model, with a unique key.
  • We needed to know the partition field name and the source of the distinct values.

This is our solution to the problem:

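  • We added two custom options to the config block of our model:
{# partition_filter_field: not mandatory; if omitted, it is derived from the model's partition_by config #}
{# partition_filter_source: mandatory for the code to be 'activated'. It can point to another model or to the current model, i.e. {{ this }} #}
{# Example values only: 'my_source_model' is a hypothetical model name #}
{{ config(
        partition_filter_field  = 'PARTITION_DATE',
        partition_filter_source = 'my_source_model'
) }}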
  • We changed the get_merge_sql macro and added the lines of code below.
{# START - Determine the value list of the partition field. #}
{# Only active if partition_filter_source is provided in the config block #}
    {%- set partition_filter_field = config.get('partition_filter_field') -%}
    {%- set partition_filter_source = config.get('partition_filter_source') -%}
    {% if partition_filter_source %}
        {# 1. Get the partition_by information #}
        {% if partition_filter_field %}
            {%- set partition_by_field = partition_filter_field -%}
        {% else %}
            {%- set raw_partition_by = config.get('partition_by', none) -%}
            {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
            {%- set partition_by_field = partition_by.field -%}
        {% endif %}

        {# 2. Get the view/table to use for partition selection #}
        {%- set partition_filter_source_ref = ref(partition_filter_source) -%}

        {# 3. Get the list of distinct values for the partition field #}
        {% set value_list_sql %}
            select distinct {{ partition_by_field }} from {{ partition_filter_source_ref }}
        {% endset %}
        {%- set values = run_query(value_list_sql) -%}
        {%- set value_list = values.columns[0].values() -%}

        {# 4. Build the WHERE clause: one equality test per value #}
        {%- set partition_clause = [] -%}
        {%- for value in value_list -%}
            {%- do partition_clause.append('DBT_INTERNAL_DEST.' ~ partition_by_field ~ ' = "' ~ value ~ '"') -%}
        {%- endfor -%}

        {# 5. Add the clause to the merge predicates; skip it when there are no values, since an empty "( )" is invalid SQL #}
        {# BigQuery requires ORs for multiple values in a MERGE statement #}
        {% if partition_clause %}
            {% do predicates.append('( ' ~ partition_clause | join(' or ') ~ ' )') %}
        {% endif %}
    {% endif %}

{# END - Determine the value list of the partition field. #}
  • We ended up with a MERGE statement like this:
        on ( DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-06" OR DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-07" ) and 
            DBT_INTERNAL_SOURCE.UNIQUE_KEY = DBT_INTERNAL_DEST.UNIQUE_KEY
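To make the string building in steps 1, 4 and 5 of the macro easier to follow outside dbt, here is a plain-Python mirror of that logic. It is only an illustration, not dbt code: the `config` dict stands in for dbt's `config.get()`, the distinct values are passed in directly instead of coming from `run_query`, and `partition_by` is simplified to a dict with a `field` key. It produces the same predicate the macro adds to the ON clause, and returns nothing when there are no values.

```python
from typing import Optional


def partition_predicate(config: dict, values: list) -> Optional[str]:
    """Mirror of steps 1, 4 and 5 of the macro (BigQuery-style quoting)."""
    # Step 1: an explicit partition_filter_field wins; otherwise fall back
    # to the field from the (simplified) partition_by config.
    field = config.get("partition_filter_field") or config["partition_by"]["field"]
    # Step 4: one equality test per distinct value. BigQuery accepts
    # double-quoted string literals; Snowflake treats them as identifiers.
    clauses = [f'DBT_INTERNAL_DEST.{field} = "{v}"' for v in values]
    if not clauses:
        return None  # nothing to filter on; "( )" would break the MERGE
    # Step 5: OR the tests together and wrap them in parentheses.
    return "( " + " or ".join(clauses) + " )"


config = {"partition_by": {"field": "PARTITION_DATE", "data_type": "date"}}
print(partition_predicate(config, ["2021-01-06", "2021-01-07"]))
# ( DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-06" or DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-07" )
```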

Again, this is the BigQuery version. You’ll need to look into the Snowflake version.
Hope this helps.
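For Snowflake, two details would likely change. Double quotes delimit identifiers there, so values need single-quoted string literals (with any embedded single quote doubled). And since Snowflake prunes micro-partitions using per-partition min/max metadata, a single range predicate like the `between` in the original manual MERGE may be simpler than a long OR list. The Python sketch below builds such a predicate; the helper names and the choice of a min/max range are assumptions for illustration, not something tested in this thread.

```python
from datetime import date
from typing import Optional


def sql_literal(value) -> str:
    """Render a value as a single-quoted SQL string literal (quotes doubled)."""
    return "'" + str(value).replace("'", "''") + "'"


def snowflake_range_predicate(field: str, values: list) -> Optional[str]:
    """Hypothetical helper: build a date-range filter on the merge target."""
    if not values:
        return None  # no source rows: leave the MERGE unfiltered
    lo, hi = min(values), max(values)
    # A contiguous range can also cover dates the source lacks; that only
    # widens the set of target rows scanned, because matching still
    # requires equal unique keys.
    return f"DBT_INTERNAL_DEST.{field} between {sql_literal(lo)} and {sql_literal(hi)}"


dates = [date(1992, 1, 3), date(1992, 1, 1), date(1992, 1, 2)]
print(snowflake_range_predicate("order_dt", dates))
# DBT_INTERNAL_DEST.order_dt between '1992-01-01' and '1992-01-03'
```

The trade-off: a range keeps the SQL short no matter how many dates the batch touches, while an explicit list (using single-quoted literals) scans fewer target partitions when the dates are sparse.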


Excellent. I think this is exactly what we’re looking for. Thank you!