I am trying to implement some incremental models to decrease the BigQuery processing cost of some of our more expensive models. I am finding that the incremental versions run faster, but actually process more data. What am I missing here? Worth noting that these models consist of many CTEs, and the source tables are partitioned. I am reusing a bit of code that I know works in other models, where it processes significantly less than just rebuilding the table; the only difference is that in those models the source table is also clustered.
{{
    config(
        materialized='incremental',
        unique_key='id',
        partition_by={
            "field": "_last_updated",
            "data_type": "timestamp",
            "granularity": "day"
        },
        require_partition_filter=false
    )
}}
{% set maxDays = var('maxDays') %}
{% set timestampField = '_last_updated' %}
{% set latestTimestamp = calc_latest_timestamp(maxDays, timestampField) %}
with stg_table as (
    select
        *
    from {{ ref('stg_table') }}
    {% if is_incremental() %}
    where _last_updated > "{{ latestTimestamp }}"
    {% endif %}
),
An example of the run:
(venv) dbt_test@Jesss-MBP-2 scoir-dbt % DBT_SDLC_LEVEL=prod dbt run --models dv_incremental_test dv_table
13:19:15 Running with dbt=1.5.3
13:19:15 Registered adapter: bigquery=1.5.3
13:19:16 Found 610 models, 1136 tests, 0 snapshots, 0 analyses, 576 macros, 0 operations, 0 seed files, 80 sources, 0 exposures, 0 metrics, 0 groups
13:19:16
13:19:17 Concurrency: 1 threads (target='dev')
13:19:17
13:19:17 1 of 2 START sql table model dbt_test.dv_table ................. [RUN]
13:20:05 1 of 2 OK created sql table model dbt_test.dv_table ............ [CREATE TABLE (21.5m rows, 6.3 GiB processed) in 47.93s]
13:20:05 2 of 2 START sql incremental model dbt_test.dv_incremental_test [RUN]
13:20:48 2 of 2 OK created sql incremental model dbt_test.dv_incremental_test [MERGE (36.7k rows, 14.4 GiB processed) in 43.14s]
13:20:48
13:20:48 Finished running 1 table model, 1 incremental model in 0 hours 1 minutes and 32.23 seconds (92.23s).
13:20:49
13:20:49 Completed successfully
13:20:49
13:20:49 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
A couple of suggestions I use:
• To save costs you need to filter both the source data (as you do) and the target table (if possible). A merge with a unique_key config needs to scan the entire target table to look for matches. There are two ways to help with this (see the config sketch after this list):
- Cluster your target table on the unique_key column to enable block pruning (clustered merges: https://discourse.getdbt.com/t/benchmarking-incremental-strategies-on-bigquery/981)
- Use the incremental_predicates config (https://docs.getdbt.com/docs/build/incremental-models#about-incremental_predicates) to filter your target table so that not all of its history needs to be scanned (if possible)
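For illustration, a minimal sketch of what both suggestions could look like on the model from the question (cluster_by on the unique key plus an incremental predicate; the 3-day window is just an example, not a recommendation):
{{
    config(
        materialized='incremental',
        unique_key='id',
        partition_by={
            "field": "_last_updated",
            "data_type": "timestamp",
            "granularity": "day"
        },
        cluster_by=['id'],
        incremental_predicates=[
            "DBT_INTERNAL_DEST._last_updated > timestamp_sub(current_timestamp(), interval 3 day)"
        ]
    )
}}
cluster_by lets BigQuery prune blocks when matching on id during the merge, and the predicate stops the merge from scanning the target's full history; size the interval to how far back your updates can realistically reach.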
Note: @Johann De Wet
originally posted this reply in Slack. It might not have transferred perfectly.
I see the hyperlinks do not display too well on Discourse when coming from Slack. These are the concepts I refer to:
• Clustered merges - https://discourse.getdbt.com/t/benchmarking-incremental-strategies-on-bigquery/981
• Incremental predicates - https://docs.getdbt.com/docs/build/incremental-models#about-incremental_predicates
Note: @Johann De Wet
originally posted this reply in Slack. It might not have transferred perfectly.
Thanks for the response! I found the incremental predicate lowers the cost a ton, but I think that limiting the scan of the target table leads to some records not getting properly updated. For example - I added the following:
incremental_predicates = [
    "DBT_INTERNAL_DEST._last_updated > timestamp_add(current_timestamp(), interval -4 hour)"
],
If I am understanding correctly, this means the merge will only scan the portion of the target table where _last_updated falls within the past 4 hours, but if there are matching records older than that, their updates won't be merged? Am I understanding this properly?
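For context, my mental model is that the predicate ends up in the ON clause of the MERGE that dbt generates, roughly like this (a simplified sketch, not the exact statement; project, dataset and temp-table names are placeholders):
merge into my_project.dbt_test.dv_incremental_test as DBT_INTERNAL_DEST
using (
    -- the filtered incremental batch dbt stages before merging (name illustrative)
    select * from my_project.dbt_test.dv_incremental_test__dbt_tmp
) as DBT_INTERNAL_SOURCE
on (DBT_INTERNAL_DEST._last_updated > timestamp_add(current_timestamp(), interval -4 hour))
    and DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
when matched then update set
    -- the real statement lists every column; one shown here for brevity
    _last_updated = DBT_INTERNAL_SOURCE._last_updated
when not matched then insert row
which would mean target rows whose existing _last_updated falls outside the 4-hour window never match and get treated as "not matched".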
Yes, that is absolutely correct. We tend to use a big enough filter to ensure we catch 99.9% of the changes.
In our case the models are meant for analytics (as opposed to operational requirements), so we can live with that level of accuracy given the cost we save in processing. For you it might be different and require 100% accuracy, in which case the incremental predicate may not be an option.
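One way to size that filter is to measure how long after creation rows actually get updated. A hypothetical sketch, assuming the source carries a creation timestamp such as _created_at (a column not shown in the original model; substitute whatever your source provides):
select
    timestamp_diff(_last_updated, _created_at, day) as days_from_create_to_update,
    count(*) as row_count
from {{ ref('stg_table') }}
where _last_updated > _created_at
group by days_from_create_to_update
order by days_from_create_to_update
The cumulative distribution of that lag shows what lookback window would capture, say, 99.9% of updates.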
Note: @Johann De Wet
originally posted this reply in Slack. It might not have transferred perfectly.
In fact, to be more specific: in your example, if there are matching records older than 4 hours, the merge will treat their updates as "new" values and insert them, resulting in duplicates.
If you know the data will not change after a certain amount of time, that's ideal to use in the filter. Even a filter over a large timespan (e.g. months or years) can still save A LOT of processing compared to scanning the entire table, which otherwise just leads to ever-decreasing performance and ever-increasing costs, i.e. it's not scalable.
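A quick way to spot-check whether a too-narrow predicate has introduced duplicates is a uniqueness query on the merge key (or the equivalent dbt unique test); this sketch just reuses the model name from the question:
select
    id,
    count(*) as version_count
from {{ ref('dv_incremental_test') }}
group by id
having count(*) > 1
Any ids returned exist more than once in the target, which is the symptom you'd see when updates to rows outside the predicate window get re-inserted instead of merged.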
Note: @Johann De Wet
originally posted this reply in Slack. It might not have transferred perfectly.