Incremental model use DB cluster column

Hi there,
We used BigQuery, but I can’t see why the basic idea would change on another warehouse.
Our solution was to get a list of partition values and add it to the MERGE statement.

  • We didn’t want to hardcode the values but rather just use what was necessary.
  • We needed the code to be dynamic, easily used by any incremental model, with a unique key.
  • We needed to know the partition field name and the source of the distinct values.

This is our solution to the problem:

  • We added two custom options to the config block of our model:
{{config  (
        partition_filter_field  = # This isn't mandatory, it can be derived from the model's config
        partition_filter_source = # This is mandatory for the code to be 'activated'. This can point to another model or to the current model i.e. {this}
) }}
  • We changed the get_merge_sql macro, adding the lines of code below:
{# START - Determine the value list of Partition Field. #}
{# If Partition filter is provided in config block #}
    {%- set partition_filter_field = config.get('partition_filter_field') -%}
    {%- set partition_filter_source = config.get('partition_filter_source') -%}
    {% if partition_filter_source %}
        {# 1. Get Partition by information #}
        {% if partition_filter_field %}
            {%- set partition_by_field = partition_filter_field -%}
        {% else %}   
            {%- set raw_partition_by = config.get('partition_by', none) -%}
            {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
            {%- set partition_by_field = partition_by.field -%}
        {% endif %}

        {# 2. Get the view/table name to use for Partition selection #}
        {% set partition_filter_source_ref %}
            {{ref(partition_filter_source)}}
        {% endset%}

        {# 3. Get values list for Partition field #}
        {% set value_list_sql %}
            select distinct {{partition_by_field}} FROM {{partition_filter_source_ref}}
        {% endset %}
        {%- set values = run_query(value_list_sql) -%}
        {%- set value_list = values.columns[0].values() -%}

        {# 4. Build WHERE clause #}
        {%- set partition_clause = [] -%}
        {%- for value in value_list -%}
            {%- do partition_clause.append('DBT_INTERNAL_DEST.'~ partition_by_field ~ ' = "' ~ value ~ '"') -%}
        {%- endfor -%} 

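        {# 5. Add WHERE clause to predicates. Skipped when the source returned no values, which would otherwise render an empty "( )" #}
        {% if partition_clause %}
            {% do predicates.append('( ' ~ partition_clause | join(' or ') ~ ' )') %} {# BigQuery requires ORs for multiple values in a MERGE statement #}
        {% endif %}
    {% endif %}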

{# END - Determine the value list of Partition Field. #}
  • We ended up with a MERGE statement like this:
        on ( DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-06" OR DBT_INTERNAL_DEST.PARTITION_DATE = "2021-01-07" ) and 
            DBT_INTERNAL_SOURCE.UNIQUE_KEY = DBT_INTERNAL_DEST.UNIQUE_KEY
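
To make the two custom options concrete, here is a minimal sketch of a model that might use them. The model name `stg_orders_delta` and the column names are hypothetical, not taken from our project. Since the macro passes `partition_filter_source` to `ref()`, the sketch assumes the option holds a model name, and it selects from that same model so dbt can infer the dependency.

```sql
{# Hypothetical incremental model; all names are placeholders #}
{{ config(
    materialized = 'incremental',
    unique_key = 'unique_key',
    partition_by = {'field': 'partition_date', 'data_type': 'date'},
    partition_filter_field = 'partition_date',
    partition_filter_source = 'stg_orders_delta'
) }}

select
    unique_key,
    partition_date,
    amount
from {{ ref('stg_orders_delta') }}
```

With a config like this, the modified macro would query the distinct `partition_date` values in `stg_orders_delta` and restrict the MERGE to those partitions. `partition_filter_field` could be left out here, because the macro falls back to the field in `partition_by`.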

Again, this is the BigQuery version. You’ll need to look into the Snowflake version.
Hope this helps.
