Pre hook - Macro - Delete statement

The problem I’m having

The pre_hook calls the macro, but the macro isn’t deleting the records from pub_table.

The context of why I’m trying to do this

For every Airflow DAG run, I would like to replace the records in pub_table with the records from int_table that have the same _meta_source_file value.

For example, if pub_table has 100 records and int_table has 120 records for _meta_source_file = “abc”, then during the DAG run those 100 pub_table records should be replaced by the 120 int_table records.

Basically: delete the 100 records from pub_table and insert the 120 records from int_table.
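Outside dbt, the replacement described above can be written as one BigQuery multi-statement transaction. This is a minimal sketch, assuming `project.dataset` stands in for the real project and dataset, that _meta_inserted_at is a TIMESTAMP, and that "the records from int_table" means its newest batch (the model below filters on a max _meta_inserted_at in a similar way):

```sql
-- Assumptions: placeholder names `project.dataset.*`; _meta_inserted_at is TIMESTAMP;
-- the newest int_table batch is the one that should replace pub_table rows.
DECLARE latest_batch TIMESTAMP;
SET latest_batch = (
  SELECT MAX(_meta_inserted_at) FROM `project.dataset.int_table`
);

BEGIN TRANSACTION;

-- Delete every pub_table row whose source file appears in the newest batch
-- (the "delete 100 records" step).
DELETE FROM `project.dataset.pub_table`
WHERE _meta_source_file IN (
  SELECT DISTINCT _meta_source_file
  FROM `project.dataset.int_table`
  WHERE _meta_inserted_at = latest_batch
);

-- Insert the newest batch from int_table (the "insert 120 records" step).
INSERT INTO `project.dataset.pub_table`
SELECT *
FROM `project.dataset.int_table`
WHERE _meta_inserted_at = latest_batch;

COMMIT TRANSACTION;
```

Running both statements in one transaction means readers of pub_table never see the state where the old rows are gone but the new ones are not yet inserted.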

What I’ve already tried

I added a pre_hook to the pub_table config that calls the macro delete_existing_records, which should delete the pub_table records whose _meta_source_file matches a file in int_table.

Some example code or error messages

Failing macro (checks whether the table exists and truncates it if so):

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% set table_exists_query %}
    SELECT COUNT(*) 
    FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
    WHERE table_id = '{{ table_name.name }}' -- returns 1 for pub_table
    {% endset %}
    
    {% set table_exists = dbt_utils.get_single_value(table_exists_query) %}
    
    {% if table_exists | int > 0 %}
        TRUNCATE TABLE {{ table_name }}
        -- DELETE FROM {{ table_name }}
        -- WHERE _meta_source_file IN (
        --     SELECT DISTINCT _meta_source_file
        --     FROM {{ ref_table_name }}
        --     WHERE _meta_inserted_at = SAFE_CAST({{ max_meta_inserted_at }} AS timestamp)
        -- )
    {% endif %}
{% endmacro %}

Simple macro that works (truncates the table before the incremental insert):

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    TRUNCATE TABLE {{ table_name }}
{% endmacro %}

pub_table:

{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{% set max_meta_inserted_at = get_max_meta_inserted_at(ref("int_table"), airflow_info) %}

{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        cluster_by=["_meta_source_file","_meta_use_case","transaction_date"],
        pre_hook=delete_existing_records(
            this,
            ref("int_table"),
            max_meta_inserted_at
        )
    )
}}

select *
from {{ ref("int_table") }} as int_model
{% if is_incremental() %}
    where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
    # where 1<>1
{% else %}  
  where int_model._meta_inserted_at is not null
{% endif %}
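A likely explanation, not confirmed in this thread: when a macro is called directly inside config(), as in pre_hook=delete_existing_records(...), dbt evaluates it while parsing the project and keeps the returned text as the hook SQL. During parsing `execute` is false and dbt_utils.get_single_value returns its default instead of running the query, so the existence check fails and the hook text comes out empty. The simple macro works because it returns TRUNCATE unconditionally. dbt's documentation shows hooks that call macros passed as quoted strings, which dbt renders when the hook actually runs. A sketch of the config in that form, assuming the user's own get_max_meta_inserted_at macro (not shown in the thread) can be called from the hook context:

```sql
{# The hook is a quoted string, so it is rendered at run time (execute is true),
   not at parse time. Model-level {% set %} variables are not visible inside it,
   so max_meta_inserted_at is recomputed with the user's own macro (assumption). #}
{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        cluster_by=["_meta_source_file", "_meta_use_case", "transaction_date"],
        pre_hook="{{ delete_existing_records(this, ref('int_table'), get_max_meta_inserted_at(ref('int_table'), fromjson(var('airflow_info', '{}')))) }}"
    )
}}
```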

I suspect the if condition is evaluating to false.
Could you print the table_exists value with {{ log('table_exists ' ~ table_exists, info=true) }}? Please put this just before the if condition.

I added the logging.

Macro:

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% if execute %}
        {% set table_exists_query %}
            SELECT COUNT(*) 
            FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
            WHERE table_id = '{{ table_name.name }}'
        {% endset %}
            
        {% set table_exists = dbt_utils.get_single_value(table_exists_query) | int %}
        
        {{ log("Table exists value: " ~ table_exists, info=True) }}
        
        {% if table_exists > 0 %}
            {{ log("Truncating table: " ~ table_name, info=True) }}
            TRUNCATE TABLE {{ table_name }}
        {% else %}
            {{ log("Table does not exist: " ~ table_name, info=True) }}
        {% endif %}
    {% endif %}
{% endmacro %}

From the logs:

Table exists value: 1
Truncating table: `project`.`dataset`.`pub_table`

But the table did not get truncated. I don’t know where I’m going wrong.

The pre-hook truncates pub_table before the model is built, and then running the pub_table model inserts data into pub_table again, right?

Yes. Before the pub_table model runs, the pre-hook truncates pub_table, and then the pub_table model inserts the records from int_table. Once the TRUNCATE statement executes successfully, I will replace it with a DELETE statement.
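When TRUNCATE is swapped for DELETE, the macro can simply return the statement text and let the hook execute it. A minimal sketch, assuming the hook is passed as a quoted string (pre_hook="{{ delete_existing_records(...) }}") so the macro renders at run time. It swaps the __TABLES__ query for dbt's built-in load_relation, which returns none when the relation does not exist, and restores the DELETE from the commented-out code with its WHERE clause written as `_meta_source_file IN (...)`:

```sql
{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {#- Emit SQL only at execution time, and only if pub_table already exists.
        Otherwise the macro renders to an empty string. -#}
    {%- if execute and load_relation(table_name) is not none -%}
        DELETE FROM {{ table_name }}
        WHERE _meta_source_file IN (
            SELECT DISTINCT _meta_source_file
            FROM {{ ref_table_name }}
            WHERE _meta_inserted_at = SAFE_CAST({{ max_meta_inserted_at }} AS TIMESTAMP)
        )
    {%- endif -%}
{% endmacro %}
```

On the very first run, when pub_table has not been built yet, the hook renders to an empty string, which dbt's materializations skip rather than send to BigQuery.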

Is this the actual requirement, or are you just testing the pre-hook logic?
If you need pub_table to be cleared before each dbt run, you can instead switch the materialisation to table; that way older data is never retained in pub_table.

A hook is the way to go only when you want to delete some of the model’s data, not all of it.

Additionally, you can remove the {% if is_incremental() %} check and apply the same filter on every run:

{{ config(materialized="table", ... ) }}
...
select *
from {{ ref("int_table") }}
where _meta_inserted_at = {{ max_meta_inserted_at }}

Did you check the dbt logs?

Since control reaches the inside of the if block, it should execute the TRUNCATE statement.

  1. Please check the dbt.log file to see whether dbt actually ran the TRUNCATE statement.
  2. Check the query history in BigQuery.
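For step 2, BigQuery exposes recent jobs through INFORMATION_SCHEMA. A sketch, assuming the dataset lives in the US multi-region (change `region-us` to match yours) and that you can read project-level job metadata. It lists the last day's query jobs that mention pub_table, together with any error message:

```sql
-- Recent query jobs in this project whose SQL mentions pub_table.
-- `region-us` is an assumption; use the region qualifier of your dataset.
SELECT
  creation_time,
  job_id,
  user_email,
  statement_type,
  state,
  error_result.message AS error_message,
  query
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND job_type = 'QUERY'
  AND LOWER(query) LIKE '%pub_table%'
ORDER BY creation_time DESC;
```

If the hook was sent, a TRUNCATE or DELETE job should show up next to the model's own MERGE or INSERT; if only the latter appears, the hook SQL most likely rendered to nothing.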

I used the macro below and it worked. It uses DELETE instead of TRUNCATE, executed through a call statement block:

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% if execute %}
        {% set table_exists_query %}
            SELECT COUNT(*) 
            FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
            WHERE table_id = '{{ table_name.name }}'
        {% endset %}
            
        {% set table_exists = dbt_utils.get_single_value(table_exists_query) | int %}
        
        {{ log("Table exists value: " ~ table_exists, info=True) }}
        
        {% if table_exists > 0 %}
            {{ log("Deleting records from table: " ~ table_name, info=True) }}

            {%- call statement('delete_records', fetch_result=True) -%}
                DELETE FROM {{ table_name }} WHERE <some_condition>;
            {%- endcall -%}

            {%- set delete_result = load_result('delete_records') -%}
            {{ log(delete_result['response'].rows_affected ~ ' rows deleted', info=True) }}

            -- Add this to verify the execution continues
            {{ log("Successfully executed DELETE on: " ~ table_name, info=True) }}
        {% else %}
            {{ log("Table does not exist: " ~ table_name, info=True) }}
        {% endif %}
    {% endif %}
{% endmacro %}
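This version most likely works because `call statement` sends the DELETE to BigQuery while the macro itself is being rendered at run time, instead of depending on the macro's returned text becoming the hook SQL. dbt's run_query wraps the same statement/load_result pair. A compact sketch of that variant, using load_relation for the existence check; since the working macro's condition is elided (<some_condition>), the filter here is the one from the commented-out DELETE earlier in the thread, used only for illustration:

```sql
{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% if execute and load_relation(table_name) is not none %}
        {% set delete_sql %}
            DELETE FROM {{ table_name }}
            WHERE _meta_source_file IN (
                SELECT DISTINCT _meta_source_file
                FROM {{ ref_table_name }}
                WHERE _meta_inserted_at = SAFE_CAST({{ max_meta_inserted_at }} AS TIMESTAMP)
            )
        {% endset %}
        {#- run_query executes the statement immediately, during rendering. -#}
        {% do run_query(delete_sql) %}
        {{ log("Ran DELETE on: " ~ table_name, info=True) }}
    {% endif %}
{% endmacro %}
```

One side effect of this approach: the DELETE runs whenever the model is rendered with `execute` true, so commands such as `dbt compile` are likely to run it as well, not only `dbt run`.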
