How to create near real-time models with just dbt + SQL

Preface

Before I dive into how to create this, I have to say this: you probably don’t need this. I, along with my Fishtown colleagues, have spent countless hours working with clients who ask for near-real-time streaming data. However, when we start digging into the project, we often realize that the use case isn’t really there. There are a variety of reasons why near-real-time streaming is not a good fit. Two key ones are:

  1. The source data isn’t updating frequently enough.
  2. End users aren’t looking at the data often enough.

So when presented with a near-real-time modeling request, I (and you as well!) have to be skeptical.

The right use case

Recently I was working on a JetBlue project and was presented with a legitimate use case: operational data. JetBlue’s Crewmembers need to make real-time decisions on when to close the door to a flight or rebook a flight. If you have ever been to an airport when there is a flight delay, you know how high the tension is in the room for airline employees to make the right decisions. They literally cannot do their jobs without real-time data.

If possible, the best thing to do is to query data as close to the source as possible. You don’t want to hit your production database unless you want to frighten and likely anger your DBA. Instead, the preferred approach is to replicate the source data to your analytics warehouse, which would provide a suitable environment for analytic queries. In JetBlue’s case, the data arrives in JSON blobs, which then need to be unnested, transformed, and joined before the data becomes useful for analysis. There was no way to just query from the source to get the information people required.

Tl;dr: If you need transformed, operational data to make in-the-moment decisions, then you probably need real-time data.

What are our options?

1. Materialize everything as views

Since views are simply stored queries that do not store data, they are always up to date. This approach works until your transformations take more than a couple of minutes to run, at which point queries no longer meet a “near real-time” SLA. When your data is small enough, this is the preferred approach; however, it isn’t scalable.

2. Run dbt in micro-batches

Just don’t do it. Because dbt is primarily designed for batch-based data processing, you should not schedule your dbt jobs to run continuously. This can open the door to unforeseeable bugs.

3. Use Materialized Views

While the concept is very exciting, the implementation still has some key limitations. They are expensive and cannot include some useful transformations. Check out this section in Snowflake’s documentation about their current limitations. Additionally, we have an Office Hours on this topic that I recommend checking out. Overall, we are excited to see how this develops.

4. Use a dedicated streaming stack

Tools like Materialize and Spark are great solutions to this problem. At the time of writing this post, their dbt adapters were not available, and adopting them would have required committing to a new platform, so they were not considered for this project.

5. Use dbt in a clever way

By being thoughtful about how we define models, we can use dbt and the existing materializations to solve this problem. Lambda views are a simple and readily available solution that is tool agnostic and SQL based. This is what I implemented at JetBlue.

What are lambda views?

The idea of lambda views comes from lambda architecture. The Wikipedia page explains it in much more depth, but the core concept of this architecture is to take advantage of both batch and stream processing methods, which makes it possible to handle a lot of data in a very performant manner.

Taking that approach, a lambda view is essentially the union of a historical table and a current view. The model where the union takes place is the lambda view. Here is a diagram that might be helpful:

The lambda view can be queried for always up-to-date data, no matter how often you have run your dbt models. Since the majority of the records in the lambda view come directly from a table, it should be relatively fast to query. And since the most recent rows come from the view, transformations are run on only a small subset of data, which keeps the lambda view fast. This provides a performant and always up-to-date model.

The SQL in the lambda view is simple (just a union all), but there’s a bit of work to get to the unioned model. To better understand this, it is important to think about the flow of transformations as demonstrated below.

What is essentially happening is a series of parallel transformations from the raw data. Looking at the blue and red boxes that represent the creation of both the current view and historical table, you can see that they are the same transformations. The only key difference is that one flow always uses views as the materialization, while the other is materialized as tables. Often, those tables are incrementally built to improve performance.

The most basic version of the SQL looks like this:


with current_view as (

    select * from {{ ref('current_view') }}
    where max_collector_tstamp >= '{{ run_started_at }}'

),

historical_table as (

    select * from {{ ref('historical_table') }}
    where max_collector_tstamp < '{{ run_started_at }}'

),

unioned_tables as (

    select * from current_view

    union all

    select * from historical_table

)

select * from unioned_tables

As you start to implement lambda views with more sources, creating a macro for the lambda view union is a great way to keep things DRY.
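For example, a hypothetical lambda_union macro (the macro and argument names here are illustrative, not from the JetBlue project) could generate the union shown above for any pair of models:

{% macro lambda_union(current_model, historical_model, filter_column='max_collector_tstamp') %}

    select * from {{ ref(current_model) }}
    where {{ filter_column }} >= '{{ run_started_at }}'

    union all

    select * from {{ ref(historical_model) }}
    where {{ filter_column }} < '{{ run_started_at }}'

{% endmacro %}

Each lambda view model then reduces to a single call, e.g. {{ lambda_union('current_view', 'historical_table') }}.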

Key Concepts of a lambda view

Filters are key to making this performant

You need to filter often and intentionally on your current view flow. This is because there is usually a lot of data being transformed and you want to transform only what is necessary. There are two main places that filters should be considered.

  • At the beginning of the current view flow: This usually happens at Transformation 1. This filter takes into account how often the historical table is run. If it’s being run every hour, then I filter for only the last 2 hours of “current rows”. The overlap is insurance: if there are any issues with a job run, we don’t miss out on rows.
  • At the unioned model: If you have late-arriving facts, you will want to include a primary key filter to ensure that there are no fanouts (a sketch follows this list).
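As a sketch of that second filter (event_id here stands in for whatever your primary key is), the unioned_tables CTE from the SQL above could exclude any current rows whose keys have already landed in the historical table:

unioned_tables as (

    select * from historical_table

    union all

    select * from current_view
    where event_id not in (select event_id from historical_table)

)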

As you start to create more lambda views, you will want to turn the filter into a macro to keep your code DRY. Here is a sample macro for you to use:


{% macro lambda_filter(column_name) %}

    {% set materialized = config.require('materialized') %}
    {% set filter_time = var('lambda_timestamp', run_started_at) %}

    {% if materialized == 'view' %}

        where {{ column_name }} >= '{{ filter_time }}'

    {% elif is_incremental() %}

        where {{ column_name }} >= (select max({{ column_name }}) from {{ this }})
            and {{ column_name }} < '{{ filter_time }}'

    {% else %}

        where {{ column_name }} < '{{ filter_time }}'

    {% endif %}

{% endmacro %}

** Note: in the macro above, the timestamp is var('lambda_timestamp', run_started_at). We want to default to the last time the historical models were run, but allow for flexibility depending on the situation. Note that we used the run_started_at timestamp rather than current_timestamp() to avoid situations where there is a job failure and the historical table hasn’t been updated for the last 5 hours.
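For example, the default can be overridden at run time with a var (the timestamp below is purely illustrative):

dbt run --vars '{"lambda_timestamp": "2020-07-23 03:00:00"}'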

Write idempotent models

As with every dbt model, be sure to keep this principle in mind as you start to create the models leading up to the union as well as the union itself.

Build the historical model intentionally

My dbt Cloud jobs are set to run every hour, building all of the models in the historical table flow. All of the models leading up to the historical table are configured as incremental models to improve performance.

Tradeoffs & Limitations

1. Duplicated logic

We are essentially creating duplicate models with the same logic, just different materializations. There are currently two approaches: 1) You can write the SQL in a macro and then have only one place to update this logic. This is great but creates complex folder organization and lowers model readability. 2) You can duplicate the SQL in both models, but then there are two places to update the logic. This makes maintenance more error-prone.
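A sketch of approach 1 (file, macro, and column names are illustrative): the shared transformation SQL lives in one macro, and the two models become thin wrappers that differ only in their configured materialization, which the lambda_filter macro above then picks up:

-- macros/flight_events_sql.sql
{% macro flight_events_sql() %}

    select
        event_id,
        flight_number,
        max_collector_tstamp
    from {{ ref('stg_flight_events') }}
    {{ lambda_filter('max_collector_tstamp') }}

{% endmacro %}

-- models/current/flight_events_current.sql
{{ config(materialized='view') }}
{{ flight_events_sql() }}

-- models/historical/flight_events_historical.sql
{{ config(materialized='incremental', unique_key='event_id') }}
{{ flight_events_sql() }}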

2. Complex DAGs & Multi-step transformations

With every final model needing duplicate upstream models, DAGs become significantly more complex. Add to that the need for more complex transformations, and this approach may not scale well.

3. Enforcing materializations

This approach is extremely dependent on the type of materializations. Requiring that models in the current view flow and their dependencies are all views can be challenging. Your project is more brittle because small changes can easily impact your data quality and how up-to-date it is.
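One way to make the materialization requirement harder to break by accident is to enforce it at the folder level in dbt_project.yml instead of model by model. A sketch, with hypothetical project and folder names:

models:
  my_project:
    current:
      +materialized: view
    historical:
      +materialized: incremental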

Future implementations

All I can truly say is there’s more to come. Internally, Jeremy and Claire have been working on whether and how we should create a custom materialization to make this approach a lot cleaner. If you’re interested, keep an eye on our experimental features repo and contribute if you have thoughts!

Thank You

This post would not have materialized (lol) without the Fishtown Team working together.

Drew and I had a brainstorming session to discuss lambda architecture and the initial concept of lambda views. Sanjana and Jeremy were my rubber duckies as I started writing the SQL and conceptualizing how things would work in code. Janessa and Claire also spent a lot of time helping me write this Discourse post as I tried to form it between meetings. This is basically us at Fishtown.

I also want to give a special thanks to Ben from JetBlue who has been essential to this process from implementation to editing this post. My apologies for spelling JetBlue as Jetblue far too many times!


Great article, Amy, and thanks for your support in helping us implement lambda views. I also want to acknowledge Chris Holliday from Visual BI for his insights! I’m Ben from JetBlue :wave:. And no need to apologize about the spelling! Our logo reads “jetBlue” but in print it’s “JetBlue”. :man_shrugging:

I thought it might be helpful to share how we get our near real-time data into Snowflake. Amy touched on some use cases, which are anchored around real-time operational decision-making. To meet these needs, our internal customers have set a target SLA of < 3 minutes from event creation in the source system to appearing in a user-facing dashboard.

For a number of our data sources (e.g. flights, bookings, check-ins, etc.), our source systems generate real-time event messages. Those messages undergo some streaming transformations, so they can be placed back on messaging queues for a variety of needs. The messages are also saved to various containers in Azure as JSON files and are organized using logical paths that include the date and time, specifically in the format YYYY/MM/DD/HH24/MI/filename.json. A (hypothetical) file in blob could look something like this: https://jetblue.blob.core.windows.net/flights-container/2020/07/23/08/46/flight123.json. I should note that the volume of messages for each data source is large.

Following dbt best practices, I need to get this JSON data into tables in our RAW database in Snowflake, so that I can quickly expose it in ANALYTICS using lambda views. I considered two approaches for ingesting data into RAW and took an approach that may surprise you, but I think for good reason (always welcome feedback if you have better ideas!):

  1. Load RAW tables via Snowpipe. I was initially biased towards this approach, but unfortunately, due to cost, this wasn’t a viable option. Per the docs, the cost of Snowpipe is 0.06 credits per 1,000 files, plus a hard-to-estimate cost for CPU processing time. As I mentioned above, some of my real-time data sources produce several thousand files per minute, so the cost becomes astronomical very quickly. (Quick math, keeping it simple: 1,000 messages per minute = 0.06 credits used per minute x 1,440 minutes/day x 365 days/year = 31,536 credits/year + associated CPU costs. Multiply that by a number of real-time data sources and it gets very, very expensive.) Remember, also, that the whole purpose of this is to meet our SLA of < 3 minutes, and the latency of a Snowpipe load isn’t guaranteed. In my own experiments, loads have ranged from a few seconds to close to a minute for a single message. Long story short, this isn’t the best approach for my use case. I think Snowpipe makes the most sense for ingesting data that arrives at unpredictable times and in relatively low volumes; my real-time feeds are fairly constant and high volume.

  2. Use a Task set to run every minute that copies data from blob into RAW. I’ll go into more detail below, but the general idea is to run a COPY INTO command that copies data from an External Stage where the JSON messages are stored into RAW. If this task runs quickly and frequently, we can meet our < 3 minute SLA.

The Details

I need to provide some context for how the JSON files are loaded from blob into RAW, because I was faced with another decision point that I want to share. So you’re aware, each container is referenced by its own Snowflake External Stage (e.g., azure://jetblue.blob.core.windows.net/flights-container). There is a flights stage, a bookings stage, etc. (this is a bit of an oversimplification, but you get the gist). The goal is to store JSON messages of the same type in a table in a VARIANT-type column, and I also want to store the filename and the time the row was inserted as metadata columns for each file, using metadata$filename and current_timestamp, respectively.
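For reference, the target table in RAW for one of these sources might look something like this (database, schema, and table names are hypothetical):

create table if not exists raw.flights.flight_events (
    data                variant,        -- the raw JSON message
    metadata_filename   varchar,        -- populated from metadata$filename during the copy
    inserted_at         timestamp_ntz   -- populated from current_timestamp during the copy
);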

Let’s walk through a theoretical example to set the context –

If the current time is 2020-07-23 08:47 and we’ve loaded all the files since 2020-07-23 08:45, then we need to ingest the files in the blobs: azure://jetblue.blob.core.windows.net/flights-container/2020/07/23/08/46/* and azure://jetblue.blob.core.windows.net/flights-container/2020/07/23/08/47/*. If we run a task every minute, then at 2020-07-23 08:48, we need to ingest the files in blobs: azure://jetblue.blob.core.windows.net/flights-container/2020/07/23/08/47/* (there may have been more files added to this blob since the previous ingest started at some point during 2020-07-23 08:47) and azure://jetblue.blob.core.windows.net/flights-container/2020/07/23/08/48/*. In other words, every time we want to ingest data, we only need to copy from the blob paths between the latest file we ingested and the current time.

I want to emphasize that the folder structure in blob is very important, because simply running a COPY INTO command on an external stage that points to a container with millions of files will be very slow. There is no chance it will meet a < 3 minute SLA. We need to be more specific with the path of our external stage when ingesting data.

During one of our regular data engineering / dbt discussions, @josh recommended I take a closer look at Partitioned External Tables to use as our RAW tables. When created correctly, these tables can be quite performant since they are smart enough to only scan the paths containing the data needed. I wanted this to work, but again, I ran into two challenges. First, external tables need to have their metadata refreshed to become aware of new files in the external stage location. This can be done automatically if you use Event Grid (in the case of Azure), or you can manually trigger a metadata refresh (alter external table my_table refresh;). The auto-refresh approach is, as you’d expect, expensive. In fact, it carries the same pricing as Snowpipe, so that won’t work. Manually refreshing could theoretically work, but it was taking 1.5-2 minutes to refresh, and then I ran into a new error.

Snowflake runs into issues creating external tables from external stages that have a large number of individual files. You will get an error like: Total size (>=1,073,742,048 bytes) for the list of file descriptors returned from the stage exceeded limit (1,073,741,824 bytes); Number of file descriptors returned is >=3,728,271. Please use a prefix in the stage location or pattern option to reduce the number of files. Some of my containers have many multiples of the number of “file descriptors” that caused this error, so this approach won’t work for me unfortunately. I had to abandon using external tables.

The actual solution

To read the JSON from blob into our RAW database in Snowflake in near real-time, we run a task set to a frequency of 1 minute for each real-time data source. The tasks call a stored procedure that more or less follows the steps below:

  1. Get the most recent JSON filename inserted into the table of interest. (The source tables always have the metadata$filename for each JSON file stored in a column called metadata_filename.)
-- step 1
select
    max(metadata_filename) as metadata_filename
from my_table
  2. Extract the timestamp of the filename folder partition using a regular expression.
-- step 2
select
    regexp_substr(
        metadata_filename,
        '\\d{4}/\\d{2}/\\d{2}/\\d{2}/\\d{2}'
    ) as last_partition_copied
from step_1
  3. Calculate the number of minutes since the last folder partition was loaded. We will need to load all the data from the last partition again (duplicates won’t be copied, because COPY INTO doesn’t copy duplicates, thankfully) in case we missed any files when the last task ran.
--  step 3
select
    timestamp_from_parts(
        left(last_partition_copied, 4),
        substr(last_partition_copied, 6, 2),
        substr(last_partition_copied, 9, 2),
        substr(last_partition_copied, 12, 2),
        right(last_partition_copied, 2),
        0
    ) as last_partition_copied_timestamp,
    timestampdiff(minute, last_partition_copied_timestamp, current_timestamp) as minutes_since_last_partition_copied
from step_2
  4. Write a loop (in JavaScript) that executes a COPY INTO statement for each minute between last_partition_copied_timestamp and the current timestamp by specifying the path on the external stage. You’ll need to dynamically create the path to append to the end of the stage.
-- to illustrate that the folder structure needs to be dynamically generated using JavaScript
copy into my_table from (
    select
        s.$1,
        metadata$filename,
        current_timestamp
    from @my_stage/${year}/${month}/${day}/${hour}/${minute} s
)
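To tie the steps together, the task itself is just a thin wrapper around the stored procedure. A sketch, with hypothetical warehouse, task, and procedure names:

-- runs the ingestion procedure every minute
create or replace task load_flight_events
    warehouse = ingest_wh
    schedule = '1 minute'
as
    call copy_flight_events_from_blob();

-- tasks are created suspended and must be resumed to start running
alter task load_flight_events resume;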

Following this approach, I’m able to ingest most files from blob in < 1 minute, which means that our lambda views expose this data at the same speed. When it works for the first time, you can’t stop querying the lambda views to see new records pop in!

Hope this was helpful for someone in the community. I’d welcome any feedback or other suggestions!


Aw yeah @amy— thanks for taking the time to write this up!

As Amy noted, we’ve been playing around with a custom materialization to make the implementation of this easier. dbt, in its current form, assumes that one model will produce only one relation (table/view) in a warehouse, and we suspect that if we try to use one materialization to create both a view and a table, some weird things might end up breaking!

But we’re going to try it out at some point just to see what happens :wink:

Hi @amy

Thank you for sharing this. We are actually implementing something very similar to the lambda view that you described, but only as a bespoke solution for a few core models.

Recently I have been thinking about how to build a generalized approach to enable lambda views on any given model.

Thank you @claire for sharing the lambda-view example in dbt-lab-experiment.

Our use cases are very similar to yours, but with some additional challenges.

One challenge for us is that some of our model queries are super complex, so we cannot just define a bunch of views and chain them together; the resulting query would be too complex. That’s why we need the lambda view of some models to be a temp table, to break down the query complexity.

The other challenge is how to filter the near-current-time source input down to as little data as possible. For example, we have location time-series measurements, and we only fetch a few locations in each lambda-view call. Much of our model logic is too complex for the filter predicate to be pushed down through a view, so we need to define a CTE macro or a stored procedure for that.

I am doing some experiments with a customized materialization that could enable a lambda view for any given incremental model. The lambda view would be created as a macro or stored procedure which, when called, returns a CTE subquery or a temporary table.

BTW, we use BigQuery. Hope to chat more.


immense wisdom, here. :heart: :heart: :heart:


Could you speak to how the above might work if the schema of the tables changes fairly often (say, 3 times per month)? Any thoughts on making this more dynamic?


Hey @amy ! I wanted to say thanks so much for writing this up, and a special thanks to the comment from @ben.s for some extra implementation context! This pattern exactly fit a use-case at my organization and this write-up served as a tremendous blueprint for us to hit the ground running.

I wanted to add a little something we ran into while implementing this architecture which is not mentioned above, namely a dependency timing gap between the near-real-time (NRT) and historical (HIST) flows.

When dbt runs, it ranks and executes tasks in dependency order. Since the NRT models are all views which depend on only other views, the tasks to update those models are very quick and dbt updates those views almost immediately. The HIST models are all incremental or table loads, which take relatively more time to finish updating. In our case it could take 15 minutes or more for these loads to complete.

During that time, the NRT models would be updated to filter on THIS run’s run_started_at time, filtering out records which now need to be loaded into the HIST flow tables. Those same records wouldn’t be fully loaded into the HIST models for another 15 minutes. As a result, when querying the union model during the dbt run, our returned dataset would not include them at all.

Our fix for this was actually very simple: for every NRT model which has a HIST counterpart, we add the following comment to the view definition: -- Manual Dependency on {{ ref('historical_model_name') }}. This way, dbt recognizes the historical load as a dependency and waits until the historical load is complete before updating the NRT view definition.
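As a sketch (model and column names here are placeholders), an NRT view with the manual dependency might look like:

-- Manual Dependency on {{ ref('flight_events_historical') }}:
-- forces dbt to wait for the historical load before recreating this view
{{ config(materialized='view') }}

select *
from {{ ref('stg_flight_events') }}
where max_collector_tstamp >= '{{ run_started_at }}'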

@amy and @ben.s Thank you both for sharing this writeup and use case! Quick thought and question…

  1. Would it work to build one view with the transformations (business logic) sans filters, and then two dependent models: one which begins the NRT path, and a second which begins the HIST path? That way we’d keep the complex model in native dbt/SQL and could apply the needed filters in the respective forked downstream models.

  2. Would there be a major performance hit to building the NRT view filter as a subquery, to fetch the latest timestamp from HIST and/or look for PKs not in the HIST table (see the sketch below)? It’s nice to have the timestamps hardcoded in a sense, but it also assumes the NRT and HIST models are updated at the exact same time (to avoid dupes) and feels a little less “portable”.
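To make question 2 concrete, the filter I’m describing would look something like this (model and column names are placeholders):

select *
from {{ ref('stg_flight_events') }}
where max_collector_tstamp > (
    select max(max_collector_tstamp) from {{ ref('flight_events_historical') }}
)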

Again, thanks for the great post!