Handling late-arriving records in Fivetran-synced snapshots

Recently I discovered an issue with some records that were missing from one of our incremental models.
The root of the issue turned out to be a bad assumption about how I was feeding dbt snapshot data into the incremental model.

When Fivetran syncs data into the data warehouse, it includes a column called _fivetran_synced, whose name suggests that it is the timestamp at which the specific record was synced to the warehouse. Therefore, if we compare a dbt snapshot taken at time T1 with one taken at time T2, we would expect all of the new records at time T2 to have _fivetran_synced values greater than the max(_fivetran_synced) value at T1.

This assumption is often true, but it breaks down if the dbt snapshot is taken while Fivetran is actively syncing data to the warehouse. This is a consequence of the fact that _fivetran_synced does not capture the timestamp at which the record is written to the warehouse. Instead, it is the timestamp of when the data was being prepared to be written to the warehouse, and records may not be loaded in a time-ordered fashion. Therefore, it is possible to take a snapshot of a table before all of the data has been loaded, and some of the missing records may have _fivetran_synced values less than those of records that were already snapshotted.
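As an illustration of how this can bite, here is a diagnostic query you could run to look for source records that are missing from the snapshot even though their _fivetran_synced value is older than the newest record the snapshot already holds. The schema and table names are just examples based on the snapshot defined below; adjust them for your project.

-- Illustrative diagnostic query (schema/table names are examples, not prescriptive):
-- find source records that are absent from the snapshot even though their
-- _fivetran_synced value is older than the newest record already snapshotted.
WITH snapshot_max AS (
  SELECT MAX(_fivetran_synced) AS max_synced
  FROM snapshots.ft_raw_organizations
)
SELECT
  src.id,
  src._fivetran_synced
FROM ft_raw.organizations AS src
LEFT JOIN snapshots.ft_raw_organizations AS snap
  ON snap.id = src.id
  AND snap._fivetran_synced = src._fivetran_synced
CROSS JOIN snapshot_max
WHERE snap.id IS NULL
  AND src._fivetran_synced <= snapshot_max.max_synced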

I chose to handle this situation by constructing a staging model for the snapshots that excludes any records that may have synced at the same time as a dbt snapshot was taken. I did this by first adding a new column to my snapshot model that captures the time the dbt snapshot was taken (dbt_snapshot_at), e.g.:

{% snapshot ft_raw_organizations %}
    {{
        config(
          target_schema='snapshots',
          unique_key='id',
          strategy='timestamp',
          updated_at='_fivetran_synced',
        )
    }}

    SELECT
      *,
      CURRENT_TIMESTAMP()::TIMESTAMP AS dbt_snapshot_at
    FROM
      {{ source('ft_raw', 'organizations') }}
{% endsnapshot %}

Then I created a macro I could use for all of my Fivetran staging snapshots that essentially rebuilds what the snapshot would have looked like 60 minutes before the dbt snapshot was taken (this value works well for us because we only sync data with Fivetran every 3 hours; if you load data more frequently, you may need to adjust the interval in the macro below):

{% macro stage_fivetran_snapshot(source_model) %}

WITH max_dbt_snapshot_at AS (
  SELECT MAX(dbt_snapshot_at) AS max_dbt_snapshot_at FROM {{ source_model }}
)
SELECT
  {{ dbt_utils.star(from=source_model, except=['DBT_VALID_TO']) }},
  IFF((ROW_NUMBER() OVER (PARTITION BY id ORDER BY dbt_valid_from DESC) = 1), NULL, dbt_valid_to) AS dbt_valid_to
FROM
  {{ source_model }}
WHERE
  -- Exclude records that were synced too close to the last snapshot date.
  -- This is needed because records with a given _fivetran_synced value may physically arrive
  -- in the table a few minutes after the timestamp is generated. If Fivetran is in the middle of
  -- performing a sync at the same time that dbt is creating a snapshot, then there may be
  -- some records already in the table with _fivetran_synced values that are _greater_ than records
  -- that end up being written shortly after the snapshot is finished.
  -- By excluding records that were synced close to the snapshot, we reduce the chance
  -- of missing any "late arriving" records from our snapshots, which can affect
  -- downstream incremental models.
  dbt_valid_from < (SELECT max_dbt_snapshot_at FROM max_dbt_snapshot_at) - INTERVAL '60 minutes'

{% endmacro %}
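With the macro in place, a staging model built on a snapshot becomes a one-liner. For example (the file name here is only illustrative):

-- models/staging/stg_ft_raw__organizations.sql (illustrative name)
-- Rebuilds the snapshot as it looked 60 minutes before the latest dbt snapshot was taken.
{{ stage_fivetran_snapshot(ref('ft_raw_organizations')) }}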

Hi Sterling,
I read through your article quickly, so please correct me if I have misunderstood anything.

First, thank you for pointing out that the _fivetran_synced field is not the landing time in our DB, but closer to the extraction time from the source. That is valuable info to keep clear in mind.

Coming to the load process, it looks to me like, in order to avoid late-arrival issues, you do not load the data that arrived in the last hour.
I agree this probably works well in practice, but it makes your data one hour stale and does not fully cover you, as you cannot guarantee that no records will ever arrive more than one hour late.
Being very careful not to “produce” duplicated data from your staging table means that your load does not need to be idempotent, but it also means that you cannot use that ELT to re-load some interval of data. What would you do if you discovered that not all of the data had been loaded?

Is there some special reason why you do not take a similar, but reversed, approach and load all of the new data plus everything that is less than X hours old?
Building your ELT that way means the load has to be idempotent, and, if needed, you could re-load as much of the past as you wish.

In general I prefer the “at least once” model from the staging area, handling the fact that some data will arrive twice. You can check for new vs. duplicate records in many ways: with hash-difference fields, as done in Data Vault, with a plain old outer join, or by simply re-writing (overwriting) the data (e.g. a delete over a key or a timestamp, followed by an insert) if your incoming data does not evolve over time (like transactions or events in general).
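As a concrete example, a minimal sketch of the plain delete-over-key + insert variant could look like this (table names and the 6-hour buffer are just placeholders):

-- Illustrative "at least once" load with a lookback buffer:
-- re-load everything synced in the last 6 hours, overwriting by key, so the
-- load is idempotent and an older interval can be re-loaded by widening the buffer.
DELETE FROM analytics.organizations
WHERE id IN (
  SELECT id
  FROM staging.organizations
  WHERE _fivetran_synced >= DATEADD('hour', -6, CURRENT_TIMESTAMP())
);

INSERT INTO analytics.organizations
SELECT *
FROM staging.organizations
WHERE _fivetran_synced >= DATEADD('hour', -6, CURRENT_TIMESTAMP());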

Ciao, Roberto


Hi Roberto, those are all really good points. I wouldn’t expect this solution to work in all cases.

In our case, we only load the data warehouse once per day, and our snapshots are only taken every 3 hours. The data load process only takes about 10 minutes every 3 hours. When a snapshot lines up with a data load, it is possible for the data to be 1 hour stale, but that’s not really important in our situation, given that we build the warehouse daily from snapshots taken every 3 hours.

We are also fortunate that we don’t have so much data that running a full refresh of our entire warehouse would be infeasible. Right now we load data incrementally each day, and then run a full refresh on the weekends to resolve any small issues with the incremental loads that might creep in. These “late-arriving” records were one source of issues that were only resolved during the full refresh over the weekend. So with this solution, our daily incremental loads are more accurate.

I could be wrong, but I’m pretty sure this solution is idempotent. We capture the snapshot date at the time of the snapshot, not at the time that dbt runs. So the first incremental dbt run should insert the data, and subsequent incremental dbt runs shouldn’t add any new records.
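To make that concrete, something along these lines illustrates why re-runs don’t insert anything new. This is a simplified, illustrative sketch (filtering on dbt_valid_from, with example model names), not our exact model:

-- Simplified, illustrative incremental model (names are examples only).
-- The filter compares dbt_valid_from values captured at snapshot time, not at run
-- time, so re-running the model against the same staged data selects no new rows.
{{ config(materialized='incremental') }}

SELECT *
FROM {{ ref('stg_ft_raw__organizations') }}

{% if is_incremental() %}
WHERE dbt_valid_from > (SELECT MAX(dbt_valid_from) FROM {{ this }})
{% endif %}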

I think you’re right that another approach could be to incrementally load all data changed since the last incremental load, plus a little buffer to capture these late-arriving records, and then deduplicate. Both approaches seem valid to me in our situation.