Snowflake Streams with DBT

Hi,

We are in the process of assessing our orchestration tool for modelling within Snowflake.

I have used Snowflake and dbt together in the past, but couldn’t find a way to combine their respective strengths.

Has anyone been able to use Snowflake Streams within dbt models?

I prefer using streams for incremental modelling, coupled with the version control / DAG capabilities of dbt.

Would be great to hear if this has been solved in any way.

We have been using Snowflake streams to process deltas in incremental models.
We defined the streams as sources in dbt and used them in the incremental models:

version: 2

sources:
  - name: raw_zone
    database: database
    schema: raw
    tables:
      - name: table1
      - name: table1_stream

incremental_model.sql

select * from
{% if is_incremental() %}
    {{ source('raw_zone', 'table1_stream') }}
{% else %}
    {{ source('raw_zone', 'table1') }}
{% endif %}
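
The stream itself still has to exist in Snowflake before dbt can read from it. A minimal sketch, assuming the table and stream names from the example above, run outside dbt or from an on-run-start hook:

create stream if not exists raw.table1_stream
    on table raw.table1
    append_only = false;  -- capture inserts, updates and deletes, not just inserts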

It can be done, but it requires solving much more than just sourcing from streams instead of tables:

  1. Models most likely source from multiple tables, and every source influences the outcome of the target table the model builds. An incremental process that only considers changes from one of the sources will deliver results different from a full run. If the intention is to incrementally maintain a model’s target table while keeping the result identical to a full run, changes from all sources have to be taken into account.

  2. This leads to the need to consolidate the changes from all sources used by a model into a single set of suitable keys (a key column or a combined key), and then to visit each source again to fetch the data related to that consolidated key set (a sketch of this pattern follows this list). This approach is required with or without streams: it applies as soon as you want to restrict the volume of input data for incremental processing. It is complex, but it can be done, and the savings compared to full processing are still huge. The problem is not specific to streams as a source; it is typical for every method that limits the input scope of incremental processing. The data needed from each source is somewhat larger than what comes out of the individual streams. Even a single-source model can make it challenging to capture change adequately, e.g. when the SQL uses a window function whose partition clause spans a slightly larger data scope. Truly incremental processing therefore almost always needs a first step that consolidates the set of keys subject to change, followed by fetching the data of all sources related to those keys, instead of just reading the streams or filtering on creation/modification/deletion markers.

  3. Care should be taken with DDL in the end-to-end process for a given model, because DDL implicitly commits the Snowflake transaction, and dbt’s standard materialisations use DDL. A Snowflake stream is consumed by the commit that follows DML reading from it. It is dangerous to let the stream be consumed before all end-to-end actions have completed successfully: that would jeopardize target integrity and the ability to restart without full reprocessing after a failure. If this is not taken care of, the consequence is a full reprocess of all data after every job failure. All DDL generated by the dbt macros should therefore be moved to a spot before the actions that read from the stream in a DML statement. Never read from the stream in DDL: its consumption is committed when that DDL completes, not at the final commit at the end of the job. Likewise, any procedures or hooks used in the model must be free of (implicit) commits. This is challenging, but it can be done.
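
To make point 2 concrete, here is a minimal sketch of the key-consolidation pattern, with hypothetical sources (orders, order_lines) and their streams: first collect the keys touched in any stream, then rebuild the affected keys from the full source tables.

{% if is_incremental() %}
with changed_keys as (
    -- consolidate the keys that changed in any of the source streams
    select order_id from {{ source('raw_zone', 'orders_stream') }}
    union
    select order_id from {{ source('raw_zone', 'order_lines_stream') }}
)
{% endif %}
select o.order_id, o.customer_id, sum(l.amount) as total_amount
from {{ source('raw_zone', 'orders') }} o
join {{ source('raw_zone', 'order_lines') }} l on l.order_id = o.order_id
{% if is_incremental() %}
where o.order_id in (select order_id from changed_keys)
{% endif %}
group by o.order_id, o.customer_id

Materialized as incremental with unique_key='order_id' (merge or delete+insert strategy), the changed keys are rebuilt completely, which keeps the incremental result identical to a full run.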

If any of the sources is subject to physical deletes, working with streams can actually solve a lot of incremental-processing integrity problems.
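
As an illustration (hypothetical table and column names): a standard stream exposes physical deletes as rows with METADATA$ACTION = 'DELETE', so they can be propagated in the same DML that applies inserts and updates, for example:

merge into analytics.table1_incr t
using (
    -- drop the DELETE half of update pairs; keep inserts/updates and true deletes
    select * from raw.table1_stream
    where not (METADATA$ACTION = 'DELETE' and METADATA$ISUPDATE)
) s
on t.id = s.id
when matched and s.METADATA$ACTION = 'DELETE' then delete
when matched and s.METADATA$ACTION = 'INSERT' then update set t.value = s.value
when not matched and s.METADATA$ACTION = 'INSERT' then insert (id, value) values (s.id, s.value);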

I feel the use of streams is beneficial when the creation/modification/soft-delete markers on the source cannot be trusted 100% (I have never seen data worth that 100% trust); when physical deletes happen (also implicitly, through partial reloads of source data during schema evolution, data retention operations, GDPR-related clean-ups, and so on); and of course to save money by avoiding large scans just to look for change. Incremental processing without streams is hard to generalise and automate: it requires custom development and a big testing workload for each model individually. Snowflake’s streams feature is definitely a game-changer.

It is fun and rewarding to ‘macro’ this in dbt materializations. The most significant return is enabling less technical people (from a data engineering point of view) to build true incremental data processing, without needing the detailed knowledge or the testing burden otherwise required to release a stable incremental process. It brings us a big step closer to industrializing data processing in a cost-efficient way, starting from a non-incremental SQL definition, and to increasing processing frequency towards near-real-time batch processing.


Yes, we are using streams and change tracking in some of our incremental models. Here is a breakdown of how to approach each “cardinality” of dependencies between source models and target models:

  • 1:1 relationship: Happy path! Point the target model to a stream atop the source.
  • M:1 relationship: Define a view that does the merging/joining, then treat it as a 1:1 atop the view (see the sketch after this list).
  • 1:M relationship: This is M x (1:1), so apply 1:1 to each target model.
  • M:M relationship: This is M x (M:1), so apply M:1 to each target model.
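
For the M:1 case, a sketch with hypothetical names; Snowflake allows streams on views (with some restrictions, e.g. only projections, filters, inner joins and UNION ALL), provided change tracking is enabled on the underlying tables:

-- enable change tracking on the underlying tables
alter table raw.orders set change_tracking = true;
alter table raw.order_lines set change_tracking = true;

-- the view that does the merging/joining
create or replace view raw.order_detail_v as
select o.order_id, o.customer_id, l.line_id, l.amount
from raw.orders o
join raw.order_lines l on l.order_id = o.order_id;

-- a stream on the view; the target model then treats it as a 1:1 source
create stream if not exists raw.order_detail_v_stream on view raw.order_detail_v;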

If you are wondering whether there are any pre-built packages or macros you can use to manage Snowflake streams in dbt, I am only aware of this package, which provides a custom materialization strategy that leverages streams: dbt - Package hub

Hi, the dbt Package hub link seems to be broken, could you please re-point it to the right link? Thanks

I don’t seem to have the permission to edit my post (or I can’t find the button).

What I can do, however, is share the link as a new post :slight_smile:

https://hub.getdbt.com/arnoN7/incr_stream/latest

Should it break again, the package name is incr_stream and the author is arnoN7.


Hi,
I’m the author of the dbt-stream package (GitHub - arnoN7/dbt-incremental-stream). I updated the doc 3 days ago; it was not matching the code :frowning: .
The latest release, 1.1.0, should work fine.
You can contact me on Slack if you face issues, or post an issue on GitHub.

A somewhat easy hack, if you don’t care about deletes, is to use is_incremental() together with METADATA$ACTION, like below (reusing the source names from the example earlier in the thread):

select * from
{% if is_incremental() %}
    {{ source('raw_zone', 'table1_stream') }} where METADATA$ACTION = 'INSERT'
{% else %}
    {{ source('raw_zone', 'table1') }}
{% endif %}
You can’t use METADATA$ISUPDATE, but I think this should work as well.

Or is there any better way to do it?