How to prevent partial read in dbt

The problem I’m having

How can I prevent partial or inconsistent reads in dbt when transforming data from a source system that only supports READ COMMITTED isolation level?

The context of why I’m trying to do this

I am implementing a CDC (Change Data Capture) pipeline and using dbt to transform the streaming data. Below is the current setup:

  • CDC Source: MySQL
  • Connector: Debezium → Kafka
  • Ingestion into Warehouse: SingleStore Pipeline loads Kafka CDC messages into a staging table
  • Transformation: dbt incremental model to apply CDC logic (insert/update/delete) into a target table
  • Warehouse: SingleStore (a distributed OLTP + OLAP database)

SingleStore currently provides only READ COMMITTED isolation level.
This means:

  • dbt may read data that is mid-ingestion
  • No guarantee of a consistent snapshot during a dbt run
  • Potential for partial or inconsistent transformation results

Some example code or error messages

{{ config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='delete+insert',
    alias='target_tbl'
) }}


with src as (
    select
        case 
            when payload::$after <> 'null' then payload::after::`id`
            else json_extract_bigint(payload, 'before','id')
        end as id,
        
        case 
            when payload::after <> 'null' then payload::after::`col_num`
            else json_extract_bigint(payload, 'before','col_num')
        end as col_num,
        
        case 
            when payload::after <> 'null' then payload::after::$op
            else json_extract_string(payload, 'before','op')
        end as op,

        case when payload::$op = 'd' then true else false end as is_deleted,
        
        payload::$op as operation,

        payload::source::`ts_ms` as source_ts_ms
        
    from stage_tbl
    where payload::$op in ('c', 'u', 'd')
    
    {% if is_incremental() %}
      and payload::source::ts_ms > (
          select coalesce(max(source_ts_ms), 0)
          from {{ this }}
      )
    {% endif %}    
),

latest as (
    select
        *,
        row_number() over (partition by id order by source_ts_ms desc) as rn
    from src
)

select
    id,
    col_num,
    op,
    is_deleted,
    source_ts_ms
from latest
where rn = 1

I would like guidance on dbt best practices to safely run transformations on streaming/CDC data
Thank you