Metrics & Partition keys

The problem I’m having

I am building metrics on top of huge fact tables that are stored in Databricks/S3. The fact tables have partition keys and I want to use them to reduce the amount of data scanned when using the metric.
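To illustrate what I mean by reducing the scan: plain SQL against the table prunes partitions as long as the filter hits the partition key column (the dates here are just placeholders):

select count(distinct order_id)
from order_table
-- the filter is on the partition key column, so Spark only reads these date partitions
where order_created_date >= date('2023-03-01')
  and order_created_date < date('2023-03-17')

This is the behavior I want to preserve when the same table is read through a metric.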

The context of why I’m trying to do this

Source: order_table (billions of rows)
    columns: order_id, order_created_timestamp, order_created_date
    partition key: order_created_date

Metric:
    calculation_method: count_distinct
    expression: order_id

Model: evaluate the metric on a daily basis and incrementally fill a table that contains daily order counts

What I’ve already tried

Using the where input in metrics.calculate.
Using the order_created_date as a dimension in the metric and then using the where input in metrics.calculate.

Discussion

The problem is that none of the methods push the where clause down to the CTE that actually reads data from my fact table.
Is there some functionality to work with partition keys that I’m missing?
If there is no functionality, does anyone have ideas for production grade workarounds?

I guess one way is to edit the macro and add a suitable parameter, but I’m hoping someone already has a solution. Otherwise metrics are pretty much unusable on big tables.

Hey Silja,

This is a great question! You’re right that using partition keys to filter the source fact table is hard to do today. One of the major benefits of transitioning to MetricFlow for SQL generation is the ability to more easily add performance optimizations, like predicate pushdown. In fact, using partitions to filter a large fact table is already supported in MetricFlow today.

There are a few ways to do it. One is to add a constraint to the metric definition, for example: order_created_date > some date. The drawback is that the constraint is not dynamic, so older partitions will not be available. The other way is to include a partition dimension filter at query time, for example by filtering on order_created_date when creating a metrics query in the CLI or in a downstream BI tool. This pushes the where constraint down to the fact table and reduces the number of partitions scanned. However, you would have to set filter defaults in your BI tool, for example, order_created_timestamp is in the last 6 months. There is still a risk that a stakeholder might remove this filter and try to query data for all time, which would result in a full table scan.
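To make the query-time option concrete, a partition dimension filter in the MetricFlow CLI could look roughly like this (illustrative only; the exact flags and dimension naming depend on your MetricFlow version):

mf query --metrics orders_finished --group-by metric_time --where "order_created_date >= '2023-03-01'"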

All this to say that predicate pushdown is coming! We fully expect metrics to be performant over large data sets, and I’m sorry that this doesn’t unblock you right away. While we work on implementing this feature, I’ll do some digging to see if there is a workaround, given the way we currently construct metric queries.

To give you some ideas, here is what I was playing around with yesterday.
It kind of works, but the solution also brought up other questions that I will share in a separate post.

It is possible to use variables in metric definitions, so I added filters on the partition key column to the metric definition itself. The variables I currently use are the start and end timestamps that I get from the Airflow run. For example, an hourly Airflow run starting at 2023-03-16 12:00 means that data_interval_start = 2023-03-16 12:00 and data_interval_end = 2023-03-16 13:00.
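For reference, those values reach dbt as ordinary vars. Our Airflow task ends up invoking dbt along these lines (the model selector here is hypothetical, and how you template the interval values out of Airflow is up to your operator):

dbt run --select orders_finished_daily \
    --vars '{"data_interval_start": "2023-03-16 12:00:00", "data_interval_end": "2023-03-16 13:00:00"}'

The metric definition itself then looks like this: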

metrics:
  - name: orders_finished
    label: Orders Finished
    description: "Number of orders finished"
    model: ref('fact_order')
    calculation_method: count_distinct
    expression: order_id
    timestamp: created
    time_grains: [day, week, month, quarter, year]
    filters:
      - field: state
        operator: '='
        value: "'finished'"
      # these two filters hit the partition key; the vars come from the Airflow run
      - field: created_date
        operator: '>='
        value: date('{{ var("data_interval_start") }}')
      - field: created_date
        operator: '<='
        value: date('{{ var("data_interval_end") }}')
    dimensions:
      - city_id

I use the metric by materializing its results into an incrementally updated table with city and daily granularity:

{{
    config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key=["city_id", "date_day"],
        partition_by="date_day",
        schema="schema_name"
    )
}}

select * 
from {{ metrics.calculate(
    [
        metric('orders_finished')
    ],
    grain='day',
    dimensions=['city_id']
) }}

The compiled code looks like this. Notice that the filter on created_date lands exactly where we want it: in the innermost subquery that reads from the fact table, where it can prune partitions:



select * 
from 

(

with calendar as (
    select 
        * 
    from `schema_name`.`dbt_metrics_default_calendar`
     
)

, model_b8a8c92f4fbcd57d006a0a6bf049e5e5__aggregate as (
    
    select
        date_day,
        city_id,
        max(metric_date_day is not null) as has_data,
        count(distinct property_to_aggregate__orders_finished) as orders_finished
    from (
        select 
            cast(base_model.created as date) as metric_date_day,
            calendar.date_day as date_day,
            calendar.date_day as window_filter_date,
            base_model.city_id,
            (order_id) as property_to_aggregate__orders_finished
        from `schema_name`.`fact_order` base_model 
        
        left join calendar
            on cast(base_model.created as date) = calendar.date_day
        
        where 1=1
        and (
            state = 'finished'
            and created_date >= date('2023-03-16 12:00:00')
            and created_date <= date('2023-03-16 14:00:00')
            )
    
        
    ) as base_query

    where 1=1
    group by 1, 2

), model_b8a8c92f4fbcd57d006a0a6bf049e5e5__final as (
    
    select
        parent_metric_cte.date_day,
        parent_metric_cte.city_id,
        coalesce(orders_finished, 0) as orders_finished
    from model_b8a8c92f4fbcd57d006a0a6bf049e5e5__aggregate as parent_metric_cte
)

select
    date_day ,
    city_id,
    orders_finished  
    
from model_b8a8c92f4fbcd57d006a0a6bf049e5e5__final
    
order by 1 desc
    
) metric_subq
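One more note on the incremental side: the date filters in the metric definition are what limit the scan, and the merge on (city_id, date_day) keeps reruns idempotent. If you want a belt-and-braces guard on what reaches the merge, the macro output is a self-contained aliased subquery, so you can append a where clause to it (a sketch; this does not reduce scanning, it only limits the merged rows):

select *
from {{ metrics.calculate(
    [metric('orders_finished')],
    grain='day',
    dimensions=['city_id']
) }}
{% if is_incremental() %}
-- only merge rows that belong to the current Airflow interval
where date_day >= date('{{ var("data_interval_start") }}')
  and date_day <= date('{{ var("data_interval_end") }}')
{% endif %}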

For reference, here is the version where I simply add a filter at metric calculation time, via start_date and end_date. It is very obvious that the filter is applied to the timestamp column (created), not to the date column that I have as a partition key, so this results in a full scan of the table.

Metric definition:

  - name: orders_finished2
    label: Orders Finished
    description: "Number of orders finished"
    model: ref('fact_order')
    calculation_method: count_distinct
    expression: order_id
    timestamp: created
    time_grains: [day, week, month, quarter, year]
    filters:
      - field: state
        operator: '='
        value: "'finished'"
    dimensions:
      - city_id
    meta:
      owner: '@silja.mardla'

Calculating the metric:

{{
    config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key=["city_id", "date_day"],
        partition_by="date_day",
        schema="schema_name"
    )
}}

select * 
from {{ metrics.calculate(
    [
        metric('orders_finished2')
    ],
    grain='day',
    dimensions=['city_id'],
    start_date=var("data_interval_start"),
    end_date=var("data_interval_end")
) }}

Compiled code:

select * 
from 

(

with calendar as (
    select 
        * 
    from `schema_name`.`dbt_metrics_default_calendar`
    where date_day >= cast('2023-03-16 12:00:00' as date)
            and date_day <= cast('2023-03-16 14:00:00' as date) 
)

, model_79b7f4638595cdb0e8236ee9dacac2fd__aggregate as (
    
    select
        date_day,
        city_id,
        max(metric_date_day is not null) as has_data,
        count(distinct property_to_aggregate__orders_finished2) as orders_finished2
    from (
        select 
            cast(base_model.created as date) as metric_date_day,
            calendar.date_day as date_day,
            calendar.date_day as window_filter_date,
            base_model.city_id,
            (order_id) as property_to_aggregate__orders_finished2
        from `schema_name`.`fact_order` base_model 
        
        left join calendar
            on cast(base_model.created as date) = calendar.date_day
        
        where 1=1
        and (
        cast(base_model.created as date) >= cast('2023-03-16 12:00:00' as date)
            and cast(base_model.created as date) <= cast('2023-03-16 14:00:00' as date) 
        )
    
        and (
            state = 'finished'
            )
    
        
    ) as base_query

    where 1=1
    group by 1, 2

), model_79b7f4638595cdb0e8236ee9dacac2fd__final as (
    
    select
        parent_metric_cte.date_day,
        parent_metric_cte.city_id,
        coalesce(orders_finished2, 0) as orders_finished2
    from model_79b7f4638595cdb0e8236ee9dacac2fd__aggregate as parent_metric_cte
)

select
    date_day ,
    city_id,
    orders_finished2  
    
from model_79b7f4638595cdb0e8236ee9dacac2fd__final
    
order by 1 desc
    
) metric_subq

And here is another version that does not work. I include created_date as a dimension and add a where clause at metric evaluation time. There are two problems: the where clause is added in the wrong place (the outer select, after aggregation), and I can’t use variables, because they don’t get evaluated inside the where string (more on that after the compiled code).

Metric definition:

  - name: orders_finished3
    label: Orders Finished
    description: "Number of orders finished"
    model: ref('fact_order')
    calculation_method: count_distinct
    expression: order_id
    timestamp: created
    time_grains: [day, week, month, quarter, year]
    filters:
      - field: state
        operator: '='
        value: "'finished'"
    dimensions:
      - city_id
      - created_date
    meta:
      owner: '@silja.mardla'

Metric calculation:

{{
    config(
        materialized="view",
        schema="schema_name"
    )
}}

select * 
from {{ metrics.calculate(
    [
        metric('orders_finished3')
    ],
    grain='day',
    dimensions=['city_id','created_date'],
    where="created_date >= date('var(data_interval_start)')"
) }}

Compiled code:



select * 
from -- depends on: `schema_name`.`dbt_metrics_default_calendar`

(

with calendar as (
    select 
        * 
    from `schema_name`.`dbt_metrics_default_calendar`
     
)

, model_79b7f4638595cdb0e8236ee9dacac2fd__aggregate as (
    
    select
        date_day,
        city_id,
        created_date,
        max(metric_date_day is not null) as has_data,
        count(distinct property_to_aggregate__orders_finished3) as orders_finished3
    from (
        select 
            cast(base_model.created as date) as metric_date_day,
            calendar.date_day as date_day,
            calendar.date_day as window_filter_date,
            base_model.city_id,
            base_model.created_date,
            (order_id) as property_to_aggregate__orders_finished3
        from `schema_name`.`fact_order` base_model 
        
        left join calendar
            on cast(base_model.created as date) = calendar.date_day
        
        where 1=1
        and (
            state = 'finished'
            )
    
        
    ) as base_query

    where 1=1
    group by 1, 2, 3

), model_79b7f4638595cdb0e8236ee9dacac2fd__final as (
    
    select
        parent_metric_cte.date_day,
        parent_metric_cte.city_id,
        parent_metric_cte.created_date,
        coalesce(orders_finished3, 0) as orders_finished3
    from model_79b7f4638595cdb0e8236ee9dacac2fd__aggregate as parent_metric_cte
)

select
    date_day ,
    city_id,
    created_date,
    orders_finished3  
    
from model_79b7f4638595cdb0e8236ee9dacac2fd__final
where created_date >= date('var(data_interval_start)')
    
order by 1 desc
    
) metric_subq
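Side note on the variable problem: var(data_interval_start) above is not rendered because it sits inside a quoted string literal. If the where string is built with Jinja concatenation instead, the variable does get evaluated (a sketch; the filter still lands in the outer select, so it does not help with partition pruning):

select * 
from {{ metrics.calculate(
    [
        metric('orders_finished3')
    ],
    grain='day',
    dimensions=['city_id','created_date'],
    where="created_date >= date('" ~ var("data_interval_start") ~ "')"
) }}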

I am also wondering about this bug:

How come there’s a discussion around the way dates are cast and not about the problem of where this filter gets added? And apparently someone is already working on the bug…

And here is my related question: by putting date filters into the metric definition itself, I am probably messing with the different grain options in an incompatible way :slight_smile: For example, querying this metric at month grain would only count orders created inside the current Airflow interval.

When used in a model, it doesn’t; I have tested it.