Implementing a Persistent Staging Area with a dynamic incremental model

Hi @feriksen,

Yes, the model reads the source table's PK from the metadata and creates the staging table's PK as ‘source PK columns + load_dts’.

IMHO it would be better to discuss the PSA solution once it has been rewritten as a materialization.
In the meantime, you can find the macros for the hash_diff and hash_key functions below.

They follow the DV 2.0 recommendations:

  • ‘;’ as separator
  • each value is trimmed, upper-cased and cast to text
  • null is replaced by an empty string
  • so that adding columns to the source table does not produce spurious deltas, only the column values up to the last non-null value are taken into account (a;1;null;b;null;null yields a;1;null;b)
  • not shown here, but dropped source columns are kept in the model's table and replaced by null in the select column list.
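
To make the rules above easier to check by hand, here is a minimal Python sketch of the string that gets hashed. It is not part of the dbt project: the function names `normalize`, `hash_diff_input` and `hash_diff` are hypothetical, `str()` is only a stand-in for PostgreSQL's `cast(... as text)`, and UTF-8 is assumed as the database encoding.

```python
import hashlib
import uuid
from typing import Optional, Sequence


def normalize(value: Optional[object]) -> str:
    """Trim spaces, upper-case and cast to text; NULL (None) becomes ''."""
    # str() is an approximation of cast(... as text) for simple values.
    return "" if value is None else str(value).strip(" ").upper()


def hash_diff_input(values: Sequence[Optional[object]]) -> str:
    """Join normalized values with ';', ignoring trailing NULLs."""
    last = len(values)
    while last > 0 and values[last - 1] is None:
        last -= 1  # drop trailing NULLs so newly added columns do not change the hash
    return ";".join(normalize(v) for v in values[:last])


def hash_diff(values: Sequence[Optional[object]]) -> uuid.UUID:
    """MD5 of the normalized string, exposed as a UUID like md5(...)::uuid."""
    digest = hashlib.md5(hash_diff_input(values).encode("utf-8")).hexdigest()
    return uuid.UUID(digest)


# A row with two trailing NULLs hashes the same as the row without them.
print(hash_diff_input(["a", 1, None, "b", None, None]))  # A;1;;B
print(hash_diff(["a", 1, None, "b", None, None]) == hash_diff(["a", 1, None, "b"]))  # True
```

Note that the null in the middle is kept as an empty field, so only nulls after the last non-null value are dropped.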

Lastly, because of a PostgreSQL limitation (no more than 100 arguments in a function call), I had to split the coalesce and concat calls into nested batches.
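
As a rough illustration of the batching idea, here is a small Python sketch that builds the same kind of nested call as the batch_function_call macro. It is only a mirror of the Jinja logic for experimenting outside dbt, with the batch size defaulting to 100 as an assumption.

```python
from typing import List


def batch_function_call(function_name: str, params: List[str], batch_size: int = 100) -> str:
    """Return a SQL call, nested one level when there are too many params."""
    if len(params) <= batch_size:
        return f"{function_name}({', '.join(params)})"
    # Split the arguments into chunks of at most batch_size ...
    batches = [params[i:i + batch_size] for i in range(0, len(params), batch_size)]
    # ... wrap each chunk in its own call, then wrap the chunks in an outer call.
    inner = [f"{function_name}({', '.join(batch)})" for batch in batches]
    return f"{function_name}({', '.join(inner)})"


print(batch_function_call("concat", ["a", "b", "c"], 2))
# concat(concat(a, b), concat(c))
```

This works for coalesce and concat because nesting them gives the same result as one flat call. With a single level of nesting, the sketch (like the macro) covers at most batch_size × batch_size arguments.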

Looking forward,

Fabrice Etanchaud

{%- macro batch_function_call(function_name, params, batch_size) -%}
    {{ function_name }}(
        {%- if params | length <= batch_size -%}
            {{ params | join(', ') }}
        {%- else -%}
            {%- for params_batch in params | batch(batch_size) -%}
                {{ function_name }}(
                    {{ params_batch | join(', ') }}
                )
                {%- if not loop.last -%}, {%- endif -%}
            {%- endfor -%}
        {%- endif -%}
    )
{%- endmacro -%}

{%- macro hash_key(colnames) -%}
    md5(
    {%- if colnames | length > 0 -%}
        {%- set strings = [] -%}
        {%- for colname in colnames %}
            {%- do strings.append("upper(trim(cast(" ~ colname ~ " as text)))") -%}
            {%- if not loop.last -%}
                {#- the separator must be a quoted SQL string literal -#}
                {%- do strings.append("';'") -%}
            {%- endif -%}
        {%- endfor -%}
        {{ batch_function_call('concat', strings, 100) }}
    {%- else -%}
        ''
    {%- endif -%}
    )::uuid
{%- endmacro -%}

{%- macro hash_diff(colnames) -%}
    md5(
    {%- if colnames | length > 0 -%}
        {%- set strings = [] -%}
        {%- for colname in colnames %}
            {%- do strings.append("upper(trim(cast(" ~ colname ~ " as text)))") -%}
            {%- if not loop.last -%}
                {%- set next_strings = [] -%}
                {%- for next_string in colnames[loop.index:] -%}
                    {%- do next_strings.append("cast(" ~ next_string ~ " as text)") -%}
                {%- endfor -%}
                {%- do strings.append("case when " ~ batch_function_call('coalesce', next_strings, 100) ~ " is not null then ';' end") -%}
            {%- endif -%}
        {%- endfor -%}
        {{ batch_function_call('concat', strings, 100) }}
    {%- else -%}
        ''
    {%- endif -%}
    )::uuid
{%- endmacro %}