How to create efficient timeseries models (or how to join efficiently on inequalities)


#1

Somewhat inspired by @drew’s great answer to this question about hash tables:

What we currently do

We need to report on the state of various records as of the end of every day, based on Type 4 history tables created by our Django application. For example, this could be the facility limits of each of our borrowers on a daily basis.

Currently, our models take the following steps:

  1. Build a daily timeseries model. We build this as a utility table in our production schema as we use it in multiple places.

  2. Join on to each day all records from the history table that occurred prior to or on that date.

  3. Compute row_number() for each record/day combination to identify the most recent record as of that day.

  4. Filter on row_number_field = 1.

This typically looks like this:

with timeseries as (

    select * from {{ ref('timeseries') }}

), history as (

    select * from {{ ref('facility_history') }}

), joined as (

    select 
        t.date_day,
        h.*,
        row_number() over (partition by t.date_day, h.id order by h.action_date desc) 
              as row_number_field
    from timeseries t
    inner join history h
        on h.action_date <= t.date_day

)

select *
from joined
where row_number_field = 1

where action_date is the date of the change to the record.

This works fine and we build all of these tables incrementally. However, the initial builds take forever and even some of the incremental loads are less efficient than I’d like because of the “Nested Loop Join”.

My question

Is there a more efficient way to do this without joining on the inequality? We have cases where the join criteria are similar but more complicated, and there this becomes more of an issue.

One solution that came to mind was leaving the initial load as is (or exploring @claire’s new split-up incremental materialisation), but for each incremental load, which replaces the current day’s value until the day is over, simply inserting the last value from the history table. Has anyone implemented anything like that?
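A minimal sketch of that incremental idea, written against an in-memory SQLite database so it can be run anywhere. It is not a dbt materialisation and not something confirmed in this thread: the `facility_daily` target table, the `facility_limit` column, the `refresh_day` helper and the sample rows are all made up for illustration. The point it tries to show is that the inequality becomes a filter against one constant date rather than a join against every row of the timeseries.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical history rows; facility_limit is an assumed column.
    create table facility_history (id integer, action_date text, facility_limit integer);
    insert into facility_history values
        (1, '2024-01-01', 100),
        (1, '2024-01-03', 150),
        (2, '2024-01-02', 500);

    -- Hypothetical target table, already built up to 2024-01-02.
    create table facility_daily (date_day text, id integer, facility_limit integer);
    insert into facility_daily values
        ('2024-01-01', 1, 100),
        ('2024-01-02', 1, 100),
        ('2024-01-02', 2, 500);
""")


def refresh_day(conn, date_day):
    """Replace one day's rows with the latest history row per id as of that day."""
    with conn:  # the delete and the insert are committed together
        conn.execute("delete from facility_daily where date_day = ?", (date_day,))
        conn.execute(
            """
            insert into facility_daily (date_day, id, facility_limit)
            select ?, id, facility_limit
            from (
                select
                    id,
                    facility_limit,
                    row_number() over (partition by id order by action_date desc)
                        as row_number_field
                from facility_history
                where action_date <= ?  -- filter on one date, no join to timeseries
            ) as latest
            where row_number_field = 1
            """,
            (date_day, date_day),
        )


# Running the refresh twice for the same day leaves a single set of rows.
refresh_day(conn, "2024-01-03")
refresh_day(conn, "2024-01-03")
today_rows = conn.execute(
    "select id, facility_limit from facility_daily "
    "where date_day = '2024-01-03' order by id"
).fetchall()
```

The delete-then-insert is there so the current day can be rebuilt as often as needed until the day is over. Dates are stored as ISO text here, so the `<=` comparison is a string comparison; on Redshift the column would be a real date or timestamp.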


Understanding hash joins in Redshift query plan
#2

@dylanbaker I don’t know all of the context around this, but I think I would just write the query a little differently…

with timeseries as (

    select * from {{ ref('timeseries') }}

), history as (

    select * from {{ ref('facility_history') }}

), history_agg as (

    select 
        h.*,
        h.action_date::date as date_day,
        row_number() over (partition by h.id order by h.action_date desc)
          as row_number_field
    from history h

),

history_daily as (

    select * from history_agg where row_number_field = 1

),

joined as (

    select 
      t.date_day,
      h.* --modify to select everything but date_day here
    from timeseries t
    left join history_daily h 
      on t.date_day = h.date_day 
      

)

select * from joined

The core intuition here is that you want to aggregate prior to joining. Previously you were joining and aggregating within the same step, which made the database’s intermediate result much bigger than you actually needed. Find the most current row for history, then left join it back to timeseries on a shared key.

If what you actually need is to join a single daily record from history_daily to multiple records in timeseries because you don’t always have a record for every day, then use a lead window function to create a from_date and to_date so that you can join on two inequalities instead of one. There is no way to get away from the nested loop join in this case, but you want to limit the total number of rows returned. Currently, the fact that you’re only using a single inequality can cause a massive fanout, I think. I frequently need to write date inequality joins just like the one you’re doing, and I never do so without including both a greater-than and a less-than condition.
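Roughly, the from_date / to_date version could look like the sketch below. It runs against an in-memory SQLite database with invented sample rows, and the `facility_limit` column and the `history_ranges` CTE name are assumptions for illustration. It also assumes at most one history row per id per day; with several changes per day you would reduce to the last row per day first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table timeseries (date_day text);
    insert into timeseries values
        ('2024-01-01'), ('2024-01-02'), ('2024-01-03'),
        ('2024-01-04'), ('2024-01-05');

    -- Hypothetical history rows; facility_limit is an assumed column.
    create table facility_history (id integer, action_date text, facility_limit integer);
    insert into facility_history values
        (1, '2024-01-01', 100),
        (1, '2024-01-03', 150),
        (2, '2024-01-02', 500);
""")

range_join_sql = """
with history_ranges as (

    select
        id,
        facility_limit,
        action_date as from_date,
        -- date of the next change for the same id; null means still current
        lead(action_date) over (partition by id order by action_date) as to_date
    from facility_history

)

select
    t.date_day,
    h.id,
    h.facility_limit
from timeseries t
inner join history_ranges h
    on t.date_day >= h.from_date
    and (t.date_day < h.to_date or h.to_date is null)
order by h.id, t.date_day
"""

# Each (day, id) pair matches exactly one history row, so no row_number()
# filter is needed after the join.
daily_rows = conn.execute(range_join_sql).fetchall()
```

With both bounds in the join condition, each day picks up only the one record that was in force on that day, instead of every earlier record that then has to be filtered away.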


#3

In history_agg above, are we trying to pull the last record of each day for each id? I think we also therefore need to partition by h.action_date::date? I may have misunderstood the change.

Otherwise, that all completely makes sense. We don’t have records for every day in the history table so, as you suggested, I think a lead window function will work well. :+1:
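To make the partitioning question concrete, here is a small sketch of "last record of each day for each id". It runs on in-memory SQLite with invented rows, and `facility_limit` is an assumed column. SQLite's `date()` stands in for Redshift's `::date` cast.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical history rows; id 1 changes twice on 2024-01-01.
    create table facility_history (id integer, action_date text, facility_limit integer);
    insert into facility_history values
        (1, '2024-01-01 09:00:00', 100),
        (1, '2024-01-01 17:30:00', 120),
        (1, '2024-01-03 08:15:00', 150),
        (2, '2024-01-02 12:00:00', 500);
""")

last_per_day_sql = """
with history_agg as (

    select
        id,
        facility_limit,
        action_date,
        date(action_date) as date_day,  -- Redshift: action_date::date
        row_number() over (
            partition by id, date(action_date)  -- one ranking per id per day
            order by action_date desc
        ) as row_number_field
    from facility_history

)

select id, date_day, facility_limit
from history_agg
where row_number_field = 1
order by id, date_day
"""

history_daily = conn.execute(last_per_day_sql).fetchall()
```

Partitioning by id alone would keep only the single latest row per id across all days, so the date has to be part of the partition to get one row per id per day.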


#4

@dylanbaker

I had a similar situation where I was trying to get the most recent value for a particular primary key. I normally use the approach you and @tristan describe, but in this particular case Redshift performed better when I did a self-join with the most recent record. It looks something like this:

with history as (

    select * from {{ ref('facility_history') }}

), history_id_x_max_date as (

    select
        id,
        max(action_date) as action_date
    from history
    group by 1

),

new_history as (

    select *,
           row_number() over (partition by id order by action_date desc)
              as row_number_field
    from history
    join history_id_x_max_date using (id, action_date)

),

history_daily as (

    select * from new_history where row_number_field = 1

)

....

I am not sure why, but after adding this self-join the query performed better than using the row_number() function directly against the unfiltered table.

Of course, if your data only has one record per id/date, you would not need the row_number() in this case.

I’m curious to learn if my situation was unique or if this helps in other cases.