Unable to log the rows affected after every model run
The context of why I’m trying to do this
To know how many rows were affected after every model run, for observability purposes.
What I’ve already tried
I used a call statement, but it looks like it can only be used against DMLs, not for retrievals (SELECTs).
Some example code or error messages
This is the pub_table model I have, for which I would like to log the rows affected after the model run. I have 3 such models (including this one), so is there a better approach, such as defining something in dbt_project.yml so that referencing it in the models logs the rows affected at the end of each run (kind of like a post_hook)?
{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{% set max_meta_inserted_at = get_max_meta_inserted_at(ref("int_table"), airflow_info) %}
{# `this` resolves to pub_table #}
{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        cluster_by=["_meta_source_file", "_meta_use_case", "transaction_date"],
        pre_hook=delete_existing_records(
            this,
            ref("int_table"),
            max_meta_inserted_at
        )
    )
}}
{%- call statement('pub_table__insert_rows', fetch_result=True) -%}
    select *
    from {{ ref("int_table") }} as int_model
    {% if is_incremental() %}
        where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
        -- where 1<>1
    {% else %}
        where int_model._meta_inserted_at is not null
    {% endif %}
{% endcall %}
{%- set result = load_result('pub_table__insert_rows') -%}
{{ log(result['response'].rows_affected ~ ' rows affected', info=True) }}
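For reference: when the wrapped statement is a SELECT, `rows_affected` in the adapter response is generally not populated, since it reflects DML row counts. One workaround sketch, assuming the same BigQuery setup, is to run an explicit count and log that instead (the statement name `pub_table__row_count` is illustrative):

```sql
{%- call statement('pub_table__row_count', fetch_result=True) -%}
    select count(*)
    from {{ ref("int_table") }} as int_model
    where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
{%- endcall -%}

{%- set row_count = load_result('pub_table__row_count')['data'][0][0] -%}
{{ log(row_count ~ ' rows selected for insert', info=True) }}
```

This avoids fetching the full result set just to count it.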
Alternatively, you can find this information in the dbt log file. Search for the query executed by the dbt model to populate data, and you’ll see the affected record count following the query.
From the looks of it, I see you are using dbt Cloud. I’m using a dbt-core task triggered by Airflow, and I would like to print the rows affected in the Airflow logs. Is there a way to do this?
I can’t access the log file since dbt-core runs on GCP Cloud Run, and I can’t see it in the Airflow logs; that’s why I want to log the counts explicitly (like below). In the following case I can see the log messages in Airflow, but here I’m logging the messages of the DELETE statement. That works, yet the same approach doesn’t work for the SELECT statement.
{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% if execute %}
        {% set table_exists_query %}
            SELECT COUNT(*)
            FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
            WHERE table_id = '{{ table_name.name }}'
        {% endset %}
        {% set table_exists = dbt_utils.get_single_value(table_exists_query) | int %}
        {{ log("Table exists value: " ~ table_exists, info=True) }}
        {% if table_exists > 0 %}
            {{ log("Deleting records from table: " ~ table_name, info=True) }}
            {%- call statement('delete_records', fetch_result=True) -%}
                DELETE FROM {{ table_name }} WHERE <some_condition>;
            {%- endcall -%}
            {%- set delete_result = load_result('delete_records') -%}
            {{ log(delete_result['response'].rows_affected ~ ' rows deleted', info=True) }}
            {# Logged to verify the execution continues #}
            {{ log("Successfully executed DELETE on: " ~ table_name, info=True) }}
        {% else %}
            {{ log("Table does not exist: " ~ table_name, info=True) }}
        {% endif %}
    {% endif %}
{% endmacro %}
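Since the goal is to do this once for all three models rather than per model, dbt’s on-run-end hook may fit better: in that context a `results` list is available, one entry per executed node, each carrying an `adapter_response`. A sketch (the macro name is illustrative, and `rows_affected` is adapter-dependent, so it may be absent for non-DML nodes):

```sql
{% macro log_rows_affected(results) %}
    {% for res in results %}
        {{ log(res.node.name ~ ': ' ~ res.adapter_response.get('rows_affected', 'n/a') ~ ' rows affected', info=True) }}
    {% endfor %}
{% endmacro %}
```

Registered once in dbt_project.yml:

```yaml
on-run-end:
  - "{{ log_rows_affected(results) }}"
```

Because `log(..., info=True)` writes to stdout, these lines should appear in the Airflow task logs whenever Airflow captures the dbt process output.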