I would like to build an incremental DBT model that produces a daily snapshot from a changelog that looks like the following:
ID   TIMESTAMP         DATA      OPERATION
id1  2023-01-01 13:40  data1_v1  create
id1  2023-01-01 15:00  data1_v2  update
id1  2023-01-03 00:02  data1_v3  update
id1  2023-01-04 05:04  data1_v3  delete
id2  2023-01-01 XX:XX  data2_v1  create
id2  2023-01-02 XX:XX  data2_v2  update
id2  2023-01-04 XX:XX  data2_v3  update
I would like to get a daily picture of each id's data (filtering out deleted ids each day), so we end up with a daily changelog:
ID   DATE        DATA
id1  2023-01-01  data1_v2
id1  2023-01-02  data1_v2
id1  2023-01-03  data1_v3
id2  2023-01-01  data2_v1
id2  2023-01-02  data2_v2
id2  2023-01-03  data2_v2
id2  2023-01-04  data2_v3
Notes:
- For id1 on 2023-01-01, data1_v2 is the data we want because it is the latest value that day
- id1 does not appear on 2023-01-04 because it was deleted
- id2 on 2023-01-03 keeps data2_v2 since it didn't change that day
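To make the per-day dedup concrete, here is a standalone sketch of the "latest value per id per day" step. The table name raw_changelog is hypothetical (standing in for the real source), and this does not yet handle carrying values forward to days with no changes (e.g. id2 on 2023-01-03):

-- Hypothetical illustration only: latest non-deleted value per (id, day).
-- `raw_changelog` stands in for the real changelog table shown above.
select
    id,
    date(timestamp) as date,
    data
from raw_changelog
where operation != 'delete'
-- keep only the last row of each (id, day) group
qualify row_number() over (partition by id, date(timestamp) order by timestamp desc) = 1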
We use BigQuery as our warehouse, connected to DBT.
My current approach is the following:
with
    days as (
        select day
        from unnest(generate_date_array(date_sub(current_date(), interval 6 month), current_date())) day
    ),

    changelog as (
        select date, timestamp, document_id, data
        from {{ ref("base_firestore_export__event_raw_changelog") }}
    ),

    daily_changelog as (
        select
            date,
            document_id,
            last_value(data ignore nulls) over (order by timestamp) as data
        from days d
        left join changelog c on d.day = c.date
        {% if is_incremental() %}
        -- this filter will only be applied on an incremental run
        where date > (select max(date) from {{ this }})
        {% endif %}
    )

select *
from daily_changelog
but I get a memory error:
Database Error in rpc request (from remote system):
Resources exceeded during query execution: The query could not be executed in the allotted memory.
Peak usage: 122% of limit.
Top memory consumer(s):
  sort operations used for analytic OVER() clauses: 98%
  other/unattributed: 2%
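One thing I noticed while debugging: my over() clause has no partition by, so BigQuery presumably has to sort the entire six-month changelog in a single analytic operation, which matches the "sort operations used for analytic OVER() clauses" in the error. I experimented with a partitioned variant along these lines, though I'm not sure it still gives the carry-forward behaviour I want across days:

-- Experimental variant of the daily_changelog CTE: partitioning the
-- window by document_id so each sort only covers one document's rows.
daily_changelog as (
    select
        date,
        document_id,
        last_value(data ignore nulls) over (
            partition by document_id
            order by timestamp
        ) as data
    from days d
    left join changelog c on d.day = c.date
)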
Any help would be much appreciated! Thanks!