Hi. We are writing an incremental script against a very large table on Snowflake. This table needs to be clustered by date for downstream consumption. The unique key for the merge is an ID. The merge statement performs poorly unless we can include a date range. Here is a simplified example of the manual SQL we want to emulate:
merge into tgt
using src
on tgt.order_id = src.order_id
and tgt.order_dt between '1992-01-01' and '1992-01-03' -- THIS IS THE FILTER WE WANT TO ADD
when matched then update set tgt.order_status_cd = src.order_status_cd
when not matched then
insert (order_id, order_status_cd, order_dt)
values (src.order_id, src.order_status_cd, src.order_dt);
The incremental materialization doesn't seem to support this. Is the only option a custom materialization? Any suggestions?
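For context, here is a simplified sketch of the kind of dbt model we are running (column names match the example above; the real model is more involved, and the lookback filter is just a placeholder):
{{ config(
    materialized = 'incremental',
    unique_key = 'order_id',
    cluster_by = ['order_dt']
) }}

select order_id, order_status_cd, order_dt
from {{ ref('stg_orders') }}  -- placeholder upstream model
{% if is_incremental() %}
where order_dt >= dateadd(day, -3, current_date)  -- placeholder incremental filter
{% endif %}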
Hi,
I don't know how to add such date logic to the generated dbt code; writing a custom materialization should indeed be possible.
But I would say that with big tables you need to cluster the data for the right purpose, and if there are two purposes in mind for the data set, then split it into multiple tables, one per purpose. In this case I could imagine the following approach:
Have a main table containing the time-invariant fields, maintained with inserts only and clustered by your date field (for downstream consumption), and a secondary table clustered by order_id where you maintain your order updates. Downstream consumers then of course need join logic, but that join logic makes use of the clustering key in the secondary table.
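A rough sketch of the downstream join I have in mind (table and column names are only assumptions based on your example):
select
    o.order_id,
    o.order_dt,
    coalesce(u.order_status_cd, o.order_status_cd) as order_status_cd
from orders_main o           -- insert-only, clustered by order_dt
left join order_updates u    -- clustered by order_id
    on u.order_id = o.order_id
where o.order_dt between '1992-01-01' and '1992-01-03';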
And now that I am writing down my first idea, another one comes to mind that might work. I assume your clustering key is currently order_dt. What if you introduce a replacement unique key based on a combination of order_dt and order_id, such as a concatenation of the two (date field first)? Your filter key for downstream consumption would still be order_dt, but that field would be clustered just as nicely as the actual new cluster key.
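As a sketch (again only assuming the column names from your merge example), the model could build that key itself:
{{ config(
    materialized = 'incremental',
    unique_key = 'order_key',
    cluster_by = ['order_key']
) }}

select
    to_char(order_dt, 'YYYY-MM-DD') || '|' || order_id as order_key,  -- date first, so clustering on order_key also clusters by date
    order_id,
    order_dt,
    order_status_cd
from {{ ref('stg_orders') }}  -- placeholder upstream model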
I could not say which one would work better. The second one, I believe, comes closest to the date filter logic you intend to add.
As a last comment, I would like to say that I personally favour an 'inserts-only' DWH principle. So I would investigate solutions that separate the time-variant fields from the time-invariant fields (as Data Vault does, with all tables insert-only) and join them during downstream consumption.
Hi there,
We used BigQuery, but I can't see that the basic idea would change.
Our solution was to get a list of partition values to add to the MERGE statement.
We didn't want to hardcode the values but rather just use what was necessary.
We needed the code to be dynamic and easily usable by any incremental model with a unique key.
We needed to know the partition field name and the source of the distinct values.
This is our solution to the problem:
We added two custom options to the config block of our model:
{{ config(
    partition_filter_field = ...,    # Not mandatory; it can be derived from the model's partition_by config
    partition_filter_source = ...    # Mandatory for the code to be 'activated'. It can point to another model or to the current model, i.e. {{ this }}
) }}
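For example, a model using these options could look like this (all values are purely illustrative):
{{ config(
    materialized = 'incremental',
    unique_key = 'unique_key',
    partition_by = {'field': 'partition_date', 'data_type': 'date'},
    partition_filter_field = 'partition_date',
    partition_filter_source = 'stg_orders_increment'
) }}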
We changed the get_merge_sql macro and added the lines of code below.
{# START - Determine the value list of Partition Field. #}
{# If Partition filter is provided in config block #}
{%- set partition_filter_field = config.get('partition_filter_field') -%}
{%- set partition_filter_source = config.get('partition_filter_source') -%}
{% if partition_filter_source %}
{# 1. Get Partition by information #}
{% if partition_filter_field %}
{%- set partition_by_field = partition_filter_field -%}
{% else %}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set partition_by_field = partition_by.field -%}
{% endif %}
{# 2. Get the view/table name to use for Partition selection #}
{% set partition_filter_source_ref %}
{{ref(partition_filter_source)}}
{% endset %}
{# 3. Get values list for Partition field #}
{% set value_list_sql %}
select distinct {{partition_by_field}} from {{partition_filter_source_ref}}
{% endset %}
{%- set values = run_query(value_list_sql) -%}
{%- set value_list = values.columns[0].values() -%}
{# 4. Build WHERE clause #}
{%- set partition_clause = [] -%}
{%- for value in value_list -%}
{%- do partition_clause.append('DBT_INTERNAL_DEST.'~ partition_by_field ~ ' = "' ~ value ~ '"') -%}
{%- endfor -%}
{# 5. Add WHERE clause to predicates #}
{% do predicates.append('( ' ~ partition_clause |join(' or ') ~ ' )') %} {# BigQuery requires ORs for multiple values in a MERGE statement #}
{% endif %}
{# END - Determine the value list of Partition Field. #}
We ended up with a MERGE statement like this:
on ( DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-06" OR DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-07" ) and
DBT_INTERNAL_SOURCE.UNIQUE_KEY = DBT_INTERNAL_DEST.UNIQUE_KEY
Again, this is the BigQuery version. You'll need to look into the Snowflake version.
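If it helps, my guess (untested) is that the main change for Snowflake would be the quoting of the literal values, since Snowflake uses single quotes for strings, e.g. something like:
{%- do partition_clause.append('DBT_INTERNAL_DEST.' ~ partition_by_field ~ " = '" ~ value ~ "'") -%}
which would produce predicates like DBT_INTERNAL_DEST.ORDER_DT = '1992-01-01'.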
Hope this helps.
Hello @meurant.naude,
I'm glad to come across your post! I'm trying to implement exactly what you showed, but I'm still very new to dbt. You wrote that you added the code you listed in your post. Is it possible to share your entire 'get_merge_sql'? I found the source file, but I'm trying to piece together how your code and that code combine, so I'm hopeful you can share your entire code, or simply your actual file (even better).
It's a pity this functionality isn't baked in, especially since it's the only way to improve performance of the dbt-BigQuery merge, aside from insert_overwrite. I see there is a predicates variable in the default code, but it doesn't seem useful for this purpose.