Test for table volume changes over time using Delta Lake Time Travel

Created a test that uses Delta Lake Time Travel to check if a table’s total volume has changed.

Configurable to check for increases, decreases, or a change in either direction. Also handles cases where the table does not exist gracefuly. If the table was created more recently than the specified lookback window, it uses the earliest table version.

Please feel free to reuse and iterate. Currently only tested on the dbt-databricks adapter.

{% test total_table_volume_change (
    model,
    period_name = "day",
    periods_ago = 1,
    change_direction = "lower",
    where = ""
)%}

    {#
        Description:
            Uses Delta time travel to detect if the total number of rows in the table
            has changed compared to a point x number of periods ago. By default test checks 1 day in
            the past. By default fires an error if the table has fewer records than the table comparison
            point. If the table was created more recently than x number of periods ago, uses the table
            volume as of when the table was created.

        Inputs:
            period_name: Can use any valid Databricks datepart value
            periods_ago: Number of period_name ago to check the table volume
            change_direction:
                - If 'lower' triggers when volume decreased compared to earlier point in time
                - If 'higher' triggers when volume increased compared to earlier point in time
                - If 'any' triggers when volume is not equal compared to earlier point in time
            where (optional): Specify a where clause to filter the table count by

        Example [checks if table has lower number of records than 2 weeks ago]:
        tests:
            - name: test_name
              test_name: total_table_volume_change
              period_name: "week"
              periods_ago: 2
              change_direction: "lower"
    #}

    {#
        Check if the table in question exists. If it does not, then the test should pass automatically.
        If we don't do this dbt compile will fail in local dev because the table doesn't exist in the dev environment.
    #}

    {% set relation = adapter.get_relation(
        database = database,
        schema = model.schema,
        identifier = model.name
    ) %}

    {% if not relation %}
        -- sql that always passes the test case
        with data as (select null)
        select * from data where 1=2

    {% else %}

        {#
            Identify the latest table version that is either at least <periods_ago> number of <period_name>
            old or, if the table was created fewer days ago than that, the first version, ie. version 0.
        #}

        {%- call statement('fetch_version', fetch_result=True) -%}
            select max(version)
            from (describe history {{ model }})
            where (
                timestamp <= current_timestamp() - interval {{ periods_ago }} {{ period_name }}
                or version = 0
            )
        {%- endcall -%}

        {#
            Convert call statement reuslts to string and strip out the extraneous
            characters to get the version number.
        #}

        {%- set version_to_use = load_result('fetch_version')['data']|string -%}
        {%- set version_to_use = version_to_use|replace("[(","")|replace(",)]","") -%}

        select today.num_rows - earlier_date.num_rows as change_num_rows

        from (
            select count(1) as num_rows
            from {{ model }}
            {% if where != "" %}
            where {{ where }}
            {% endif %}
        ) today

        cross join (
            select count(1) as num_rows
            from {{ model }} version as of {{ version_to_use }}
            {% if where != "" %}
            where {{ where }}
            {% endif %}
        ) earlier_date

        where (
                   ('{{ change_direction }}' = 'lower'  and today.num_rows - earlier_date.num_rows <  0)
                or ('{{ change_direction }}' = 'higher' and today.num_rows - earlier_date.num_rows >  0)
                or ('{{ change_direction }}' = 'any'    and today.num_rows - earlier_date.num_rows <> 0)
            )
    {% endif %}

{% endtest %}