Error: dbt incremental model on BQ Partitioned table using string field

The problem I’m having

Unable to replace records in pub_table with records from int_table based on a string column

Database Error in model pub_table (models/project/dataset/published/pub_table.sql)
Query error: Function not found: string_trunc

The context of why I’m trying to do this

For every Airflow DAG run, I would like to replace the records in the pub_table with the records from the int_table if there is a match with the field _meta_source_file.

For example,
If pub_table has 100 records and int_table has 120 records for _meta_source_file = “abc”, then during the DAG run the 100 records in pub_table should be replaced with the 120 records from int_table.

Basically, delete 100 records from pub_table and insert 120 records from the int_table.

What I’ve already tried

I understand BigQuery only allows integer, date, and timestamp fields in partition_by. Is there any other way to address my requirement?

Some example code or error messages

pub_table script -

{{
    config(
        materialized="incremental",
        incremental_strategy='insert_overwrite',
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        partition_by={"field": "_meta_source_file", "data_type": "STRING"},
        cluster_by=["_meta_use_case","transaction_date"],
    )
}}

{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{% set max_meta_inserted_at = get_max_meta_inserted_at(ref("int_table"), airflow_info) %}

select *
    from {{ ref("int_table") }} as int_model
    {% if is_incremental() %}
        where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
    {% else %}  
        where int_model._meta_inserted_at is not null
    {% endif %}

Hello again …
So for this problem the quickest fix would be to introduce another column, _meta_source_file_hash, in your model and use the BigQuery FARM_FINGERPRINT function to populate it with the hashed value of the _meta_source_file column.
Since the new column _meta_source_file_hash will be of type INT64, you should now be able to partition the table on it easily. Just make sure to include it in your where clauses so that partition pruning takes effect.
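
A rough sketch of what that could look like, purely illustrative: the bucket count of 4000 and the ABS(MOD(...)) wrapping are assumptions on my part, not something from your project, and BigQuery needs a start/end/interval spec for integer range partitioning.

{# illustrative only: bucket the fingerprint into 0..3999 so it fits an
   integer-range partition spec; tune start/end/interval to your data #}
{{
    config(
        materialized="incremental",
        incremental_strategy="insert_overwrite",
        partition_by={
            "field": "_meta_source_file_hash",
            "data_type": "int64",
            "range": {"start": 0, "end": 4000, "interval": 1},
        },
        cluster_by=["_meta_use_case", "transaction_date"],
    )
}}

select
    *,
    -- map each file name to a stable bucket in [0, 4000)
    abs(mod(farm_fingerprint(_meta_source_file), 4000)) as _meta_source_file_hash
from {{ ref("int_table") }}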

Hi :slight_smile:

I'm sorry, I didn't get this part: "Just make sure to include it in your where clauses to ensure partition pruning takes effect." Can you modify the pub_table script to show what you mean by that line?

Thanks.

OK, so I had a look at the context of your problem (my earlier post literally just answered the title: how to partition on a string column).

Now, I don't think you really want the insert_overwrite option here.
Consider the following case:

Imagine you use the strategy I mentioned in the previous post and manage to partition on _meta_source_file_hash.
In BigQuery, integer partitioning assigns a range of values to each partition.

So it's entirely possible that the hashes of, say, two files abc and def map to the same partition.
Then, if a dbt run brings in data only for file abc, the whole partition where the data from files abc and def resides will be cleared and re-populated with new data that only has rows for abc.
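
To make the collision point concrete, here is a quick illustrative query; the file names and the 4000-bucket hashing are placeholders, reusing the sketch from my earlier reply.

-- if the two buckets come out equal, an insert_overwrite run that only
-- carries file 'abc' would also wipe the rows that came from 'def'
select
    _meta_source_file,
    abs(mod(farm_fingerprint(_meta_source_file), 4000)) as partition_bucket
from unnest(['abc', 'def']) as _meta_source_file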

Note this assumes your int_table gets cleared regularly and doesn't at all times contain all the data saved to pub_table.

Honestly, I am unsure how to achieve this using just one model (replacing some records, but not based on a unique key) :frowning: . The only way that comes to mind is to use a dbt pre-hook to delete the data linked to the source files present in int_table, and then let the pub model ingest all the data coming from int_table. (Note: there are probably better ways of achieving this.)
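
A rough, untested sketch of such a pre-hook macro; the macro name and arguments are placeholders, and you would also need to guard against the very first run, when the published table doesn't exist yet (which is where things get fiddly):

{% macro delete_matching_source_files(target_relation, source_relation) %}
    -- remove every published row whose source file is about to be
    -- re-ingested from the intermediate table
    delete from {{ target_relation }}
    where _meta_source_file in (
        select distinct _meta_source_file
        from {{ source_relation }}
    )
{% endmacro %}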


On your previous question about how to add the hash:

select *, farm_fingerprint(_meta_source_file) as _meta_source_file_hash
    from {{ ref("int_table") }} as int_model
    {% if is_incremental() %}
        where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
    {% else %}  
        where int_model._meta_inserted_at is not null
    {% endif %}
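
And on the pruning point you asked about: a downstream query only benefits from partition pruning if it filters directly on the hash column, something like the following (illustrative, reusing the same hashing expression; if BigQuery won't fold the hash expression, compute the bucket value beforehand and pass it in as a literal).

-- filter on the partitioning column so BigQuery can scan just the
-- matching partition instead of the whole table
select *
from {{ ref("pub_table") }}
where _meta_source_file_hash = abs(mod(farm_fingerprint('abc'), 4000))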

Hi Stephen, you are being very helpful, thanks a lot for that. I built on your idea: use a pre-hook to delete the records and then insert the new records using the incremental strategy.

This is what I came up with, but the macro is not working as intended and I don't know where I'm going wrong. Can you help me with this one?

Failing macro (checks for the table's presence and truncates it if present) -

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% set table_exists_query %}
    SELECT COUNT(*) 
    FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
    WHERE table_id = '{{ table_name.name }}' -- returns 1 for pub_table
    {% endset %}
    
    {% set table_exists = dbt_utils.get_single_value(table_exists_query) %}
    
    {% if table_exists | int > 0 %}
        TRUNCATE TABLE {{ table_name }}
        -- DELETE FROM {{ table_name }}
        -- WHERE _meta_source_file IS NOT NULL
        -- IN (
        --     SELECT DISTINCT _meta_source_file
        --     FROM {{ ref_table_name }}
        --     WHERE _meta_inserted_at = SAFE_CAST({{ max_meta_inserted_at }} AS timestamp)
        -- )
    {% endif %}
{% endmacro %}

simple macro - this works (truncates the table before the incremental insertion):

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    TRUNCATE TABLE {{ table_name }}
{% endmacro %}

pub_table:

{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{% set max_meta_inserted_at = get_max_meta_inserted_at(ref("int_table"), airflow_info) %}

{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        cluster_by=["_meta_source_file","_meta_use_case","transaction_date"],
        pre_hook=delete_existing_records(
            this,
            ref("int_table"),
            max_meta_inserted_at
        )
    )
}}

select *
from {{ ref("int_table") }} as int_model
{% if is_incremental() %}
    where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
    -- where 1<>1
{% else %}
    where int_model._meta_inserted_at is not null
{% endif %}

I raised another topic for the pre-hook macro issue; adding it here just for your reference and help.

I used the pre_hook and the macro as you suggested, @stephen, and it worked like a charm, thanks. Below is my script,

pub_table -

{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{% set max_meta_inserted_at = get_max_meta_inserted_at(ref("int_model"), airflow_info) %}

{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        cluster_by=["_meta_source_file","_meta_use_case","transaction_date"],
        pre_hook=delete_existing_records(
            this,
            ref("int_model"),
            max_meta_inserted_at
        )
    )
}}

select *
    from {{ ref("int_model") }} as int_model
    {% if is_incremental() %}
        where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
        -- where 1<>1
    {% else %}  
        where int_model._meta_inserted_at is not null
    {% endif %}

delete_existing_records macro -

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% if execute %}
        {% set table_exists_query %}
            SELECT COUNT(*) 
            FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
            WHERE table_id = '{{ table_name.name }}'
        {% endset %}
            
        {% set table_exists = dbt_utils.get_single_value(table_exists_query) | int %}
        
        {{ log("Table exists value: " ~ table_exists, info=True) }}
        
        {% if table_exists > 0 %}
            {{ log("Deleting records from table: " ~ table_name, info=True) }}

            {%- call statement('delete_records', fetch_result=True) -%}
                DELETE FROM {{ table_name }} 
                WHERE <some_condition>;
            {%- endcall -%}

            {%- set delete_result = load_result('delete_records') -%}
            {{ log(delete_result['response'].rows_affected ~ ' rows deleted', info=True) }}

            -- Add this to verify the execution continues
            {{ log("Successfully executed DELETE on: " ~ table_name, info=True) }}
        {% else %}
            {{ log("Table does not exist: " ~ table_name, info=True) }}
        {% endif %}
    {% endif %}
{% endmacro %} 
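
In case it helps anyone later: the <some_condition> placeholder above is essentially the filter from the commented-out DELETE in my earlier attempt, along these lines (sketch; adjust the cast to however _meta_inserted_at is stored):

-- delete the published rows for every source file present in the
-- current int_table batch before the incremental insert re-adds them
DELETE FROM {{ table_name }}
WHERE _meta_source_file IN (
    SELECT DISTINCT _meta_source_file
    FROM {{ ref_table_name }}
    WHERE _meta_inserted_at = SAFE_CAST({{ max_meta_inserted_at }} AS timestamp)
)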
