Log rows affected for every model execution

The problem I’m having

I'm unable to log the number of rows affected after every model run.

The context of why I’m trying to do this

I want to know how many rows each model run affects, for observability purposes.

What I’ve already tried

I tried a call statement block, but it looks like rows_affected is reported only for DML statements, not for retrievals (plain SELECTs).

Some example code or error messages

This is the pub_table model for which I would like to log the rows affected after each run. I have three such models (including this one), so is there a better approach? For example, could I define something once in dbt_project.yml, similar to a post_hook, so that every model that references it logs its rows affected at the end of its run?

{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{% set max_meta_inserted_at = get_max_meta_inserted_at(ref("int_table"), airflow_info) %}

{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        cluster_by=["_meta_source_file","_meta_use_case","transaction_date"],
        pre_hook=delete_existing_records(
            this, # pub_table
            ref("int_table"),
            max_meta_inserted_at
        )
    )
}}


{%- call statement('pub_table__insert_rows', fetch_result=True) -%}

    select *
    from {{ ref("int_table") }} as int_model
    {% if is_incremental() %}
        where int_model._meta_inserted_at = {{ max_meta_inserted_at }}
        # where 1<>1
    {% else %}  
        where int_model._meta_inserted_at is not null
    {% endif %}

{% endcall %}

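{#- load_result only holds data at execution time, not while dbt parses the project -#}
{%- if execute -%}
    {%- set result = load_result('pub_table__insert_rows') -%}
    {{ log(result['response'].rows_affected ~ ' rows affected', info=True) }}
{%- endif -%}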

Hi @madm4niac

The number following SUCCESS in the console logs indicates the count of records affected during a model run.

For the following scenarios, this value will be 1:

  1. Initial run of an incremental model.
  2. Building a model with table materialization.
  3. Building a model with view materialization.
  4. Running a model in full-refresh mode.

Alternatively, you can find this information in the dbt log file (logs/dbt.log by default). Search for the query the model executed to populate its data; the affected-record count is logged right after that query.
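If digging through the log file is impractical, a related option is the run_results.json artifact that dbt-core writes to the target/ directory after each invocation. Each entry in its results list carries an adapter_response object, and on most adapters that object includes rows_affected (what exactly it counts depends on the adapter and the statement dbt ran). The script below is a minimal sketch, assuming the default target path and that it runs in the same environment right after dbt run, so its stdout ends up wherever that environment's logs go; the function name rows_affected_summary is hypothetical.

```python
"""Print rows_affected for each dbt node from run_results.json (a sketch)."""
import json
import sys
from pathlib import Path

# Assumption: dbt's default artifact location; pass another path as argv[1].
DEFAULT_PATH = "target/run_results.json"


def rows_affected_summary(run_results_path):
    """Return (unique_id, status, rows_affected) tuples, one per node result."""
    data = json.loads(Path(run_results_path).read_text())
    summary = []
    for result in data.get("results", []):
        # adapter_response may be empty, e.g. for skipped or errored nodes
        response = result.get("adapter_response") or {}
        summary.append(
            (result.get("unique_id"), result.get("status"), response.get("rows_affected"))
        )
    return summary


def main(argv):
    path = argv[1] if len(argv) > 1 else DEFAULT_PATH
    if not Path(path).is_file():
        print(f"No run results found at {path}", file=sys.stderr)
        return
    for unique_id, status, rows in rows_affected_summary(path):
        # stdout is what the surrounding runner (Airflow, Cloud Run) captures
        print(f"{unique_id}: {status}, rows_affected={rows}")


if __name__ == "__main__":
    main(sys.argv)
```

For example, it could be run as python print_rows_affected.py target/run_results.json right after dbt run in the same container (the script name is hypothetical).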

From the looks of it, you are using dbt Cloud. I'm using dbt-core as a task triggered by Airflow, and I would like to print the rows affected in the Airflow logs. Is there a way to do this?

Wherever you execute your dbt command, it should log similar messages. Are you able to see these messages in your Airflow logs?

I can't access the log file because dbt-core runs on GCP Cloud Run, and the messages don't appear in the Airflow logs either. That's why I want to log them explicitly, as below. In the following macro, the log messages do show up in Airflow: I log the result of the DELETE statement and it works, but the same approach doesn't work for the SELECT statement.

{% macro delete_existing_records(table_name, ref_table_name, max_meta_inserted_at) %}
    {% if execute %}
        {% set table_exists_query %}
            SELECT COUNT(*) 
            FROM `{{ table_name.project }}.{{ table_name.dataset }}.__TABLES__`
            WHERE table_id = '{{ table_name.name }}'
        {% endset %}
            
        {% set table_exists = dbt_utils.get_single_value(table_exists_query) | int %}
        
        {{ log("Table exists value: " ~ table_exists, info=True) }}
        
        {% if table_exists > 0 %}
            {{ log("Deleting records from table: " ~ table_name, info=True) }}

            {%- call statement('delete_records', fetch_result=True) -%}
                DELETE FROM {{ table_name }} WHERE <some_condition>;
            {%- endcall -%}

            {%- set delete_result = load_result('delete_records') -%}
            {{ log(delete_result['response'].rows_affected ~ ' rows deleted', info=True) }}

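            {#- Confirms that execution continued past the DELETE. A Jinja comment
                (unlike a SQL "--" comment) adds nothing to the rendered pre-hook SQL. -#}
            {{ log("Successfully executed DELETE on: " ~ table_name, info=True) }}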
        {% else %}
            {{ log("Table does not exist: " ~ table_name, info=True) }}
        {% endif %}
    {% endif %}
{% endmacro %}
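Since the dbt process on Cloud Run is what produces the output Airflow eventually sees, another option is to have that process print the numbers itself instead of relying on log() inside a model. Assuming dbt-core 1.5 or later, dbtRunner().invoke(...) runs a command in-process and returns a result object whose result attribute, for a run, holds one entry per node with an adapter_response dict (the same data dbt writes to run_results.json). This is a minimal sketch: format_rows_affected and run_and_log are hypothetical helper names, and, as with the artifact, what rows_affected counts depends on the adapter.

```python
"""Invoke dbt from Python and print rows affected per node (a sketch).

Assumes dbt-core >= 1.5, which provides dbtRunner for programmatic invocation.
"""


def format_rows_affected(results):
    """Build one log line per node result.

    Each result is expected to expose .node.name, .status and an
    .adapter_response dict, as dbt-core's run results do.
    """
    lines = []
    for r in results:
        rows = (r.adapter_response or {}).get("rows_affected")
        lines.append(f"{r.node.name}: {r.status}, rows_affected={rows}")
    return lines


def run_and_log(args):
    """Run `dbt run <args>`, print one line per node, and return an exit code."""
    # Imported here so format_rows_affected can be used without dbt installed
    from dbt.cli.main import dbtRunner

    res = dbtRunner().invoke(["run", *args])
    # res.result is None when dbt fails before producing any node results
    if res.result is not None:
        for line in format_rows_affected(res.result):
            print(line, flush=True)
    return 0 if res.success else 1
```

For instance, the container entrypoint could call sys.exit(run_and_log(["--select", "pub_table"])), which keeps a non-zero exit code on failure so the Airflow task is still marked as failed.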