Breaking a large query into parallelized partitioned queries

We have a long-running model that performs a complex set of geospatial comparisons to get point-to-point distances between every person in the US and a list of around 300K points of interest. We are using Snowflake, along with Snowflake's geospatial data types and functions. We also restrict the comparisons, so we are not (for instance) calculating distances between people in New York and POIs in Los Angeles. Regardless, the first pass at the model took over 24 hours to execute on a large Snowflake warehouse. This first pass was not implemented in dbt. We then tried partitioning the query into multiple queries, each covering a separate region of the country and executing in parallel. This reduced the overall execution time by around 50%, with approximately the same credit burn on Snowflake. This also was not in dbt; the orchestration was simply a Python script that fired off the regional queries in parallel.
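For context, each regional query looked something like the sketch below. The table names, the region column, and the output table name are illustrative placeholders rather than our actual schema:

-- hypothetical shape of one regional query (names are placeholders)
create or replace table distances_northeast as
select
    p.person_id,
    poi.poi_id,
    st_distance(p.location, poi.location) as distance_m
from people p
join points_of_interest poi
    on poi.region = p.region   -- restrict comparisons to the same region
where p.region = 'northeast';

The Python script simply ran one statement like this per region on its own connection, so Snowflake processed the regions concurrently.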

We want to move this model into dbt, but are a little stuck on how to accomplish the parallelism. Based on my understanding of dbt, since this is technically a single model resulting in a single table, it would not "natively" be parallelized the way described above. One thought we have is to create separate models for the different geographic regions, which can execute in parallel (via dbt threads), with a downstream union to get the final data set. But if we later decided to change our geographic "partitioning", that would mean creating or removing intermediate models. Depending on how small we determined our geographic regions needed to be, it could also require quite a lot of intermediate models. Are there other options we are not thinking of?
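For illustration, the approach we are picturing would be one model per region plus a thin union model downstream, something like this (model names, column names, and the region list are just placeholders):

-- models/int_distances_northeast.sql  (one model like this per region; only the literal changes)
select
    p.person_id,
    poi.poi_id,
    st_distance(p.location, poi.location) as distance_m
from {{ ref('people') }} p
join {{ ref('points_of_interest') }} poi
    on poi.region = p.region
where p.region = 'northeast'

-- models/distances.sql  (thin downstream union)
{{ dbt_utils.union_relations(relations=[
    ref('int_distances_northeast'),
    ref('int_distances_southeast'),
    ref('int_distances_midwest'),
    ref('int_distances_west')
]) }}

With enough threads configured, dbt would build the regional models concurrently, but every change to the regional breakdown means adding or removing model files.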


Not simple, but you could also implement the partitioning logic as a custom materialization. See Creating new materializations | dbt Docs

The custom materialization would encapsulate the logic for performing the partitioning and then unioning, ultimately outputting just one table, with downstream dependencies being none the wiser about how it came to be.

Are you re-calculating the entire dataset on every invocation? How much of it changes between two runs and how often do you run it?

Where I’m going with this is that you might consider an incremental model. By doing this, you would pay the large calculation cost once, but would then only have to calculate the changes from there.

If you have a table showing users’ locations and when they last changed, it might look something like

{{ config(materialized='incremental', unique_key='user_location_key') }}

with user_locations as (
    select * from {{ ref('user_locations') }}
),

points_of_interest as (
    select * from {{ ref('points_of_interest') }}
),

final as (
    select
        {{ dbt_utils.surrogate_key(['user_locations.user_id', 'points_of_interest.poi_id']) }} as user_location_key,
        user_locations.user_id,
        points_of_interest.poi_id,
        -- placeholder for your actual Snowflake geospatial call, e.g. st_distance(...)
        calculate_distance(user_locations.location, points_of_interest.location) as distance,
        user_locations.modified_date as user_location_changed_at
    from user_locations
    -- join on state to restrict comparisons to the same region
    inner join points_of_interest
        on points_of_interest.state_id = user_locations.state_id
    {% if is_incremental() %}
        -- only re-calculate pairs whose source location changed since the last run
        where user_locations.modified_date >= (select max(user_location_changed_at) from {{ this }})
    {% endif %}
)

select *
from final

I should check back for replies to my questions more often :slight_smile: . In our current use case, we do have to fully rebuild the data set on each execution, as this model gets re-run when we receive entirely new underlying data sets. We might be able to implement an incremental approach, but I think the return on investment would be limited because the two input base data sets can change drastically between runs.

As for the idea of a custom materialization, I am very interested in looking into that. I actually started looking into it today, taking the existing implementation of the table materialization and starting to fiddle with it. So far, the approach is to derive the partition values from the source (or hard-code them, for dev purposes), use the model SQL as the basis to define a temporary relation for each partition (suffixed with the partition value), populate it (applying the partition filter while populating its specific temp relation), and then union the list of temp relations into the final target relation. However, what I cannot figure out is how to get the temp partitions to execute in parallel to get the scalability I'm really looking for. Anyway, only a couple of hours put into it so far; I will post back if I end up making it work.
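For anyone following along, here is roughly the shape of what I mean: a heavily simplified, untested sketch based on the built-in table materialization. It assumes the model's SQL exposes a region column and hard-codes the partition values; the names are placeholders, and the per-partition builds here still run serially on the model's connection.

{% materialization partitioned_table, adapter='snowflake' %}

    {%- set target_relation = this.incorporate(type='table') -%}
    {# hard-coded partition values for dev; the real version would derive these from the source #}
    {%- set partitions = ['northeast', 'southeast', 'midwest', 'west'] -%}
    {%- set temp_relations = [] -%}

    {{ run_hooks(pre_hooks) }}

    {# build one temp relation per partition from the model's compiled sql #}
    {% for partition in partitions %}
        {%- set temp_relation = make_temp_relation(target_relation, '__' ~ partition) -%}
        {% call statement('create_' ~ partition) %}
            create or replace temporary table {{ temp_relation }} as (
                select * from ({{ sql }}) as model_sql
                where region = '{{ partition }}'
            )
        {% endcall %}
        {% do temp_relations.append(temp_relation) %}
    {% endfor %}

    {# union the temp relations into the final target relation #}
    {% call statement('main') %}
        create or replace table {{ target_relation }} as (
            {% for temp_relation in temp_relations %}
            select * from {{ temp_relation }}
            {% if not loop.last %}union all{% endif %}
            {% endfor %}
        )
    {% endcall %}

    {{ run_hooks(post_hooks) }}

    {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

The create statements in the loop above still execute one at a time, which is exactly the part I haven't solved yet.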

Incidentally, I’m very excited to see if the python models in beta right now will enable this type of behavior more naturally.


That approach sounds reasonable! I’m pretty sure that you’ll find that although dbt is multithreaded and can do parallel processing, its granularity is at the model level. So the temp partitions likely won’t execute in parallel unless you can find a way to fire off multiple builds in some sort of other task in Snowflake (and then you’d need some way to poll to know whether they’re all finished or not).

I don’t know enough about Snowflake to know if that sort of thing is possible; interested to hear how you get on!

Also an interesting idea! Going to send this to our Python PM as a fun nerd snipe if nothing else

LOVE this thinking on Python models. If you experiment more on this will you let us know how it goes? Very interested…

Really interesting thought about whether a Python model could help here…

As I understand it, parallel processing is the reason folks go through the trouble of writing PySpark code, rather than just single-node Pandas.

Snowpark also supports this explicitly via its “batch API,” by adding the @vectorized decorator to a UDF. It will apply the function over batches of rows/series (in parallel): Python UDF Batch API — Snowflake Documentation. Though I’m not sure if you can tell it how to batch the data (by region), to restrict the comparisons — beyond just specifying the batch size. If the source table is clustered on country, though…?
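For anyone who hasn't seen it, the batch API pattern from those docs looks roughly like the following when the UDF is defined inline in SQL. This toy function is purely illustrative (it is not the distance calculation from this thread), and the details are adapted from the linked documentation rather than verified here:

create or replace function add_one_to_inputs(x number, y number)
returns number
language python
runtime_version = '3.8'
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
from _snowflake import vectorized

# the decorator asks Snowflake to pass batches of rows as a pandas DataFrame
@vectorized(input=pandas.DataFrame)
def add_one_to_inputs(df):
    # df[0] and df[1] are the x and y columns for the whole batch
    return df[0] + df[1] + 1
$$;

As noted above, though, the batching is controlled by batch size rather than by an arbitrary key like region, so it helps with throughput more than with restricting which pairs get compared.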

I’d hope that the execution engine is smart enough to handle that parallelism itself. I would feel less good about users needing to muck with, like, asyncio within their Python model definition (if such a thing were even possible). I shouldn’t even say such words for fear of wishing them into existence