The problem I’m having
We have a central application database (Postgres) that logs transactions, among other things. Our pipeline takes the raw data from the central DB and loads it into our DWH (also Postgres), after which dbt takes over for transformations. Due to data volumes, all fact-based models are incremental.
Our data products are specific aggregations of these transactions, depending on type, status, etc.
One big issue I have to contend with is that certain fields can change in the application DB after the data has already been loaded, and there is no specified time frame in which this might happen. The tool I use to load data between the DBs accounts for this and will correctly update modified data in our DWH.
I created an incremental strategy that accounts for this situation, but as a result the table scan takes a lot longer than I would like (~2 minutes per aggregation), more than twice as long as it would be with a much simpler incremental strategy.
In fact, the runtime for a full refresh (covering 2 years of data) is only about twice that of an incremental run processing 10 minutes of new data. This is entirely due to the table scan needed to find the rows that have to be reprocessed.
The context of why I’m trying to do this
I am refactoring my company’s DWH infrastructure to rely on dbt for transformations. Unfortunately, the current runtime for the aggregations is not acceptable for stakeholders.
The people responsible for the application and the database I have to read from can’t guarantee that data will never change after the fact and can’t (won’t) change anything on their end.
What I’ve already tried
For our normal fact tables, I use the simple incremental strategy
{% if is_incremental() %}
where modifiedat >= (select max(modifiedat) from {{ this }})
{% endif %}
and this works fine. No notes.
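For context, those fact models are configured along these lines, so that updated rows replace their earlier versions instead of being appended (transaction_id here is just a placeholder for the actual key):
{{ config(
    materialized='incremental',
    unique_key='transaction_id'
) }}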
After this, there are aggregations at the hourly, daily and monthly level, and this is where things get complicated. If I use this incremental strategy in an aggregation model and a single row is changed after the fact (updating its modifiedat), only that changed row is picked up on the next run, and the aggregation no longer reflects the true total for the relevant time period. So I developed the following strategy instead:
select
...
from {{ ref("upstream_model") }}
where 1=1
{% if is_incremental() %}
and date_id in (
select distinct date_id
from {{ ref("upstream_model") }}
where modifiedat >= (select max(modifiedat) from {{ this }})
)
{% endif %}
This strategy reruns the aggregations for all days on which any modifications took place. The tradeoff is that we sometimes reprocess rows unnecessarily, but that is honestly negligible in terms of compute.
The big downside is that this where clause makes the table scan take a LOT more time: it more than doubled.
I tried the same idea, but with an inner join instead of a where clause:
with
{% if is_incremental() %}
__date_filter as (
    select distinct date_id
    from {{ ref("upstream_model") }}
    where modifiedat >= (select max(modifiedat) from {{ this }})
),
{% endif %}
__source as (
    select __um.*
    from {{ ref("upstream_model") }} __um
    {% if is_incremental() %}
    -- only keep rows from days that saw any modification
    inner join __date_filter
        on __um.date_id = __date_filter.date_id
    {% endif %}
)
select * from __source
The results were very similar. I then tried adding indexes:
{{ config(
materialized='incremental',
schema='mrt',
unique_key=['date_id','hour','other_id_cols'],
indexes=[{'columns': ['date_id']}]
) }}
Again, the results were very similar. I also tried adding modifiedat as an index, although, as expected, that didn't help either.
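For reference, that variant was roughly the same config with a second entry in the indexes list:
{{ config(
    materialized='incremental',
    schema='mrt',
    unique_key=['date_id','hour','other_id_cols'],
    indexes=[
        {'columns': ['date_id']},
        {'columns': ['modifiedat']}
    ]
) }}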
What I am looking for
A dbt, SQL or Postgres technique that I am overlooking that might speed up this process. I've scanned the documentation and scoured the internet looking for someone with a similar problem, but either I missed some resource or it's a rare problem to have.
I looked into table partitioning, but as far as I can see, this is not available for dbt-postgres at this time.
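To be explicit, what I mean is native Postgres declarative partitioning along the lines of the sketch below (table and column names are just an illustration, and it assumes date_id is an actual date), which dbt-postgres does not create or manage for incremental models as far as I can tell:
-- illustration only; dbt-postgres does not generate this for incremental models
create table mrt.agg_transactions (
    date_id date not null,
    hour    int  not null,
    total   numeric
) partition by range (date_id);

create table mrt.agg_transactions_2024
    partition of mrt.agg_transactions
    for values from ('2024-01-01') to ('2025-01-01');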
Some alternatives I’m considering
There is no way to know whether there are late-changing facts, when they occur, or how far back they reach. On top of that, late-changing facts can happen both shortly after loading and much later. So just reprocessing the last few days would not account for all possible changes: I could cover the changes that happen soon after the initial load, but not the ones that happen much later. That said, having the data correctly reflected immediately after a fact changes is not mission critical.
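For clarity, that "last few days" alternative would be a fixed lookback like the sketch below (the 3-day window is arbitrary), plugged into the same where 1=1 pattern as above; it is cheap to scan but by definition misses anything that changes later than the window:
where 1=1
{% if is_incremental() %}
-- fixed lookback: fast, but misses facts that change more than 3 days after loading
and date_id >= current_date - interval '3 days'
{% endif %}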
If there is no dbt, SQL or Postgres technique to improve the performance of these incremental strategies, I could opt for the following:
- Run the aggregation models using the simple incremental strategy instead of the one that is guaranteed to be 100% correct all the time.
- Run a full refresh once per day during downtime to account for possible late-changing facts (a full refresh only takes ~15 minutes).
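Concretely, assuming the aggregation models share a tag such as aggregation (the tag name is only an example), the schedule would be roughly:
# frequent runs: simple incremental strategy
dbt run --select tag:aggregation

# once per day during downtime: full rebuild to pick up late-changing facts
dbt run --select tag:aggregation --full-refresh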
I already know some stakeholders might not fully agree with this workaround, but I can argue that it keeps their data fresher on the 99.9% of days when no late-changing facts need to be processed.