Optimizing the snowplow package's `snowplow_page_views` model for resource usage on BigQuery

We use dbt’s snowplow package with BigQuery to process data for a group of websites that see about 3-4M page views per day. Even with a lookback period of just 1 day for incremental models, the `snowplow_page_views` model was failing because it exceeded BigQuery’s resource limits. More specifically, the error message said something like this:

    Peak usage: 128% of limit. Top memory consumer(s):
    sort operations used for analytic OVER() clauses: 98%
    other/unattributed: 2%

(Interesting blog post on in-memory query execution with BigQuery.)

The page views model had a few different `row_number() over (...)` clauses, and I was able to identify that the step responsible for deduplicating events on the `event_id` field was the one causing trouble. Here’s what that looks like:

    relevant_events as (

        select *,
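            -- number duplicate rows that share an event_id, earliest dvce_created_tstamp first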
            row_number() over (partition by event_id order by dvce_created_tstamp) as dedupe

        from all_events
        where domain_sessionid in (select distinct domain_sessionid from new_sessions)

    )

A subsequent CTE then selects all rows where `relevant_events.dedupe = 1`.
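Roughly, that step looks like this (the CTE name here is just for illustration, not necessarily the package’s exact code):

    events as (

        select *
        from relevant_events
        where dedupe = 1

    )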

I’ve seen such “resources exceeded” errors a number of times, almost always with sort operations in window functions, and to the best of my understanding this has something to do with BigQuery performing the sort for an analytic function on a single node (maybe someone else can provide more details on how this works). Page 50 of this deck on query optimization with BigQuery suggests that rewriting such cases with an array_agg() instead is more resource-efficient, since any elements not selected by your offset() (see the example below) can be dropped while sorting rather than held in memory.

Here’s what the rewritten deduplication step looks like:

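    -- for each event_id, keep only the earliest event instead of numbering every duplicate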
    relevant_events as (

        select
            event.*

        from (

            select
                e.event_id,
                array_agg(e order by dvce_created_tstamp limit 1)[offset(0)] as event
            from all_events as e
            group by 1

        )

        where event.domain_sessionid in (select distinct domain_sessionid from new_sessions)

    )

Instead of numbering events that share an ID by timestamp and then filtering for the first in each group, you aggregate them into an array ordered by timestamp, keep only the first element (that’s the limit 1), and then pull it out with offset(0) to get the same effect. For the record, the rewritten version did run successfully within the allocated resources.
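
If it helps to see the equivalence on something tiny, here’s a self-contained toy query (hard-coded rows, nothing to do with the actual model) that dedupes a few fake events both ways; both columns in the result should show the same earliest timestamp per event_id:

    with all_events as (

        select event_id, dvce_created_tstamp
        from unnest([
            struct('e1' as event_id, timestamp '2021-01-01 00:00:00' as dvce_created_tstamp),
            struct('e1' as event_id, timestamp '2021-01-01 00:00:05' as dvce_created_tstamp),
            struct('e2' as event_id, timestamp '2021-01-01 00:01:00' as dvce_created_tstamp)
        ])

    ),

    -- the original approach: number duplicates and keep row 1
    via_row_number as (

        select event_id, dvce_created_tstamp,
            row_number() over (partition by event_id order by dvce_created_tstamp) as dedupe
        from all_events

    ),

    -- the rewritten approach: aggregate each group into a one-element array and take that element
    via_array_agg as (

        select
            e.event_id,
            array_agg(e order by dvce_created_tstamp limit 1)[offset(0)].dvce_created_tstamp as dvce_created_tstamp
        from all_events as e
        group by 1

    )

    select
        r.event_id,
        r.dvce_created_tstamp as from_row_number,
        a.dvce_created_tstamp as from_array_agg
    from via_row_number as r
    join via_array_agg as a using (event_id)
    where r.dedupe = 1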

I’m going to write up a PR to the snowplow package later this week.

P.S. The part I’m still trying to wrap my head around: since I’m partitioning by an ID field, I wouldn’t expect any single partition to contain very many rows, so I’m not sure why this sort operation would still be so resource-heavy.