Recently I discovered that some records were missing from one of our incremental models.
The root of the issue turned out to be a bad assumption about the way I was using dbt snapshot data to feed the incremental model.
When Fivetran syncs data into the data warehouse, it includes a column called _fivetran_synced, whose name suggests that it is the timestamp at which the specific record was synced to the warehouse. Therefore, if we compare a dbt snapshot taken at time T1 with one taken at time T2, we would expect all of the new records at time T2 to have _fivetran_synced values that are greater than the max(_fivetran_synced) value at T1.
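For context, this is the pattern my incremental model was relying on. Here is a minimal sketch, with illustrative model names rather than our actual code:

{{ config(materialized='incremental', unique_key='id') }}

SELECT
    *
FROM
    {{ ref('stg_ft_raw_organizations') }}
{% if is_incremental() %}
WHERE
    -- Only process records synced after the newest record already loaded.
    -- This is exactly the assumption that breaks down below.
    _fivetran_synced > (SELECT MAX(_fivetran_synced) FROM {{ this }})
{% endif %}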
This assumption is often true, but it breaks down if the dbt snapshot is taken while data is actively being synced by Fivetran to the warehouse. This is a consequence of the fact that _fivetran_synced does not capture the timestamp at which the record is written to the warehouse. Instead, it is the timestamp of when the data was being prepared to be written to the warehouse, and records may not be loaded in a time-ordered fashion. Therefore, it is possible to take a snapshot of a table before all of the data has been loaded, and some of those missing records may have _fivetran_synced values less than records that were already snapshotted. For example, if a snapshot runs at 12:00 while a sync is in flight, a record stamped with _fivetran_synced = 11:58 may not land in the table until 12:02; by then the snapshot's high-water mark is already past 11:58, so a filter like the one above will never pick that record up.
I chose to handle this situation by constructing a staging model for the snapshots that excludes any records that may have been synced at the same time a dbt snapshot was taken. I did this by first adding a new column to my snapshot model that captures the time the dbt snapshot was taken (dbt_snapshot_at), e.g.:
{% snapshot ft_raw_organizations %}

{{
    config(
        target_schema='snapshots',
        unique_key='id',
        strategy='timestamp',
        updated_at='_fivetran_synced',
    )
}}

SELECT
    *,
    -- Record when this dbt snapshot was taken so that downstream models
    -- can tell which snapshot run captured each row.
    CURRENT_TIMESTAMP()::TIMESTAMP AS dbt_snapshot_at
FROM
    {{ source('ft_raw', 'organizations') }}

{% endsnapshot %}
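As a side benefit, dbt_snapshot_at makes it possible to check how often late arrivals actually occur. Here is a rough diagnostic sketch, assuming Snowflake and assuming all rows inserted by a given run share a single dbt_snapshot_at value (which should hold, since CURRENT_TIMESTAMP() is evaluated once per statement):

-- For each snapshot run, compare the oldest _fivetran_synced value it captured
-- against the newest value captured by the run before it. Any row returned here
-- is a "late arrival" that a naive high-water-mark filter would have missed.
WITH per_run AS (
    SELECT
        dbt_snapshot_at,
        MIN(_fivetran_synced) AS min_synced,
        MAX(_fivetran_synced) AS max_synced
    FROM snapshots.ft_raw_organizations
    GROUP BY dbt_snapshot_at
)
SELECT
    dbt_snapshot_at,
    min_synced,
    LAG(max_synced) OVER (ORDER BY dbt_snapshot_at) AS previous_run_max_synced
FROM per_run
QUALIFY min_synced < previous_run_max_synced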
Then I created a macro that I can use for all of my Fivetran staging snapshots. It essentially rebuilds what the snapshot would have looked like 60 minutes before the dbt snapshot was taken. (This value works well for us because Fivetran only syncs data into our warehouse every 3 hours; if you sync more frequently, you may need to adjust the interval in the macro below.)
{% macro stage_fivetran_snapshot(source_model) %}

WITH max_dbt_snapshot_at AS (
    SELECT MAX(dbt_snapshot_at) AS max_dbt_snapshot_at FROM {{ source_model }}
)

SELECT
    {{ dbt_utils.star(from=source_model, except=['DBT_VALID_TO']) }},
    -- Re-open the latest remaining version of each record: the version that
    -- originally closed it may have been excluded by the WHERE clause below.
    IFF(ROW_NUMBER() OVER (PARTITION BY id ORDER BY dbt_valid_from DESC) = 1, NULL, dbt_valid_to) AS dbt_valid_to
FROM
    {{ source_model }}
WHERE
    -- Exclude records that were synced too close to the most recent snapshot.
    -- This is needed because records with a given _fivetran_synced value may physically
    -- arrive in the table a few minutes after the timestamp is generated. If Fivetran is
    -- in the middle of performing a sync at the same time that dbt is creating a snapshot,
    -- then there may be some records in the table with _fivetran_synced values that are
    -- _greater_ than those of records that end up being written shortly after the
    -- snapshot finishes.
    -- By excluding records that were synced close to the snapshot, we reduce the chance
    -- of missing any "late arriving" records in our snapshots, which can affect
    -- downstream incremental models.
    -- (With the timestamp strategy, dbt_valid_from is set from _fivetran_synced,
    -- so filtering on dbt_valid_from here is equivalent.)
    dbt_valid_from < (SELECT max_dbt_snapshot_at FROM max_dbt_snapshot_at) - INTERVAL '60 minutes'

{% endmacro %}
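With that in place, each staging model is just a one-liner that points at its snapshot, e.g. (file path illustrative):

-- models/staging/stg_ft_raw_organizations.sql
{{ stage_fivetran_snapshot(ref('ft_raw_organizations')) }}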