Speed up the merge strategy with a custom incremental predicate

Hi everyone,

I would like to share a solution to speed up the merge operation, highly inspired by this thread. The idea is to customize incremental_predicates when using incremental_strategy='merge' so that the partitions of the destination table are pre-filtered, dynamically, based on the values in the update. In my case, I saw a 30% speedup when merging 100k rows into 1.4B rows (the exact number depends on the actual distribution of your data).

Now the nitty gritty details:

1/ In its native form (as of dbt v1.4), you can add the incremental_predicates option to the config block to filter the destination table. However, it only accepts static values: the filter cannot depend dynamically on the actual values of your update. Subqueries are not allowed, nor are variables or any Jinja templating, because the predicate is injected as a literal string rather than evaluated at runtime.
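For illustration, the native, static form looks like this (the cutoff date is hypothetical); the string is passed through verbatim, so it cannot react to the incoming batch:

{{
    config(
        materialized="incremental",
        incremental_strategy='merge',
        file_format='delta',
        unique_key='id',
        partition_by=["day"],
        incremental_predicates=["DBT_INTERNAL_DEST.day >= '2023-01-01'"]
    )
}}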

2/ To customize the behavior of incremental_predicates, you need to override the get_merge_sql macro so that it reads the values in your update and adds a predicate filter on the partition keys. The latter point is important: filtering on the partition keys is what allows the engine to skip whole partitions. Otherwise, it will still scan the whole table.

Caveat: I have not yet found a way to make the macro adapt to any situation. At the moment, I add an if branch to the macro for each model that needs a different predicate.

Please find below reproducible code. Let’s assume that the source file mysource.yml contains the following:

sources:
  - name: mysource
    tables:
      - name: mytable
        columns:
          - name: id                # primary key on which to perform the merge operation
          - name: day               # the partition key
          - name: myvalue           # some data
          - name: _processing_time  # used to filter between incremental updates

The model mymodel.sql using the custom incremental_predicates:

{{
    config(
        materialized="incremental",
        incremental_strategy='merge',
        file_format='delta',
        unique_key='id',
        partition_by=["day"],
        incremental_predicates=["custom_day"]
    )
}}

SELECT
    id,
    day,
    myvalue,
    _processing_time
FROM {{ source('mysource', 'mytable') }} AS s
{% if is_incremental() %}
    WHERE s._processing_time > (select MAX(_processing_time) from {{ this }})
{% endif %}

The get_merge_sql macro:

{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
    {% set predicate_override = "" %}
    {# Guard against models that pass no incremental_predicates at all #}
    {% if incremental_predicates and incremental_predicates[0] == "custom_day" %}
        {# Read the partition-key bounds of the incoming batch #}
        {% set get_limits_query %}
            SELECT
                MIN(day) AS min_day,
                MAX(day) AS max_day
            FROM {{ source }}
        {% endset %}
        {% set limits = run_query(get_limits_query)[0] %}
        {% set min_day, max_day = limits[0], limits[1] %}

        {# Restrict the destination scan to the partitions touched by the update #}
        {% set predicate_override %}
            DBT_INTERNAL_DEST.day >= '{{ min_day }}'
            AND DBT_INTERNAL_DEST.day <= '{{ max_day }}'
        {% endset %}
    {% endif %}
    {% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
    {# Standard merge from here: delegate to dbt's default implementation #}
    {% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
    {{ return(merge_sql) }}
{% endmacro %}
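For reference, the merge rendered by this macro should look roughly like the following (a sketch for an update that only touches day 2023-01-04; the table and temp-relation names are illustrative of dbt’s usual aliases):

MERGE INTO mymodel AS DBT_INTERNAL_DEST
USING mymodel__dbt_tmp AS DBT_INTERNAL_SOURCE
ON (DBT_INTERNAL_DEST.day >= '2023-01-04' AND DBT_INTERNAL_DEST.day <= '2023-01-04')
    AND DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

The bounds in the ON clause are what lets Delta prune partitions before matching on the unique key.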

Here are the steps to reproduce the behavior:
1/ Create mytable with this SQL script (Databricks):

CREATE TABLE mytable (id int, day date, myvalue int, _processing_time timestamp);

INSERT INTO mytable VALUES (1, '2023-01-01', 10, current_timestamp());
INSERT INTO mytable VALUES (2, '2023-01-01', 20, current_timestamp());
INSERT INTO mytable VALUES (3, '2023-01-02', 30, current_timestamp());
INSERT INTO mytable VALUES (4, '2023-01-03', 40, current_timestamp());

2/ $ dbt run --full-refresh --select mymodel
3/ Add a new row to the source table:

INSERT INTO mytable VALUES (5, '2023-01-04', 50, current_timestamp());

4/ Simulate the upsert with $ dbt run --select mymodel. If you used a SQL warehouse, you should see 'files pruned = 3' in the query profile, i.e. the merge should have skipped the existing partitions.
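If you want to double-check outside the query profile, you can also inspect the Delta history of the target table (a sketch; qualify mymodel with your catalog/schema as needed):

-- The most recent operation should be a MERGE; when pruning worked,
-- operationMetrics reports only a few target files touched.
DESCRIBE HISTORY mymodel LIMIT 1;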


Yes! Thanks a lot, that is exactly what I need for my project :star_struck:

Thank you for this write-up! I really enjoy seeing how other practitioners implement creative solutions.

We are doing something similar in our project, but instead of overriding the get_merge_sql macro, we achieve a comparable outcome with incremental_predicates = ["DBT_INTERNAL_DEST.date >= current_date - 14"]. The downside is that it’s less flexible because of the fixed lookback window.
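For completeness, that variant would look like this as a config block (a sketch based on the snippet above; the date column and the remaining settings are assumptions):

{{
    config(
        materialized="incremental",
        incremental_strategy='merge',
        file_format='delta',
        unique_key='id',
        partition_by=["date"],
        incremental_predicates=["DBT_INTERNAL_DEST.date >= current_date - 14"]
    )
}}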
