I created a generic test that uses Delta Lake time travel to check whether a table’s total row count has changed.
It can be configured to check for increases, decreases, or a change in either direction. It also handles a missing table gracefully: if the table does not exist, the test passes. If the table was created more recently than the specified lookback window, the test compares against the earliest table version.
Please feel free to reuse and iterate. It has currently only been tested on the dbt-databricks adapter.
{% test total_table_volume_change (
model,
period_name = "day",
periods_ago = 1,
change_direction = "lower",
where = ""
)%}
{#
Description:
Uses Delta time travel to detect whether the total number of rows in the table
has changed compared to a point <periods_ago> <period_name>s in the past. By default, the
test looks back 1 day and fails if the table has fewer rows than it had at the comparison
point. If the table was created more recently than that, the test compares against the
table volume as of when the table was created.
Inputs:
period_name: Any time unit that is valid in a Databricks interval expression (e.g. day, week)
periods_ago: Number of period_name units to look back for the comparison volume
change_direction:
- If 'lower', triggers when volume decreased compared to the earlier point in time
- If 'higher', triggers when volume increased compared to the earlier point in time
- If 'any', triggers when volume differs from the earlier point in time
where (optional): A where clause used to filter both row counts
Example [fails if the table has fewer records than it had 2 weeks ago]:
tests:
  - name: test_name
    test_name: total_table_volume_change
    period_name: "week"
    periods_ago: 2
    change_direction: "lower"
#}
{#
Check whether the table in question exists. If it does not, the test passes automatically.
Otherwise dbt compile fails in local dev, because the table does not exist in the dev environment.
#}
{% set relation = adapter.get_relation(
database = model.database,
schema = model.schema,
identifier = model.name
) %}
{% if not relation %}
-- sql that always passes the test case
with data as (select null)
select * from data where 1=2
{% else %}
{#
Identify the latest table version that is at least <periods_ago> <period_name>s old.
If the table was created more recently than that, fall back to the first version, i.e. version 0.
#}
{%- call statement('fetch_version', fetch_result=True) -%}
select max(version)
from (describe history {{ model }})
where (
timestamp <= current_timestamp() - interval {{ periods_ago }} {{ period_name }}
or version = 0
)
{%- endcall -%}
{#
Convert the call statement results to a string and strip the extraneous
characters to get the version number.
#}
{%- set version_to_use = load_result('fetch_version')['data']|string -%}
{%- set version_to_use = version_to_use|replace("[(","")|replace(",)]","") -%}
select today.num_rows - earlier_date.num_rows as change_num_rows
from (
select count(1) as num_rows
from {{ model }}
{% if where != "" %}
where {{ where }}
{% endif %}
) today
cross join (
select count(1) as num_rows
from {{ model }} version as of {{ version_to_use }}
{% if where != "" %}
where {{ where }}
{% endif %}
) earlier_date
where (
('{{ change_direction }}' = 'lower' and today.num_rows - earlier_date.num_rows < 0)
or ('{{ change_direction }}' = 'higher' and today.num_rows - earlier_date.num_rows > 0)
or ('{{ change_direction }}' = 'any' and today.num_rows - earlier_date.num_rows <> 0)
)
{% endif %}
{% endtest %}
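
For anyone wondering what the test renders to, here is a rough sketch of the two queries it produces with the default arguments (`period_name = "day"`, `periods_ago = 1`, `change_direction = "lower"`, no `where`). The model name `main.analytics.orders` and the version number `41` are hypothetical placeholders I picked for illustration, and the exact relation quoting will depend on your adapter and project settings.

```sql
-- Step 1: the statement run inside the 'fetch_version' call block.
-- It returns the newest version that is at least 1 day old,
-- or version 0 if the table is younger than that.
select max(version)
from (describe history main.analytics.orders)  -- hypothetical model
where (
    timestamp <= current_timestamp() - interval 1 day
    or version = 0
);

-- Step 2: the test query itself, assuming step 1 returned 41 (illustrative).
-- dbt treats any returned row as a failure, so this fails only when the
-- current row count is lower than the count at version 41.
select today.num_rows - earlier_date.num_rows as change_num_rows
from (
    select count(1) as num_rows
    from main.analytics.orders
) today
cross join (
    select count(1) as num_rows
    from main.analytics.orders version as of 41
) earlier_date
where (
    ('lower' = 'lower' and today.num_rows - earlier_date.num_rows < 0)
    or ('lower' = 'higher' and today.num_rows - earlier_date.num_rows > 0)
    or ('lower' = 'any' and today.num_rows - earlier_date.num_rows <> 0)
);
```

Running step 1 by hand against your own table is a quick way to see which version the test would compare against before wiring it into a schema file.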