Hello dbt Community,
I want to process streaming data with dbt incremental models in two steps. The data comes from a Kafka topic and is loaded into a Snowflake database via Snowpipe Streaming. The table (R1) holds the raw streaming data in two columns: metadata and content_data.
My approach:
R1 → P1 → P2
• Step 1: I create an SCD-Type-2 table (P1) which holds all relevant columns extracted from R1. Primary key: METADATA_KEY + METADATA_OFFSET
• Step 2: I create an SCD-Type-1 table (P2) out of P1 which holds only the currently valid data. Primary key: METADATA_KEY
Questions:
• Is there a more efficient way to run this process, e.g. by avoiding full table scans or by filtering on a different key than METADATA_CREATE_TS (see the offset-based sketch after this list)? Does dbt apply any query optimization of its own, or is Snowflake's query optimizer used anyway?
• When would I use "delete+insert" instead of "merge" for the second step (see the config sketch after this list)?
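For illustration, here is a minimal sketch of the offset-based filter I have in mind for P1, assuming Kafka offsets only ever increase per partition. The src alias and the COALESCE fallback for partitions not yet in the target are my additions, and I am assuming Snowflake can unnest this correlated MAX lookup (otherwise a pre-aggregated join would do the same):

FROM {{ source('KAFKA_STREAM', 'R1') }} src
{% if is_incremental() %}
-- only keep records past the per-partition high-water mark already loaded;
-- the COALESCE(..., -1) covers partitions that are new to the target
WHERE src.RECORD_METADATA:offset::NUMBER > COALESCE(
    (
        SELECT MAX(t.METADATA_OFFSET)
        FROM {{ this }} t
        WHERE t.METADATA_PARTITION = src.RECORD_METADATA:partition::NUMBER
    ),
    -1
)
{% endif %}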
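And this is the "delete+insert" variant of the P2 config I would compare against; as I understand dbt's docs, it deletes every row in the target whose unique_key appears in the new batch and then inserts the batch, instead of running a MERGE:

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'METADATA_KEY'
) }}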
— Table P1 —
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'append'
) }}
SELECT
    -- META_DATA
    CAST(RECORD_METADATA:CreateTime::VARCHAR AS TIMESTAMP_LTZ) AS METADATA_CREATE_TS,
    CAST(RECORD_METADATA:SnowflakeConnectorPushTime::VARCHAR AS TIMESTAMP_LTZ) AS METADATA_SF_CON_PUSH_TS,
    RECORD_METADATA:key::VARCHAR AS METADATA_KEY,
    RECORD_METADATA:offset::NUMBER AS METADATA_OFFSET,
    RECORD_METADATA:partition::NUMBER AS METADATA_PARTITION,
    RECORD_METADATA:topic::VARCHAR AS METADATA_TOPIC,
    -- CONTENT_DATA
    …
FROM {{ source('KAFKA_STREAM', 'R1') }}
{% if is_incremental() %}
-- incremental filter: only rows at or after the current max timestamp in the target
WHERE METADATA_CREATE_TS >= (SELECT MAX(METADATA_CREATE_TS) FROM {{ this }})
{% endif %}
— Table P2 —
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'METADATA_KEY'
) }}
WITH latest_stream_entries AS (
    SELECT
        *,
        -- ROW_NUMBER (rather than RANK) guarantees exactly one row per key, even on ties
        ROW_NUMBER() OVER (PARTITION BY METADATA_KEY ORDER BY METADATA_OFFSET DESC) AS rn
    FROM {{ ref('P1') }}
    {% if is_incremental() %}
    WHERE METADATA_CREATE_TS >= (SELECT MAX(METADATA_CREATE_TS) FROM {{ this }})
    {% endif %}
)
SELECT
    * EXCLUDE (rn)
FROM latest_stream_entries
WHERE rn = 1
Best regards,
mate