2-step processing of streaming data with dbt incrementals

Hello dbt Community,

I want to process streaming data with dbt incremental models in two steps. The data comes from a Kafka topic and is loaded into a Snowflake database via Snowpipe Streaming. The table (R1) holds the raw streaming data in two columns: metadata and content_data.

My approach:
R1 → P1 → P2
• Step 1: I create an SCD-Type-2 table (P1) that holds all relevant columns extracted from R1. Primary key: METADATA_KEY + METADATA_OFFSET
• Step 2: I create an SCD-Type-1 table (P2) out of P1 that holds only the currently valid data. Primary key: METADATA_KEY

Questions:
• Is there a more efficient way to do this, e.g. by avoiding full table scans or by filtering on a key other than metadata_create_ts? Does dbt do any query optimization in the background, or is Snowflake's query optimizer used anyway?
• When would I use “delete+insert” for the second step instead of “merge”?
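For context, the "delete+insert" variant I have in mind for P2 would only differ in its config, roughly like this (a sketch; the unique_key is taken from the primary key stated above):

```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'METADATA_KEY'
) }}
-- model body unchanged: dbt deletes the rows in P2 whose METADATA_KEY
-- appears in the new batch and re-inserts them, instead of running a MERGE
```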

— Table P1 —

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'append'
) }}

SELECT

-- METADATA
CAST(RECORD_METADATA:CreateTime::VARCHAR AS TIMESTAMP_LTZ) AS METADATA_CREATE_TS,
CAST(RECORD_METADATA:SnowflakeConnectorPushTime::VARCHAR AS TIMESTAMP_LTZ) AS METADATA_SF_CON_PUSH_TS,
RECORD_METADATA:key::VARCHAR AS METADATA_KEY,
RECORD_METADATA:offset::NUMBER AS METADATA_OFFSET,
RECORD_METADATA:partition::NUMBER AS METADATA_PARTITION,
RECORD_METADATA:topic::VARCHAR AS METADATA_TOPIC,

-- CONTENT_DATA

FROM {{ source('KAFKA_STREAM', 'R1') }}

{% if is_incremental() %}

where METADATA_CREATE_TS >= (select max(METADATA_CREATE_TS) from {{ this }})

{% endif %}

— Table P2 —

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'METADATA_KEY'
) }}

WITH latest_stream_entries AS (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY METADATA_KEY ORDER BY METADATA_OFFSET DESC) AS rn
FROM {{ ref('P1') }}
{% if is_incremental() %}
where METADATA_CREATE_TS >= (select max(METADATA_CREATE_TS) from {{ this }})
{% endif %}
)

SELECT
* EXCLUDE (rn)
FROM
latest_stream_entries
WHERE
rn = 1

Best regards,

mate

Yes, just create 2 dynamic tables in Snowflake and skip the technical debt that is dbt:

  • dyn_scd2 - just flattens R1
  • dyn_scd1 - selects the current row per key from dyn_scd2 using a ROW_NUMBER() window function ordered by etl_load desc
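Roughly like this (a sketch; TARGET_LAG, the warehouse name, and the etl_load column name are assumptions):

```sql
-- dyn_scd2: just flattens R1 (content columns elided)
CREATE OR REPLACE DYNAMIC TABLE dyn_scd2
    TARGET_LAG = '1 minute'
    WAREHOUSE  = my_wh
AS
SELECT
    RECORD_METADATA:key::VARCHAR   AS metadata_key,
    RECORD_METADATA:offset::NUMBER AS metadata_offset,
    CAST(RECORD_METADATA:CreateTime::VARCHAR AS TIMESTAMP_LTZ) AS etl_load
    -- plus the flattened content_data columns
FROM R1;

-- dyn_scd1: current row per key, latest etl_load wins
CREATE OR REPLACE DYNAMIC TABLE dyn_scd1
    TARGET_LAG = '1 minute'
    WAREHOUSE  = my_wh
AS
SELECT * EXCLUDE (rn)
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY metadata_key ORDER BY etl_load DESC) AS rn
    FROM dyn_scd2
)
WHERE rn = 1;
```

Snowflake keeps both refreshed on the stated lag and refreshes incrementally where it can, so there is no is_incremental() bookkeeping to maintain.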