TLDR: I also have the use case where records get deleted and then brought back to life. I am pro writing to dbt_valid_to on deletion and creating a new record on reinstatement to make sure I have a complete history. I’m super interested to hear if people have a similar problem and if there’s another way of looking at it
We have a similar case, where we have a source system that represents relationships between entities in a table with a composite key of the unique identifiers of the entities. These relationships can be hard-deleted and then reinstated.
Since dbt snapshots are so convenient for tracking history over time and understanding what has happened, I wanted to make sure that the way we tracked these deletes and reinstatements fit the mental model I was using for snapshots.
What I am trying to achieve with snapshots:
- For any point in time (at the granularity of the snapshot run frequency), we can understand the state of the world at that moment (e.g., who was a member of which organization; see the sketch after this list)
- For any relationship, we can see the full history of those two entities (e.g., this user was a member of this organization through August, then the membership was deleted, but they’ve just been invited back)
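For example, the first question becomes a simple point-in-time filter on the snapshot. A sketch, assuming a hypothetical snapshot called `org_memberships_snapshot` with `user_id` and `organization_id` columns:

```sql
-- who was a member of which organization as of a given timestamp?
select user_id, organization_id
from {{ ref('org_memberships_snapshot') }}
where dbt_valid_from <= timestamp '2020-08-01'
  and (dbt_valid_to > timestamp '2020-08-01' or dbt_valid_to is null)
```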
dbt_valid_to and dbt_valid_from are the integral pieces that let me understand what existed at any point in time and recreate the history. So I wanted a hard delete to invalidate the previously existing record, i.e. set its dbt_valid_to. When the record is reinstated, I wanted a new row created with the current timestamp. That way, the period during which the record was deleted is just the delta between the previous record’s dbt_valid_to and the current record’s dbt_valid_from.
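To make that concrete, here’s roughly how I’d reconstruct those deletion gaps from the snapshot (a sketch, reusing the hypothetical names above):

```sql
-- pair every snapshot row with the row that preceded it for the same
-- relationship; a non-null previous_valid_to on a later row marks a
-- delete-then-reinstate, and the gap is [previous_valid_to, dbt_valid_from)
with history as (
    select
        user_id,
        organization_id,
        lag(dbt_valid_to) over (
            partition by user_id, organization_id
            order by dbt_valid_from
        ) as previous_valid_to,
        dbt_valid_from
    from {{ ref('org_memberships_snapshot') }}
)
select *
from history
where previous_valid_to is not null
```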
Here’s the logic I used to achieve this:
```sql
{% macro snapshot_check_delete_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
    {% set check_cols_config = config['check_cols'] %}
    {% set primary_key = config['unique_key'] %}

    {% set select_current_time -%}
        select {{ snapshot_get_time() }} as snapshot_start
    {%- endset %}

    {# don't access the column by name, to avoid dealing with casing issues on snowflake #}
    {%- set now = run_query(select_current_time)[0][0] -%}
    {% if now is none or now is undefined -%}
        {%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%}
    {%- endif %}
    {% set updated_at = snapshot_string_as_time(now) %}

    {% set column_added = false %}

    {% if check_cols_config == 'all' %}
        {% set column_added, check_cols = snapshot_check_all_get_existing_columns(node, target_exists) %}
    {% elif check_cols_config is iterable and (check_cols_config | length) > 0 %}
        {% set check_cols = check_cols_config %}
    {% else %}
        {% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %}
    {% endif %}

    {%- set row_changed_expr -%}
        (
        {%- if column_added -%}
            TRUE
        {%- else -%}
            -- a null primary key on the source side of the join indicates
            -- that the source record was hard-deleted
            ({{ current_rel }}.{{ primary_key }} is null) or
            {% for col in check_cols -%}
                {{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
                or
                ({{ snapshotted_rel }}.{{ col }} is null) != ({{ current_rel }}.{{ col }} is null)
                {%- if not loop.last %} or {% endif -%}
            {%- endfor -%}
        {%- endif -%}
        )
    {%- endset %}

    {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}

    {% do return({
        "unique_key": primary_key,
        "updated_at": updated_at,
        "row_changed": row_changed_expr,
        "scd_id": scd_id_expr
    }) %}
{% endmacro %}
```
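As I understand it, dbt resolves a snapshot’s `strategy` config to a macro named `snapshot_<strategy>_strategy`, so the macro above should get picked up with `strategy='check_delete'`. A sketch of the wiring, with hypothetical source and column names:

```sql
{% snapshot org_memberships_snapshot %}

    {{
        config(
            target_schema='snapshots',
            unique_key="user_id || '-' || organization_id",
            strategy='check_delete',
            check_cols=['membership_role'],
        )
    }}

    select * from {{ source('app_db', 'org_memberships') }}

{% endsnapshot %}
```

The second piece is an override of snapshot_staging_table that adds the deleted_records and reinsertions logic: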
```sql
{% macro snapshot_staging_table(strategy, source_sql, target_relation) -%}

    with snapshot_query as (

        {{ source_sql }}

    ),

    snapshotted_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key

        from {{ target_relation }}

    ),

    insertions_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            -- nullif of two identical values yields a null of the right type
            nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
            {{ strategy.scd_id }} as dbt_scd_id

        from snapshot_query
    ),

    updates_source_data as (

        select
            *,
            {{ strategy.unique_key }} as dbt_unique_key,
            {{ strategy.updated_at }} as dbt_updated_at,
            {{ strategy.updated_at }} as dbt_valid_from,
            {{ strategy.updated_at }} as dbt_valid_to

        from snapshot_query
    ),

    -- keys whose snapshotted rows are all closed out (every row has a
    -- dbt_valid_to), i.e. records that are currently hard-deleted
    deleted_records as (

        select
            {{ strategy.unique_key }} as dbt_unique_key
        from snapshotted_data
        group by 1
        having count(dbt_valid_to) = count(*)
    ),

    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*

        from insertions_source_data as source_data
        left outer join snapshotted_data
            on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_unique_key is null
            or (
                snapshotted_data.dbt_unique_key is not null
                and snapshotted_data.dbt_valid_to is null
                and (
                    {{ strategy.row_changed }}
                )
            )
    ),

    updates as (

        select
            'update' as dbt_change_type,
            -- except() is BigQuery's column-exclusion syntax
            source_data.* except(dbt_valid_to),
            -- a hard-deleted record has no matching source row, so close it
            -- out with the current snapshot time
            coalesce(source_data.dbt_valid_to, {{ strategy.updated_at }}) as dbt_valid_to,
            snapshotted_data.dbt_scd_id

        from snapshotted_data
        left join updates_source_data as source_data
            on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where snapshotted_data.dbt_valid_to is null
            and (
                {{ strategy.row_changed }}
            )
    ),

    -- records that reappear in the source after having been hard-deleted:
    -- insert them as brand-new rows so the deletion gap is preserved
    reinsertions as (

        select
            'insert' as dbt_change_type,
            source_data.*

        from insertions_source_data as source_data
        join deleted_records
            on deleted_records.dbt_unique_key = source_data.dbt_unique_key
    )

    select * from insertions
    union all
    select * from updates
    union all
    select * from reinsertions

{%- endmacro %}
```
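With both macros in place, the lifecycle of a single relationship ends up looking something like this in the snapshot table (illustrative, hypothetical rows only):

```
user_id | organization_id | dbt_valid_from      | dbt_valid_to
--------|-----------------|---------------------|--------------------
u1      | org_a           | 2020-05-01 00:00:00 | 2020-08-31 00:00:00   -- hard-deleted Aug 31
u1      | org_a           | 2020-09-15 00:00:00 | null                  -- reinstated Sep 15
```

The gap between the first row’s dbt_valid_to and the second row’s dbt_valid_from is exactly the window during which the relationship didn’t exist.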
Curious if anyone else has different ways of thinking about this problem.