The problem I’m having
The pre_hook fires the macro, but the macro doesn't delete the records from pub_table.
The context of why I’m trying to do this
On every Airflow DAG run, I would like to replace the records in pub_table with the records from int_table wherever the _meta_source_file field matches.
For example, if pub_table has 100 records and int_table has 120 records for _meta_source_file = "abc", then the DAG run should replace the 100 pub_table records with the 120 int_table records.
In other words: delete 100 records from pub_table, then insert 120 records from int_table.
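To make the intended behavior concrete, here is a minimal sketch of that delete-then-insert in plain BigQuery SQL, outside of dbt. The `my_project.my_dataset` location is a placeholder, and the sketch assumes both tables have the same columns in the same order; it is only meant to illustrate the goal, not the way the model actually runs.

```sql
-- Step 1: remove every pub_table row whose source file also appears in int_table.
-- Rows with a NULL _meta_source_file never match IN (...) and are left alone.
DELETE FROM `my_project.my_dataset.pub_table`
WHERE _meta_source_file IN (
  SELECT DISTINCT _meta_source_file
  FROM `my_project.my_dataset.int_table`
);

-- Step 2: insert the replacement rows from int_table.
-- Assumes identical column order, so no explicit column list is given.
INSERT INTO `my_project.my_dataset.pub_table`
SELECT *
FROM `my_project.my_dataset.int_table`;
```

In the dbt model, step 1 is what the pre_hook is supposed to do, and step 2 is the incremental insert that dbt performs itself.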
What I’ve already tried
I added a pre_hook to the pub_table config that calls the macro delete_existing_records. The macro is meant to delete the records in pub_table whose _meta_source_file matches a _meta_source_file in int_table.
Some example code or error messages
Failing macro (checks whether the table exists and, if it does, truncates it; the DELETE I actually want is commented out):
{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% set table_exists_query %}
        SELECT COUNT(*)
        FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
        WHERE table_id = '{{ table_name.name }}' -- returns 1 for pub_table
    {% endset %}
    {% set table_exists = dbt_utils.get_single_value(table_exists_query) %}
    {% if table_exists | int > 0 %}
        TRUNCATE TABLE {{ table_name }}
        -- DELETE FROM {{ table_name }}
        -- WHERE _meta_source_file IS NOT NULL
        -- IN (
        --     SELECT DISTINCT _meta_source_file
        --     FROM {{ ref_table_name }}
        --     WHERE _meta_inserted_at = SAFE_CAST({{ max_meta_inserted_at }} AS timestamp)
        -- )
    {% endif %}
{% endmacro %}
Simple macro, which works (it truncates the table before the incremental insert):
{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    TRUNCATE TABLE {{ table_name }}
{% endmacro %}
pub_table:
{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{% set max_meta_inserted_at = get_max_meta_inserted_at(ref("int_table"), airflow_info) %}
{# `this` in the pre_hook below refers to pub_table #}
{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        cluster_by=["_meta_source_file", "_meta_use_case", "transaction_date"],
        pre_hook=delete_existing_records(
            this,
            ref("int_table"),
            max_meta_inserted_at
        )
    )
}}
select *
from {{ ref("int_table") }} as int_model
{% if is_incremental() %}
    where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
    # where 1<>1
{% else %}
    where int_model._meta_inserted_at is not null
{% endif %}