We’ve started running our first implementation of the
archive feature on source data and wanted to share our experience and get some feedback/reactions.
- We have source data across a few different tables where new rows may be added and existing rows may have values updated, including changing the associations between tables (i.e. Table1.ForeignKey1='A' today, but Table1.ForeignKey1='B' tomorrow)
- There is no updated_timestamp field in the source data that indicates when a record was updated, so updates need to be inferred by comparing the data between different reads/snapshots
- Luckily, data does not get deleted so we don’t have to deal with that here
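The "inferring updates" point above can be sketched in SQL. Given two reads of the same source table (the table names snapshot_yesterday and snapshot_today here are hypothetical, just for illustration), the rows that are new or changed are the ones present in today's snapshot but not yesterday's:

```sql
-- Hypothetical tables: snapshot_yesterday and snapshot_today are two
-- reads of the same source table taken a day apart.
-- Rows in today's snapshot with no identical row in yesterday's snapshot
-- are either inserts or updates -- the only way to detect an update
-- when the source has no updated_timestamp column.
select * from snapshot_today
except
select * from snapshot_yesterday
```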
Why use DBT Archive at all?
In general, I believe that changes to this data should be stored on a change-by-change basis upstream, in the software where the data is generated, rather than captured downstream in our batch-replicated analytics database. For the time being, all we're trying to do with archive is maintain a paper trail of historic states (albeit a somewhat incomplete one), so we can have conversations internally about what has changed and be confident about the past state of the table. We may or may not build more complex models on top of these archived tables in the future. If this proves successful and useful, we will also consider extending the naming convention, scheduling, and related approaches to other tables.
Deciding what to archive
All else equal, archiving raw/source data is preferable to archiving "modeled data".
In this case, we decided to archive our "modeled data" (a DBT view) instead of the raw source data for a few reasons:
- Our modeling performs some joins and simple renames in order to standardize datasets from different vendors, but no filtering or aggregation is performed, and the "modeled" data contains all the relevant fields that we are trying to capture changes for
- Source data is replicated onto Redshift hourly and DBT models are rebuilt on the same schedule, so there would be no advantage to archiving the source data in terms of the frequency/granularity of changes captured
- Source data is highly normalized, so archiving it would require setting up ~5 different archived tables across ~10 different schemas, resulting in 50 archived tables that we would have to stitch back together; instead, we're able to archive a single DBT view which already has the joins and unions performed
Faking an update date
In addition to a unique key, the archive configuration requires an updated_at argument, which our data does not contain. To work around this, and to force data to be recorded every time archive is called, we created a new DBT model, materialized as a view. For the existing DBT model global_detailed_data, we created a new model scd_view_global_detailed_data defined as:
select *, current_timestamp as update_date from global_detailed_data
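As a dbt model file (scd_view_global_detailed_data.sql), with the view materialization made explicit, this looks roughly like the following; the config block and ref() call are standard dbt syntax, and the model names are the ones above:

```sql
-- scd_view_global_detailed_data.sql
-- Materialized as a view so update_date is recomputed on every read,
-- guaranteeing the archive sees a "fresh" timestamp each run.
{{ config(materialized='view') }}

select
    *,
    current_timestamp as update_date
from {{ ref('global_detailed_data') }}
```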
And we then configured archival in dbt_project.yml as:
source_table: scd_view_global_detailed_data
target_table: scd_archived_global_detailed_data
updated_at: "update_date"
unique_key: "unique_key"
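In context, the surrounding archive block in dbt_project.yml looks roughly like this; the target schema dbt_prod is where our archived table lives, but the source schema shown here is an assumption for illustration:

```yaml
archive:
  - source_schema: analytics      # assumed; the schema containing the view
    target_schema: dbt_prod       # schema where the archived table is written
    tables:
      - source_table: scd_view_global_detailed_data
        target_table: scd_archived_global_detailed_data
        updated_at: "update_date"
        unique_key: "unique_key"
```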
We use Sinter for scheduling, so we have dbt archive scheduled to run once daily, overnight.
Determining the changed rows from recorded data
Scheduling this archival function as-is populates a table every day with every record, whether or not there has been a change. The archived data contains new columns, valid_from and valid_to, that indicate the time period during which each version of a record was current (the record with a null valid_to is the most recent).
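For example, the current state of the table can be recovered by filtering on the null valid_to:

```sql
-- The most recent version of each record has no end date yet.
select *
from dbt_prod.scd_archived_global_detailed_data
where valid_to is null
```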
We now need logic that filters that table down by comparing rows and identifying instances where subsequent values don't completely match. Here is a statement I constructed for that purpose, leveraging the except syntax:
-- This returns every row where there isn't a next row with all the same results.
-- So if A changes to B, this returns A.
with changes as (
    select unique_key, fieldlist, valid_to
    from dbt_prod.scd_archived_global_detailed_data
    where valid_to is not null
    except (
        select unique_key, fieldlist, valid_from
        from dbt_prod.scd_archived_global_detailed_data
    )
)
-- In the example above, since only row A is returned, this fetches B and unions the datasets,
-- so we have every combination of rows that changed, with their before and after.
-- Uses union vs. union all because changes may be back-to-back; we don't want duplicates.
, unioned_changes as (
    select nextRecord.unique_key, nextRecord.fieldlist, copy.valid_from, nextRecord.valid_to
    from changes
    inner join dbt_prod.scd_archived_global_detailed_data nextRecord
        on nextRecord.unique_key = changes.unique_key
        and nextRecord.valid_from = changes.valid_to
    inner join dbt_prod.scd_archived_global_detailed_data copy
        on nextRecord.unique_key = copy.unique_key
        and nvl(nextRecord.valid_to, getdate()) = nvl(copy.valid_to, getdate())

    union

    select changes.unique_key, changes.fieldlist, copy.valid_from, changes.valid_to
    from changes
    inner join dbt_prod.scd_archived_global_detailed_data copy
        on changes.unique_key = copy.unique_key
        and nvl(changes.valid_to, getdate()) = nvl(copy.valid_to, getdate())
)
-- select and sort
select * from unioned_changes
order by mtv_city, id, valid_to
So far we are just exploring this archived data, so we have not added the above query to our DBT project; we're just running it interactively.
The dataset we're archiving is on the order of thousands of rows, so we're still not dealing with a massive dataset even if we let snapshots run for a year (daily snapshots would yield roughly a million rows). Eventually, if this sticks around, we will consider scheduling a task to prune the records that are completely redundant (though I'd have to think through the implications, since pruning would break the change-detection logic above).