The problem I’m having
A decent test that alerts on row-count differences in staging models
The context of why I’m trying to do this
We would like to compare each staging model's row count with its count from the previous run. If the row count is lower than before, the test needs to fail so that the scheduled job does not start.
What I’ve already tried
I tried various SQL scripts, but they get clunky whenever a table is deleted or added. Does anyone have a sturdy and reliable solution?
Some example code or error messages
Models YAML (schema.yml):
version: 2

models:
  - name: row_count_history
    description: "Historical snapshot of row counts for all monitored models"
    columns:
      - name: unique_id
        description: "Unique identifier for each snapshot (model + timestamp)"
        tests:
          - unique
          - not_null
      - name: model_name
        description: "Name of the model being monitored"
      - name: row_count
        description: "Number of rows in the model at the snapshot"
      - name: snapshot_date
        description: "Date of the snapshot"
      - name: snapshot_timestamp
        description: "Exact timestamp of the snapshot"

  - name: row_count_monitor
    description: "Comparison of current row count vs previous load for all models"
    columns:
      - name: model_name
        description: "Name of the model being monitored"
      - name: current_count
        description: "Row count for the current load"
      - name: prev_count
        description: "Row count from the previous load"
      - name: difference
        description: "Difference in row count between current and previous load"
      - name: pct_change
        description: "Percentage change from previous load (null if previous count is 0 or null)"
      - name: status
        description: "Alert status based on change thresholds and previous load"
        tests:
          # Hard gate: any status NOT in this list fails the test with severity
          # error, so `dbt build` / `dbt test` exits non-zero and the scheduled
          # job stops. 'ALERT: Less rows' (a drop in row count) is deliberately
          # absent, as is any status value added in the future (fail-safe).
          - accepted_values:
              values: ['OK', 'No baseline', 'ALERT: Previous count zero', 'WARNING: Moderate change', 'ALERT: Large change']
              config:
                severity: error
          # Soft signal (original behaviour): warn, without failing the run,
          # on any status outside the "nothing to look at" set.
          - accepted_values:
              values: ['OK', 'No baseline', 'ALERT: Previous count zero']
              config:
                severity: warn
      - name: previous_load_timestamp
        description: "Timestamp of the previous load for this model"
      - name: current_load_date
        description: "Date of the current load"
      - name: current_load_timestamp
        description: "Timestamp of the current load"
      - name: check_date
        description: "Date when this comparison was performed"
      - name: check_timestamp
        description: "Timestamp when this comparison was performed"
--------------------------------
My SQL script that builds the row-count history table (model `row_count_history`):
-------------------------------
{{
    config(
        materialized='incremental',
        unique_key='unique_id',
        on_schema_change='append_new_columns'
    )
}}

{#-
    row_count_history

    Appends one row per monitored model per run with that model's current row
    count. Tables that do not exist in the target schema are skipped and logged
    rather than failing the run. Adding or removing a staging model therefore
    only requires editing the default list below, or overriding the
    `row_count_monitor_models` var (dbt_project.yml or --vars).
-#}
{% set prod_database = target.database %}
{% set prod_schema = target.schema %}
{% set models_to_monitor = var('row_count_monitor_models', [
    'stg_dkh_mkg__arti',
    'stg_dkh_mkg__betc',
    'stg_dkh_mkg__cred'
]) %}
{% set existing_models = [] %}
{% set skipped_models = [] %}

{% if execute %}
    {#- Check each table on its own so a missing one is skipped, not fatal. -#}
    {% for model in models_to_monitor %}
        {% set exists_query %}
            select count(*) as cnt
            from {{ prod_database }}.INFORMATION_SCHEMA.TABLES
            where TABLE_SCHEMA = '{{ prod_schema }}'
              and TABLE_NAME = '{{ model }}'
        {% endset %}
        {% set result = run_query(exists_query) %}
        {% if result and result.columns[0].values()[0] | int > 0 %}
            {% do existing_models.append(model) %}
        {% else %}
            {% do skipped_models.append(model) %}
        {% endif %}
    {% endfor %}
    {% do log('✅ Found ' ~ (existing_models | length) ~ ' existing tables.', info=True) %}
    {% if skipped_models | length > 0 %}
        {% do log('⚠️ Missing: ' ~ (skipped_models | join(', ')), info=True) %}
    {% endif %}
{% else %}
    {#- Parse phase: queries cannot run, so render SQL for the full list. -#}
    {% set existing_models = models_to_monitor %}
{% endif %}

with snapshot_clock as (
    -- Read the clock exactly once, so every model in this run gets the same
    -- snapshot timestamp (and therefore the same snapshot_date).
    select convert(datetime2(3), getdate()) as snapshot_timestamp
),

counts as (
    {%- for model in existing_models %}
    select
        '{{ model }}' as model_name,
        count(*) as row_count
    from {{ prod_database }}.{{ prod_schema }}.{{ model }}
    {% if not loop.last %}union all{% endif %}
    {%- else %}
    -- None of the monitored tables exist: return zero typed rows. An empty
    -- CTE body would be a SQL syntax error and break the whole run.
    select
        cast(null as varchar(256)) as model_name,
        cast(null as int) as row_count
    where 1 = 0
    {%- endfor %}
),

combined as (
    select
        c.model_name,
        c.row_count,
        convert(date, k.snapshot_timestamp) as snapshot_date,
        k.snapshot_timestamp,
        -- MD5 of "<model>_<yyyy-mm-dd hh:mi:ss.mmm>" rendered as 32 hex chars.
        convert(
            varchar(32),
            hashbytes('MD5', c.model_name + '_' + convert(varchar(23), k.snapshot_timestamp, 121)),
            2
        ) as unique_id
    from counts as c
    cross join snapshot_clock as k
)

select
    s.model_name,
    s.row_count,
    s.snapshot_date,
    s.snapshot_timestamp,
    s.unique_id
from combined as s
{% if is_incremental() %}
-- Only append snapshots not already stored. NOT EXISTS stays correct even if
-- the target ever contains a NULL unique_id, unlike NOT IN.
where not exists (
    select 1
    from {{ this }} as h
    where h.unique_id = s.unique_id
)
{% endif %}
------------------------------
Second SQL script, which compares each model's latest load with the previous one (model `row_count_monitor`):
------------------------------
{{
    config(
        materialized='table'
    )
}}

{#-
    row_count_monitor

    One row per model: the most recent snapshot in row_count_history compared
    with the snapshot taken just before it. Status precedence (first match wins):
        'No baseline'                only one snapshot exists for the model
        'ALERT: Less rows'           current count is lower than the previous one
        'ALERT: Previous count zero' previous count was 0 (percentage undefined)
        'ALERT: Large change'        increase above row_count_large_change_pct (default 20)
        'WARNING: Moderate change'   increase above row_count_moderate_change_pct (default 10)
        'OK'                         anything else
-#}
{% set large_change_pct = var('row_count_large_change_pct', 20) %}
{% set moderate_change_pct = var('row_count_moderate_change_pct', 10) %}

with ranked_counts as (
    select
        model_name,
        row_count,
        snapshot_date,
        snapshot_timestamp,
        -- previous snapshot of the same model, ordered by time
        lag(row_count) over (
            partition by model_name
            order by snapshot_timestamp
        ) as prev_count,
        lag(snapshot_timestamp) over (
            partition by model_name
            order by snapshot_timestamp
        ) as prev_timestamp,
        -- 1 = newest snapshot; guarantees exactly one row per model even on
        -- timestamp ties (a max() self-match would return every tied row)
        row_number() over (
            partition by model_name
            order by snapshot_timestamp desc
        ) as recency_rank
    from {{ ref('row_count_history') }}
),

latest_counts as (
    select
        model_name,
        row_count,
        prev_count,
        prev_timestamp,
        snapshot_date,
        snapshot_timestamp
    from ranked_counts
    where recency_rank = 1
)

select
    model_name,
    row_count as current_count,
    prev_count,
    row_count - coalesce(prev_count, 0) as difference,
    case
        when prev_count > 0
            then round(((row_count - prev_count) * 100.0 / prev_count), 2)
        else null
    end as pct_change,
    case
        when prev_count is null then 'No baseline'
        -- Any drop is the condition this monitor exists to catch. It is checked
        -- before the percentage rules so a decrease is never reported as 'OK'.
        -- The original `abs(...) < 0` test could never be true.
        when row_count < prev_count then 'ALERT: Less rows'
        when prev_count = 0 then 'ALERT: Previous count zero'
        -- From here on row_count >= prev_count > 0: a non-negative increase,
        -- and the division is safe.
        when (row_count - prev_count) * 100.0 / prev_count > {{ large_change_pct }} then 'ALERT: Large change'
        when (row_count - prev_count) * 100.0 / prev_count > {{ moderate_change_pct }} then 'WARNING: Moderate change'
        else 'OK'
    end as status,
    prev_timestamp as previous_load_timestamp,
    snapshot_date as current_load_date,
    snapshot_timestamp as current_load_timestamp,
    convert(date, getdate()) as check_date,
    convert(datetime2(3), getdate()) as check_timestamp
from latest_counts