We’ve started running our first implementation of the archive feature on source data and wanted to share our experience and get some feedback/reactions.
Situation:
- We have source data across a few different tables where new rows may be added and existing rows may have values updated, including changes to the associations between tables (e.g. Table1.ForeignKey1=‘A’ today, but then Table1.ForeignKey1=‘B’ tomorrow)
- There is no updated_timestamp field in the source data that indicates when a record was updated, so changes have to be inferred from the data being different between reads/snapshots
- Luckily, data does not get deleted, so we don’t have to deal with that here
Why use DBT Archive at all?
In general, I believe that changes to this data should be stored on a change-by-change basis upstream in the software where the data is generated, rather than captured downstream in our batch-replicated analytics database. For the time being, using archive, all we’re trying to do is maintain a paper trail of historic states (albeit somewhat incomplete), so we can have conversations internally about what has changed and be confident in the past state of the table. We may or may not build more complex models on top of these archived tables in the future. We will also consider extending the naming convention, scheduling, etc. to other tables if this proves successful and useful.
Deciding what to archive
All else equal, archiving raw/source data is preferable to archiving “modeled data”.
In this case, we decided to archive our “modeled data” (a DBT view) instead of archiving the raw source data, for a few reasons:
- Our modeling performs some joins and simple renames in order to standardize datasets from different vendors, but there is no filtering or aggregation being performed, and the “modeled” data contains all the relevant fields whose changes we are trying to capture
- Source data is replicated onto Redshift hourly and DBT models are rebuilt on the same schedule, so there would be no advantage to archiving the source data in terms of frequency/granularity of changes captured
- Source data is highly normalized, so it would require setting up ~5 different archived tables across ~10 different schemas, resulting in 50 archived tables that we would have to stitch back together; instead, we’re able to archive a single DBT view which already has the joins and unions performed (see the sketch below)
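To make that concrete, here is a minimal sketch of the kind of standardizing model we archive. The vendor schemas, tables, and columns below are made up for illustration; the point is just that the model only joins, renames, and unions, with no filtering or aggregation:

-- Sketch of a standardizing model like global_detailed_data (hypothetical vendor tables/columns)
select
    a.id        as unique_key,
    a.city_name as city,
    s.status    as status
from vendor_a.details a
join vendor_a.detail_status s on s.detail_id = a.id

union all

select
    r.record_id as unique_key,
    r.city      as city,
    r.state     as status
from vendor_b.records r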
Faking an update date
In addition to a unique key, the archive configuration requires an updated_at argument, which our data does not contain. To get around this, and to force data to be recorded every time archive is called, we created a new DBT model, materialized as a view. For the existing DBT model global_detailed_data, we created a new model scd_view_global_detailed_data defined as:
select *, current_timestamp as update_date from global_detailed_data
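As a dbt model file, that view is just a thin wrapper over the existing model. A minimal sketch, assuming the usual dbt conventions of a config() block and a ref() to the upstream model:

-- models/scd_view_global_detailed_data.sql (sketch)
{{ config(materialized='view') }}

select
    *,
    current_timestamp as update_date
from {{ ref('global_detailed_data') }}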
And we then configured archival in dbt_project.yml as:
source_table: scd_view_global_detailed_data
target_table: scd_archived_global_detailed_data
updated_at: "update_date"
unique_key: "unique_key"
We use Sinter for scheduling, so we have dbt archive scheduled to run once daily, overnight.
Determining the changed rows from recorded data
Scheduling this archival function as-is populates a table every day with every record, whether or not there has been a change. The archived data contains new columns, valid_from and valid_to, that indicate the time period during which each version of a record was current (the record with valid_to null is the most recent).
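For example, pulling the current version of every record back out of the archive is just a filter on that column:

-- Current state of each record: the open-ended version has valid_to null
select *
from dbt_prod.scd_archived_global_detailed_data
where valid_to is null;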
We now need logic to filter that table down by comparing rows and identifying instances where successive versions don’t completely match. Here is a statement that I constructed for that purpose, leveraging the “except” syntax:
-- This returns every row where there isn't a next row with all the same results.
-- So if A changes to B, this returns A
with changes as (

    select
        unique_key,
        fieldlist,
        valid_to
    from dbt_prod.scd_archived_global_detailed_data
    where valid_to is not null

    except

    select
        unique_key,
        fieldlist,
        valid_from
    from dbt_prod.scd_archived_global_detailed_data

)

-- In the example above, since only row A is returned, this fetches B and unions the datasets
-- So we have every combination of rows that changed, with their before and after.
-- Uses union vs. union all because changes may be back-to-back, we don't want duplicates
, unioned_changes as (

    select
        nextRecord.unique_key,
        nextRecord.fieldlist,
        copy.valid_from,
        nextRecord.valid_to
    from changes
    inner join dbt_prod.scd_archived_global_detailed_data nextRecord
        on nextRecord.unique_key = changes.unique_key
        and nextRecord.valid_from = changes.valid_to
    inner join dbt_prod.scd_archived_global_detailed_data copy
        on nextRecord.unique_key = copy.unique_key
        and nvl(nextRecord.valid_to, getdate()) = nvl(copy.valid_to, getdate())

    union

    select
        changes.unique_key,
        changes.fieldlist,
        copy.valid_from,
        changes.valid_to
    from changes
    inner join dbt_prod.scd_archived_global_detailed_data copy
        on changes.unique_key = copy.unique_key
        and nvl(changes.valid_to, getdate()) = nvl(copy.valid_to, getdate())

)

-- select and sort
select *
from unioned_changes
order by mtv_city, id, valid_to
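To sanity-check the except step, here is a toy, self-contained version with made-up keys, values, and timestamps (field1 stands in for the real field list). Key 1 is unchanged between snapshots and gets eliminated; key 2 changed from ‘A’ to ‘B’, so its “before” row survives:

-- Toy data: key 1 did not change between snapshots, key 2 changed from 'A' to 'B'
with archived as (
    select 1 as unique_key, 'A' as field1, timestamp '2019-01-01 00:00:00' as valid_from, timestamp '2019-01-02 00:00:00' as valid_to
    union all
    select 1, 'A', timestamp '2019-01-02 00:00:00', cast(null as timestamp)
    union all
    select 2, 'A', timestamp '2019-01-01 00:00:00', timestamp '2019-01-02 00:00:00'
    union all
    select 2, 'B', timestamp '2019-01-02 00:00:00', cast(null as timestamp)
)
select unique_key, field1, valid_to
from archived
where valid_to is not null

except

select unique_key, field1, valid_from
from archived;

-- Returns only (2, 'A', 2019-01-02 00:00:00): the "before" state of the one real change.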
Next Steps
So far, since we’re just exploring this archived data, we have not added the above query to our DBT models; we’re just running it interactively.
The dataset we’re archiving is on the order of thousands of rows, so we’re still not dealing with a massive dataset even if we let snapshots run for a year (that would yield roughly a million rows). Eventually, if this sticks around, we will consider scheduling a task to prune the records which are completely redundant (I still have to think through the implications, since that would break the logic we wrote above).
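For what it’s worth, just identifying those redundant rows (without deleting anything) could reuse the same set-operation idea, this time with intersect. This is only a sketch: it flags versions that are exact copies of the version they replaced, and it deliberately ignores the hard part, which is that deleting them would break the valid_from = previous valid_to adjacency the change query above depends on:

-- A version is redundant if its (key, fields, valid_from) matches the previous
-- version's (key, fields, valid_to), i.e. nothing actually changed at that boundary.
select unique_key, fieldlist, valid_from
from dbt_prod.scd_archived_global_detailed_data

intersect

select unique_key, fieldlist, valid_to
from dbt_prod.scd_archived_global_detailed_data
where valid_to is not null;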