I'd like to share the way we collect source (RAW) loading stats metadata, which we subsequently use for freshness, completeness, uniqueness and other data tests. This solves multiple problems we currently face with testing:
- This pattern of collecting metadata for testing is much faster than running tests per table
- dbt does not store test results in a database (yet; I know this is in the backlog!). We can only do proper freshness / completeness tests at the table / client_id level when we look at loading patterns over multiple days (see the example query right after this list)
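For example, once the daily stats from the RHS_LOADING_STATS model below are in place, a multi-day completeness check becomes a simple query. This is only a sketch (the 7-day window and the 50% drop threshold are arbitrary assumptions); it could live as a dbt singular test that selects the failing table / client_id combinations:

-- Hypothetical completeness check on RHS_LOADING_STATS; window and threshold are assumptions
with daily as (
    select
        table_name,
        client_id,
        meta_load_day,
        day_rows_added,
        avg(day_rows_added) over (
            partition by table_name, client_id
            order by meta_load_day
            rows between 7 preceding and 1 preceding
        ) as trailing_avg_rows
    from {{ ref('rhs_loading_stats') }}
)
select table_name, client_id, meta_load_day, day_rows_added, trailing_avg_rows
from daily
where meta_load_day = current_date
  and day_rows_added < 0.5 * trailing_avg_rows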
Some background info:
- The metadata is collected for a set of 600 tables that are loaded via a CDC loading pattern (Debezium + Kafka)
- The database we use is Snowflake
- The layer we load to is called RAW HISTORIZED STAGING (which we abbreviate to RHS). Its defining characteristic is that all source table columns are loaded as JSON into ONE column called JSON_SRC, plus an MD5 version of that content (JSON_SRC_MD5)
- In addition, the source primary / unique key columns and their values are stored in JSON_SRC_KEY
- Each client stores their source data in a different DB instance, but we stream it all into the same RHS table. We therefore extend every RHS table with a column that encodes the client (called client_id in this example), which also extends the primary key of all RHS tables. The logical database name of the source system is stored in META_SOURCE
- When loading to RHS, we store additional metadata fields: META_SOURCE_DTS (CDC extraction / change datetime), META_LOAD_DTS (DWH load datetime) and META_CDC_DML_TYPE (Debezium CDC operation). See the table sketch right after this list
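Put together, a single RHS table looks roughly like this (a sketch only: the schema name and the data types are assumptions, and XYZ_TABLE_A is a made-up table name):

create table rhs.xyz_table_a (
    client_id          varchar,        -- identifies the client; extends the source primary key
    json_src_key       varchar,        -- source primary / unique key columns and their values, as JSON
    json_src           variant,        -- all source table columns, loaded as one JSON document
    json_src_md5       varchar,        -- MD5 hash of the JSON_SRC content
    meta_source        varchar,        -- logical database name of the source system
    meta_source_dts    timestamp_ntz,  -- CDC extraction / change datetime
    meta_load_dts      timestamp_ntz,  -- DWH load datetime
    meta_cdc_dml_type  varchar         -- Debezium CDC operation (r / c / u / d)
);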
How it works:
- We loop over the dbt sources graph, creating one massive UNION ALL over all XYZ source tables (a sketch of the rendered SQL follows this list)
- The main query collects various loading stats per day, which we then use for various data tests
- The model is incremental; the first load is quite heavy, subsequent loads are fine
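To give an idea of what the loop produces, the SQL rendered for just two sources would look roughly like this (MY_DB, MY_SCHEMA, TABLE_A and TABLE_B are made-up names):

with vw as (
SELECT
'TABLE_A' as t
,'MY_DB' as d
,'MY_SCHEMA' as s
,client_id, meta_source, meta_load_dts, meta_source_dts, META_CDC_DML_TYPE, JSON_SRC_KEY, JSON_SRC_MD5
FROM MY_DB.MY_SCHEMA.TABLE_A
UNION ALL
SELECT
'TABLE_B' as t
,'MY_DB' as d
,'MY_SCHEMA' as s
,client_id, meta_source, meta_load_dts, meta_source_dts, META_CDC_DML_TYPE, JSON_SRC_KEY, JSON_SRC_MD5
FROM MY_DB.MY_SCHEMA.TABLE_B
)
select count(1) from vw  -- in the real model, this is the aggregating SELECT shown further below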
Caveats:
- It could time out when your source is too large (it works for our 600-table source on Snowflake, though)
- Constructing a model like this misses source dependencies, which we 'resolve' by explicitly naming a few of them with depends_on comments
The code of the model RHS_LOADING_STATS:
{{
config(
materialized='incremental',
unique_key='rhs_loading_stats_pk',
tags=["sf_logging"]
)
}}
-- This model depends on all XYZ RHS sources; we only explicitly mention the alphabetically first and last source to make the dbt docs DAG at least display SOME source dependencies...
-- depends_on: {{ source('XYZ', 'table_a') }}
-- depends_on: {{ source('XYZ', 'table_z') }}
{#- First, create a list with the filtered sources we want to validate -#}
{%- set sources = [] -%}
{%- for node in graph.sources.values() -%}
{%- if node.source_name == 'XYZ' -%}
{%- do sources.append(node) -%}
{%- endif -%}
{%- endfor %}
{#- using the sources array, create the loading stats query -#}
{% for node in sources %}
{% if loop.first -%}with vw as ( {%- endif -%}
SELECT
'{{ node.name.upper() }}' as t
,'{{ node.database }}' as d
,'{{ node.schema }}' as s
,client_id, meta_source, meta_load_dts, meta_source_dts, META_CDC_DML_TYPE, JSON_SRC_KEY, JSON_SRC_MD5
FROM {{ source(node.source_name, node.name) }}
{% if not loop.last -%}
UNION ALL
{%- else -%}
)
SELECT
to_varchar(to_date(meta_load_dts)) || '|' || t || '|' || d || '|' || client_id as rhs_loading_stats_pk
,t as table_name
,s as TABLE_SCHEMA_NAME
,d as TABLE_CATALOG_NAME
,client_id
,listagg(distinct meta_source, '|') as meta_source
,to_date(meta_load_dts) as meta_load_day
,min(meta_load_dts) as day_min_meta_load_dts
,max(meta_load_dts) as day_max_meta_load_dts
,min(meta_source_dts) as day_min_meta_source_dts
,max(meta_source_dts) as day_max_meta_source_dts
,count(1) as day_rows_added
,sum(case when META_CDC_DML_TYPE='r' then 1 else 0 end) as day_rows_snapshot_scan_added
,sum(case when META_CDC_DML_TYPE='c' then 1 else 0 end) as day_rows_insert_added
,sum(case when META_CDC_DML_TYPE='u' then 1 else 0 end) as day_rows_update_added
,sum(case when META_CDC_DML_TYPE='d' then 1 else 0 end) as day_rows_delete_added
,count(distinct JSON_SRC_KEY, JSON_SRC_MD5, META_SOURCE_DTS, META_LOAD_DTS) as day_technical_unique_rows
FROM vw
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where meta_load_dts >= (select to_date(max(day_max_meta_load_dts)) from {{ this }})
{% endif %}
group by 1,2,3,4,5,7
{% endif %}
{%- endfor %}
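And to show how the collected stats are then consumed, here is a sketch of a freshness check on top of the model, flagging table / client_id combinations that have not seen any source change recently (the 24-hour threshold is an arbitrary assumption; like the completeness example above, this could run as a dbt singular test):

-- Hypothetical freshness check on top of RHS_LOADING_STATS; the 24h threshold is an assumption
select
    table_name
    ,client_id
    ,max(day_max_meta_source_dts) as latest_source_change_dts
from {{ ref('rhs_loading_stats') }}
group by table_name, client_id
having max(day_max_meta_source_dts) < dateadd('hour', -24, current_timestamp())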