Am I using incremental tables correctly?

Hello! I’m reaching out because I’m unsure if I am understanding how to use incremental models correctly. Here is what I am trying to accomplish:

We have an employees table in Google BigQuery. This table is set up to have a new row added whenever there is a change to an employee’s record; essentially, the table is a historical record of employee data. The uniqueness of a row is the combination of an employee’s id and the timestamp of when the change was captured. The table is also partitioned by day based on the timestamp field.

We’re planning to create an incremental model using this employees table as the source to get the current state of our HR data. For the model, we’re using the “merge” strategy, and the employee’s id would be the unique key.

Currently, I’m running into two issues:

  1. When the “build” or “run” commands worked, I would get duplicate employee ids in our model table
  2. I’m not certain what I did, but now every time I use the “run” or “build” commands, I get an error that states:
19:11:34 On model.norcal_analytics.stg__employees: /* {"app": "dbt", "dbt_version": "1.7.12", "profile_name": "user", "target_name": "default", "node_id": "model.norcal_analytics.stg__employees"} */
-- back compat for old kwarg name

My questions are:

  1. Am I completely off base on how to use an incremental model? Like, is this just not a good use case/what incremental models are intended for?
  2. If this is a good use case for an incremental table, then does anyone have ideas on why I’m getting duplicate ids and why I’m getting this error? I’m thinking the error might be getting raised by the is_incremental macro, but I’m not 100% sure on this.

I’ve read a lot of docs on incremental models, but I’ve mostly been referencing this one. Here’s my model file:

{{ config(
    materialized='incremental',
    unique_key='account_id',
    partition_by = {
      "field": "last_updated_utc",
      "data_type": "timestamp",
      "granularity": "day"
    }
) }}

with source as (
    select * from {{ source('hris','base__employees') }}
)

, final as (
    select
        id as account_id
        , employee_id as local_staff_id
        , first_name as staff_first_name
        , last_name as staff_last_name
        , middle_name as staff_mid_initial
        , nickname as nickname
        , primary_email as staff_email
        , secondary_email as secondary_email
        , status as staff_status
        , address_line_1 as street_address
        , address_line_2 as apartment
        , city as city
        , state as state
        , zip as zip_code
        , country as country
        , primary_phone as primary_phone
        , hired_date as hired_date
        , re_hired_date as re_hired_date
        , started_date as started_date
        , terminated_date as terminated_date
        , birth_date as birth_date
        , location_cost_center_id as location_cost_center_id
        , department_cost_center_id as department_cost_center_id
        , supervisor_account_id as supervisor_account_id
        , last_updated_utc as last_updated_utc
    from source
)

select * from final

{% if is_incremental() %}
    where last_updated_utc >= (select max(last_updated_utc) from {{ this }})
{% endif %}

As I’m writing this, I’m wondering about the placement of the is_incremental macro. Should I place this somewhere else like within the “final” cte?

Thanks!!

Is there a reason not to use a snapshot table instead?

Note: @andrewhharrison originally posted this reply in Slack. It might not have transferred perfectly.


I hadn’t heard of snapshots before. I haven’t fully looked into them, but from skimming this article, it might be a good alternative. Thanks!


Hi Mark,
I am facing the exact same issue with incremental loads of my table. What you are doing is correct; I even looked at a few videos on YouTube and they prescribe the same thing. In my case, I load my source table with new raw data, but when I run my transformation model with the is_incremental() macro in it, the model does not compile! I even tried replacing {{ this }} with {{ source(...) }}. This time the model compiles, but the table is not getting loaded. I am stuck at this point and looking for a breakthrough.

I figured it out. I had to add a new timestamp column to my target table and add logic inside the {% if is_incremental() %} block to ensure I am pulling only the rows from my source table that are fresher than the max timestamp in my target table. Now every time I load the target table, it loads just the fresh rows from the source. The new timestamp column in the target table shows the different load timestamps. Works like a charm!
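Roughly, the pattern looks like this (a minimal sketch; the column and source names are just placeholders, not the actual model):

{{ config(
    materialized='incremental',
    unique_key='account_id'
) }}

select
    id as account_id
    , last_updated_utc
    , current_timestamp() as loaded_at_utc   -- new audit column on the target
from {{ source('hris', 'base__employees') }}

{% if is_incremental() %}
-- only pull source rows fresher than the newest row already in the target
where last_updated_utc > (select max(last_updated_utc) from {{ this }})
{% endif %}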

Hi Mark,

The incremental materialization in dbt Core will:

  1. make sure the extra where clause condition is added to the model query when the run is in incremental mode (i.e., the target table already exists and the --full-refresh option has not been provided)

  2. keep the previously materialized data and merge the output of the model query into this table (roughly the shape sketched right after this list)
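For context, on an incremental run with the merge strategy on BigQuery, the statement dbt compiles is roughly shaped like the following. This is heavily simplified: the real compiled SQL lists every column, uses internal aliases, and the project/dataset names here are placeholders.

merge into `my-project.analytics.stg__employees` as dest
using (
    -- the model query, including the is_incremental() filter
    select
        id as account_id
        , last_updated_utc
    from `my-project.hris.base__employees`
    where last_updated_utc >= (
        select max(last_updated_utc) from `my-project.analytics.stg__employees`
    )
) as src
on dest.account_id = src.account_id
when matched then update set
    last_updated_utc = src.last_updated_utc
when not matched then insert (account_id, last_updated_utc)
values (src.account_id, src.last_updated_utc)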

The data transformation from the transactional (historical) format to a current-state data structure is entirely up to the model query. The current model query is not performing this transformation yet, and dbt Core will not do it for you.

Imagine a day has passed since the previous model run. What if a single account identified by account_id was subject to multiple update operations in the meantime? Which process ensures the last update is the one applied to the current-state target? What if the source deletes an account and we intend to keep the current state accurate? What if an account_id is deleted and reinserted between two subsequent model runs? What if an account was created by mistake and deleted again on the same day?

There are many reasons why the transition from a historical data format to a current state requires the model SQL to do that aggregation. Even if a single account can only receive one update per day (or per dbt run interval) in the source, what if a run of the job is skipped for some reason and the next run has to process two partitions/days in one shot?

If the source has no delete scenarios, I would add SQL code to ensure each account’s last version is fetched. When delete scenarios exist, it will depend on how the source provides transactional data on deleted accounts.
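Sketched out, that “last version per account” logic could look like this inside the model query (column names follow the model above; whether > or >= is the right comparison depends on how late-arriving rows behave in your source):

with source as (

    select * from {{ source('hris', 'base__employees') }}

    {% if is_incremental() %}
    -- only scan rows newer than what the target already holds
    where last_updated_utc > (select max(last_updated_utc) from {{ this }})
    {% endif %}

)

, ranked as (

    select
        *
        , row_number() over (
            partition by id
            order by last_updated_utc desc
        ) as row_num
    from source

)

, final as (

    select
        id as account_id
        , last_updated_utc
        -- ... remaining column renames from the original model ...
    from ranked
    where row_num = 1   -- keep only each account's latest version

)

select * from final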
