The problem I’m having
How can I prevent partial or inconsistent reads in dbt when the database I read from only supports the READ COMMITTED isolation level?
The context of why I’m trying to do this
I am implementing a CDC (Change Data Capture) pipeline and using dbt to transform the streaming data. Below is the current setup:
- CDC Source: MySQL
- Connector: Debezium → Kafka
- Ingestion into warehouse: a SingleStore Pipeline loads the Kafka CDC messages into a staging table (stage_tbl)
- Transformation: a dbt incremental model applies the CDC logic (insert/update/delete) to a target table (target_tbl)
- Warehouse: SingleStore (a distributed OLTP + OLAP database)
SingleStore currently supports only the READ COMMITTED isolation level.
This means:
- dbt may read the staging table while the pipeline is still loading data into it
- There is no guarantee of a consistent snapshot across the queries of a dbt run
- Transformation results may be partial or inconsistent
Some example code or error messages
{{ config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='delete+insert',
    alias='target_tbl'
) }}
with src as (
    select
        -- Delete events have a null "after" image, so fall back to "before".
        case
            when payload::after <> 'null' then json_extract_bigint(payload, 'after', 'id')
            else json_extract_bigint(payload, 'before', 'id')
        end as id,
        case
            when payload::after <> 'null' then json_extract_bigint(payload, 'after', 'col_num')
            else json_extract_bigint(payload, 'before', 'col_num')
        end as col_num,
        case
            when payload::after <> 'null' then payload::after::$op
            else json_extract_string(payload, 'before', 'op')
        end as op,
        case when payload::$op = 'd' then true else false end as is_deleted,
        payload::$op as operation,
        -- Extract ts_ms as BIGINT so max() and the comparison below are numeric.
        json_extract_bigint(payload, 'source', 'ts_ms') as source_ts_ms
    from stage_tbl
    where payload::$op in ('c', 'u', 'd')
    {% if is_incremental() %}
      -- Only pick up events newer than the latest one already in the target.
      and json_extract_bigint(payload, 'source', 'ts_ms') > (
          select coalesce(max(source_ts_ms), 0)
          from {{ this }}
      )
    {% endif %}
),
latest as (
    select
        *,
        -- Rank each id's events newest-first so that rn = 1 is the latest change.
        row_number() over (partition by id order by source_ts_ms desc) as rn
    from src
)
select
    id,
    col_num,
    op,
    is_deleted,
    source_ts_ms
from latest
where rn = 1
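To make the "partial results" concern concrete for the model above, here is a toy Python simulation of its incremental logic (not dbt or SingleStore code). It assumes that events can become visible in stage_tbl out of source_ts_ms order, for example if a SingleStore Pipeline loads several Kafka partitions in parallel. The Event class and incremental_run function are hypothetical names for this sketch. Under that assumption, an event that commits after a run has already moved the max(source_ts_ms) watermark past it is filtered out on every later run:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    id: int
    op: str            # Debezium op code: 'c', 'u' or 'd'
    source_ts_ms: int  # payload.source.ts_ms


def incremental_run(visible_events: list[Event], target: dict[int, Event]) -> dict[int, Event]:
    # Same watermark as the model: coalesce(max(source_ts_ms), 0) over {{ this }}.
    watermark = max((row.source_ts_ms for row in target.values()), default=0)
    new_events = [e for e in visible_events if e.source_ts_ms > watermark]
    # Keep only the latest event per id (row_number() ... where rn = 1).
    latest: dict[int, Event] = {}
    for e in sorted(new_events, key=lambda ev: ev.source_ts_ms):
        latest[e.id] = e
    # delete+insert on unique_key='id' behaves like an upsert keyed on id.
    target.update(latest)
    return target


# Assumption: events may become visible out of source_ts_ms order.
# Run 1: the event for id=3 (ts 1003) is still being loaded, so it is not visible yet.
run1_visible = [Event(1, "c", 1000), Event(2, "c", 1005)]
# Run 2: the late event is now committed, together with a newer update for id=1.
run2_visible = run1_visible + [Event(3, "c", 1003), Event(1, "u", 1010)]

target: dict[int, Event] = {}
incremental_run(run1_visible, target)
incremental_run(run2_visible, target)
print(sorted(target))  # [1, 2]: id=3 is never loaded
```

Because the filter uses a strict >, the same thing would happen to an event that has exactly the watermark's source_ts_ms but commits after the run that set it.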
I would like guidance on dbt best practices for safely running transformations on streaming/CDC data.
Thank you