Row-count test with warn and error severity

The problem I’m having

I need a decent test that alerts on row-count differences in our staging models.

The context of why I’m trying to do this

We want to compare the row count of each staging model with its row count from the previous run. If the row count has decreased, the test must fail so that the scheduled job does not start.

What I’ve already tried

I tried various SQL scripts, but they get clunky when a table is deleted or added. Does anyone have a sturdy and reliable solution?

Some example code or error messages

Model properties YAML (it defines `models:`, not sources):

version: 2

models:
  - name: row_count_history
    description: "Historical snapshot of row counts for all monitored models"
    columns:
      - name: unique_id
        description: "Unique identifier for each snapshot (model + timestamp)"
        tests:
          - unique
          - not_null
      - name: model_name
        description: "Name of the model being monitored"
      - name: row_count
        description: "Number of rows in the model at the snapshot"
      - name: snapshot_date
        description: "Date of the snapshot"
      - name: snapshot_timestamp
        description: "Exact timestamp of the snapshot"

  - name: row_count_monitor
    description: "Comparison of current row count vs previous load for all models"
    columns:
      - name: model_name
        description: "Name of the model being monitored (one row per model)"
        tests:
          - unique
          - not_null
      - name: current_count
        description: "Row count for the current load"
      - name: prev_count
        description: "Row count from the previous load"
      - name: difference
        description: "Difference in row count between current and previous load"
      - name: pct_change
        description: "Percentage change from previous load (null if previous count is 0 or null)"
      - name: status
        description: "Alert status based on change thresholds and previous load"
        tests:
          # Warning level: any status not listed here (large/moderate change,
          # fewer rows) is reported as a warning; the test does not fail.
          - accepted_values:
              values: ['OK', 'No baseline', 'ALERT: Previous count zero']
              config:
                severity: warn
          # Error level: fails the test on any status not listed here, i.e.
          # 'ALERT: Less rows' (the row count dropped since the previous load).
          # A failing error-severity test makes dbt exit non-zero, which the
          # scheduler can use to stop the downstream job.
          - accepted_values:
              values: ['OK', 'No baseline', 'ALERT: Previous count zero', 'WARNING: Moderate change', 'ALERT: Large change']
              config:
                severity: error
      - name: previous_load_timestamp
        description: "Timestamp of the previous load for this model"
      - name: current_load_date
        description: "Date of the current load"
      - name: current_load_timestamp
        description: "Timestamp of the current load"
      - name: check_date
        description: "Date when this comparison was performed"
      - name: check_timestamp
        description: "Timestamp when this comparison was performed"


--------------------------------
My SQL model that builds the row-count history table (row_count_history):
-------------------------------

{{
  config(
    materialized='incremental',
    unique_key='unique_id',
    on_schema_change='append_new_columns'
  )
}}

{#-
  row_count_history
  Appends one row-count snapshot per monitored model on every run, so that
  row_count_monitor can compare consecutive loads.

  - The list of monitored models can be overridden with the var
    `row_count_monitor_models`; the default is the list below.
  - Models that are missing from the target schema are skipped and logged
    instead of breaking the build; if none exist, zero rows are inserted.

  NOTE(review): the monitored relations are read directly as
  <target.database>.<target.schema>.<model>, not via ref(), so dbt does not
  order this model after them. Make sure it runs after the staging models and
  that they really live in target.schema (not a custom schema) — confirm
  against the project setup.
-#}

{% set prod_database = target.database %}
{% set prod_schema = target.schema %}

{% set models_to_monitor = var('row_count_monitor_models', [
    'stg_dkh_mkg__arti',
    'stg_dkh_mkg__betc',
    'stg_dkh_mkg__cred'
]) %}

{% set existing_models = [] %}
{% set skipped_models = [] %}

{% if execute %}
  {#- One metadata query for the whole list instead of one round trip per model.
      INFORMATION_SCHEMA.TABLES lists views as well as base tables. -#}
  {% set found_tables = [] %}
  {% if models_to_monitor | length > 0 %}
    {% set exists_query %}
      select TABLE_NAME
      from {{ prod_database }}.INFORMATION_SCHEMA.TABLES
      where TABLE_SCHEMA = '{{ prod_schema }}'
        and TABLE_NAME in (
          {%- for model in models_to_monitor %}
          '{{ model }}'{% if not loop.last %},{% endif %}
          {%- endfor %}
        )
    {% endset %}
    {% for table_name in run_query(exists_query).columns[0].values() %}
      {#- lowercase so the match below is case-insensitive regardless of how
          the name is cased in the catalog -#}
      {% do found_tables.append(table_name | lower) %}
    {% endfor %}
  {% endif %}

  {% for model in models_to_monitor %}
    {% if model | lower in found_tables %}
      {% do existing_models.append(model) %}
    {% else %}
      {% do skipped_models.append(model) %}
    {% endif %}
  {% endfor %}

  {% do log('✅ Found ' ~ (existing_models | length) ~ ' existing tables.', info=True) %}
  {% if skipped_models | length > 0 %}
    {% do log('⚠️ Missing: ' ~ (skipped_models | join(', ')), info=True) %}
  {% endif %}
{% else %}
  {#- Parse phase: no database access; the list only has to render valid SQL. -#}
  {% do existing_models.extend(models_to_monitor) %}
{% endif %}

with combined as (
  {% if existing_models | length == 0 %}
  -- Nothing to snapshot: return zero rows with the same columns/types as the
  -- per-model branch below, so the model still compiles and runs.
  select
      cast(null as varchar(128)) as model_name,
      cast(null as int) as row_count,
      cast(null as date) as snapshot_date,
      cast(null as datetime2(3)) as snapshot_timestamp,
      cast(null as varchar(32)) as unique_id
  where 1 = 0
  {% else %}
  {% for model in existing_models %}
  select
      -- Fixed width (SQL Server identifiers are at most 128 chars) instead of
      -- a bare literal, so a model with a longer name added later still fits.
      -- NOTE(review): an already-created history table keeps its original
      -- column width until it is rebuilt with --full-refresh.
      cast('{{ model }}' as varchar(128)) as model_name,
      count(*) as row_count,
      convert(date, getdate()) as snapshot_date,
      convert(datetime2(3), getdate()) as snapshot_timestamp,
      convert(varchar(32), hashbytes('MD5', '{{ model }}_' + convert(varchar, getdate(), 121)), 2) as unique_id
  from {{ prod_database }}.{{ prod_schema }}.{{ model }}
  {% if not loop.last %}
  union all
  {% endif %}
  {% endfor %}
  {% endif %}
)

select
    c.model_name,
    c.row_count,
    c.snapshot_date,
    c.snapshot_timestamp,
    c.unique_id
from combined as c

{% if is_incremental() %}
-- only insert snapshots that are not already stored in the history table
where not exists (
    select 1
    from {{ this }} as h
    where h.unique_id = c.unique_id
)
{% endif %}

------------------------------
Second SQL model, which compares the latest load with the previous one (row_count_monitor):
------------------------------

{{
  config(
    materialized='table'
  )
}}

{#-
  row_count_monitor
  One row per monitored model: its latest snapshot from row_count_history
  compared with the snapshot immediately before it, plus a `status` label
  that the schema tests on `status` turn into warnings or errors.

  Percentage thresholds for increases can be overridden with vars:
    row_count_large_change_pct    (default 20)
    row_count_moderate_change_pct (default 10)
  Any decrease in row count is labelled 'ALERT: Less rows'.
-#}

{% set large_change_pct = var('row_count_large_change_pct', 20) %}
{% set moderate_change_pct = var('row_count_moderate_change_pct', 10) %}

with ranked_counts as (
    -- previous load per model via lag(), plus a recency rank (1 = latest load)
    select
        model_name,
        row_count,
        snapshot_date,
        snapshot_timestamp,
        lag(row_count) over (
            partition by model_name
            order by snapshot_timestamp
        ) as prev_count,
        lag(snapshot_timestamp) over (
            partition by model_name
            order by snapshot_timestamp
        ) as prev_timestamp,
        row_number() over (
            partition by model_name
            order by snapshot_timestamp desc
        ) as recency_rank
    from {{ ref('row_count_history') }}
),

latest_counts as (
    -- exactly one row per model: its most recent load
    select
        model_name,
        row_count,
        prev_count,
        prev_timestamp,
        snapshot_date,
        snapshot_timestamp
    from ranked_counts
    where recency_rank = 1
)

select
    model_name,
    row_count as current_count,
    prev_count,
    row_count - coalesce(prev_count, 0) as difference,
    case
        when prev_count > 0
        then round((row_count - prev_count) * 100.0 / prev_count, 2)
        else null
    end as pct_change,
    case
        when prev_count is null then 'No baseline'
        when prev_count = 0 then 'ALERT: Previous count zero'
        -- Checked before the magnitude bands so that any drop is classified
        -- as a decrease rather than as a large/moderate change.
        when row_count < prev_count then 'ALERT: Less rows'
        -- From here on row_count >= prev_count, i.e. the change is an increase.
        when (row_count - prev_count) * 100.0 / prev_count > {{ large_change_pct }} then 'ALERT: Large change'
        when (row_count - prev_count) * 100.0 / prev_count > {{ moderate_change_pct }} then 'WARNING: Moderate change'
        else 'OK'
    end as status,
    prev_timestamp as previous_load_timestamp,
    snapshot_date as current_load_date,
    snapshot_timestamp as current_load_timestamp,
    convert(date, getdate()) as check_date,
    convert(datetime2(3), getdate()) as check_timestamp
from latest_counts