Background (feel free to skip)
My team has set up dbt to produce materialized stage tables from a single source, fetching data that is between 2 and 6 hours old (we don't extract everything up to realtime because there is sometimes significant latency between event creation and commit to the source). We do this on an hourly schedule and replace the stage tables on each run.
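To make this concrete, here is a simplified sketch of what one of these hourly stage models looks like (the source, column names, and exact date logic are illustrative, not our real code):

```sql
{{ config(materialized='table') }}

select *
from {{ source('events', 'raw_events') }}
-- only pull events that are between 2 and 6 hours old, to allow for source latency
where event_created_at >= dateadd('hour', -6, current_timestamp)
  and event_created_at <  dateadd('hour', -2, current_timestamp)
```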
However, in some instances our stakeholders need to see things in realtime (or as close to it as possible). So to tackle this problem, a previous team member set up an almost identical DAG branch, but with a few key differences:
- Data is limited to 10 - 30 minutes old, instead of 2 - 6 hours.
- The run frequency is every 10 minutes.
- The materialization is incremental: the oldest third of the data is first dropped in a pre-hook, and then a new 10-minute batch is inserted (roughly as sketched below).
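Sketched out (again with placeholder names and simplified filters), that branch looks something like this:

```sql
{{ config(
    materialized='incremental',
    pre_hook="delete from {{ this }} where event_created_at < dateadd('minute', -30, current_timestamp)"
) }}

select *
from {{ source('events', 'raw_events') }}
-- only pull events that are between 10 and 30 minutes old
where event_created_at >= dateadd('minute', -30, current_timestamp)
  and event_created_at <  dateadd('minute', -10, current_timestamp)
{% if is_incremental() %}
  -- on incremental runs, only insert the newest 10-minute batch
  and event_created_at > (select max(event_created_at) from {{ this }})
{% endif %}
```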
Personally I see many issues with this approach – duplicated code, unnecessary complexity, etc. – since both branches are technically doing the exact same thing. So I had an idea: what if I could combine both branches into one? Have stage tables with data ranging from 10 minutes to 6 hours old, fetch new data every 10 minutes, and filter for the more recent entries downstream for our realtime needs.
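Downstream, the realtime consumers would then simply select the freshest slice of the combined stage table, something along these lines (hypothetical names, illustrative window):

```sql
{{ config(materialized='view') }}

select *
from {{ ref('stg_events') }}
-- realtime consumers only look at the freshest part of the combined table
where event_created_at >= dateadd('minute', -30, current_timestamp)
```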
Using an incremental model here works well, but remember the latency problem I mentioned earlier? Because the realtime models fetch much more recent data, they will never see many of the delayed events and will therefore be less reliable. This is acceptable for our realtime applications, but not for the long-term ones. So now to my current idea.
Problem
I want to incrementally add 10 minutes of data to our stage models every 10 minutes, using an incremental materialization. But once every hour, I want to rebuild those models from scratch using the table materialization. To do this, I created this macro:
```jinja
{% macro replace_every_hour() %}
{#- return 'table' for the first run of each hour, 'incremental' for the rest -#}
{%- if run_started_at.strftime("%M") | int < 10 -%}
table
{%- else -%}
incremental
{%- endif -%}
{% endmacro %}
```
It simply returns the materialization type I want, depending on when the run was triggered. In the model config I then call the macro as `config(materialized=replace_every_hour())`.
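For reference, the model itself looks roughly like this (the source, column names, and filters are simplified placeholders):

```sql
{{ config(
    materialized=replace_every_hour()
) }}

select *
from {{ source('events', 'raw_events') }}
-- exclude the most recent 10 minutes to avoid the worst of the commit latency
where event_created_at < dateadd('minute', -10, current_timestamp)
{% if is_incremental() %}
  -- on incremental runs, only add rows newer than what's already loaded
  and event_created_at > (select max(event_created_at) from {{ this }})
{% endif %}
```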
This seems to work when I first build the model, but when I rebuild it, regardless of the time of the run, it keeps whichever materialization it chose the first time. It's almost as if the build configuration is cached and can only be force-updated by making a change to either the model file or the macro.
Is this a bug? If not, is there a way around it? And more generally – is this even a good approach? I'm still fairly new to dbt, so any suggestions are appreciated. Thanks!