Error: dbt incremental model on a BigQuery partitioned table

The problem I’m having

I have an intermediate table, int_table, in BigQuery that loads data from the raw table raw_table on every Airflow run.

The context of why I’m trying to do this

raw_table is partitioned by the _meta_inserted_at field, and on every run I want to insert only the records with the latest _meta_inserted_at into int_table. I enforce a partition filter on raw_table because it holds the whole history, and I want to minimize the records scanned when someone queries it.

What I’ve already tried

I added the _meta_inserted_at field to the WHERE condition, but it still doesn't work.

Some example code or error messages

Cannot query over table 'my_project.my_dataset.raw_table' without a filter over column(s) '_meta_inserted_at' that can be used for partition elimination

raw_table config (the data comes from an external table; this model works fine):

{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        partition_by={"field": "_meta_inserted_at", "data_type": "TIMESTAMP", "granularity": "day"},
        cluster_by=["_meta_source_file", "_meta_use_case"],
        require_partition_filter=True,
    )
}}

int_table model:

{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        partition_by={"field": "_meta_inserted_at", "data_type": "TIMESTAMP", "granularity": "day"},
        cluster_by=["_meta_source_file", "_meta_use_case", "transaction_date"],
        require_partition_filter=True,

    )
}}

{% set airflow_info = fromjson(var('airflow_info', '{}')) %}

select *
from {{ ref("raw_table") }}
{% if is_incremental() %}
  where _meta_inserted_at = (
      select max(_meta_inserted_at) 
      from {{ ref("raw_table") }} 
      where _meta_inserted_at >= safe_cast('{{ airflow_info["ingested_at"] }}' as timestamp)
  )
{% else %}
  where _meta_inserted_at is not null # added it for full refresh
{% endif %}

The problem appears to be your incremental case. From what I know of BigQuery, partition pruning does not work when the filter value is fetched within the same query (here, through the max() subquery).

Documentation: Query partitioned tables (BigQuery, Google Cloud documentation)

As that page explains, the filter expression must be a constant for BigQuery to use it for partition elimination.
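To make the difference concrete, here is a minimal sketch against the table named in the error message. The timestamp literals are hypothetical example values. The first query compares the partition column to a constant, so BigQuery can prune partitions; the second compares it to a subquery result, which is the pattern the BigQuery docs say cannot be used for partition elimination, and is what the incremental branch above generates.

```sql
-- Prunes partitions: the partition column is compared to a constant.
-- (The timestamp is a hypothetical example value.)
select *
from `my_project.my_dataset.raw_table`
where _meta_inserted_at = timestamp '2024-05-01 06:00:00+00:00';

-- Does not prune: the comparison value comes from a subquery, so BigQuery
-- cannot decide which partitions to skip before the query runs.
-- With require_partition_filter=True, this is the shape that gets rejected.
select *
from `my_project.my_dataset.raw_table`
where _meta_inserted_at = (
  select max(_meta_inserted_at)
  from `my_project.my_dataset.raw_table`
  where _meta_inserted_at >= timestamp '2024-05-01 00:00:00+00:00'
);
```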

To achieve that in dbt, you need to split the work into two queries.
Use the get_single_value macro from the dbt_utils package to run your nested query first and store its result in a variable (max_meta_inserted_at):

{% set query %}
  select max(_meta_inserted_at)
  from {{ ref("raw_table") }}
  where _meta_inserted_at >= safe_cast('{{ airflow_info["ingested_at"] }}' as timestamp)
{% endset %}
{% set max_meta_inserted_at = dbt_utils.get_single_value(query) %}

Finally, use max_meta_inserted_at in your main query:

select *
from {{ ref("raw_table") }}
{% if is_incremental() %}
  where _meta_inserted_at = {{ '"' ~ max_meta_inserted_at ~ '"' }}
{% else %}
  where _meta_inserted_at is not null # added it for full refresh
{% endif %}

The best way to check whether it's working is to compile the model: you should see a hard-coded timestamp value in place of {{ '"' ~ max_meta_inserted_at ~ '"' }}, and that value is updated every time dbt runs the model.
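For example, when the incremental branch is active, the compiled SQL for the model under target/compiled/ might look roughly like the sketch below. The project and dataset names come from the error message, the timestamp is a made-up placeholder, and the exact string form of the value depends on how the adapter returns it.

```sql
-- Illustrative only: the timestamp below is a placeholder, not real output.
select *
from `my_project`.`my_dataset`.`raw_table`
  where _meta_inserted_at = "2024-05-01 06:00:00+00:00"
```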

PS: I haven't been able to test this dbt code, so there might be some syntax errors, but hopefully it conveys the point.
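Putting the pieces together, a complete int_table model could look like the untested sketch below. It assumes dbt_utils is installed via packages.yml and that airflow_info is passed as a JSON string var with an ingested_at key, as in the original model. Two choices differ from the snippets above: the value is inlined as an explicit TIMESTAMP literal, and the model raises a compiler error when max() returns NULL (no rows at or after ingested_at), because otherwise the value would render as the text None and BigQuery would fail to parse it as a timestamp.

```sql
{{
    config(
        materialized="incremental",
        on_schema_change="fail",
        persist_docs={"relation": true, "columns": true},
        partition_by={"field": "_meta_inserted_at", "data_type": "TIMESTAMP", "granularity": "day"},
        cluster_by=["_meta_source_file", "_meta_use_case", "transaction_date"],
        require_partition_filter=True,
    )
}}

{% set airflow_info = fromjson(var('airflow_info', '{}')) %}

{% if is_incremental() %}
    {# Step 1: look up the latest _meta_inserted_at as a separate query.
       Its filter compares the partition column to a constant. #}
    {% set max_query %}
        select max(_meta_inserted_at)
        from {{ ref("raw_table") }}
        where _meta_inserted_at >= safe_cast('{{ airflow_info["ingested_at"] }}' as timestamp)
    {% endset %}
    {% set max_meta_inserted_at = dbt_utils.get_single_value(max_query) %}

    {# max() returns NULL when no rows match; stop here instead of
       rendering None into the SQL. #}
    {% if max_meta_inserted_at is none %}
        {{ exceptions.raise_compiler_error("No raw_table rows at or after ingested_at; nothing to load.") }}
    {% endif %}
{% endif %}

select *
from {{ ref("raw_table") }}
{% if is_incremental() %}
  -- Step 2: the value is now a hard-coded constant, so BigQuery can prune.
  where _meta_inserted_at = timestamp '{{ max_meta_inserted_at }}'
{% else %}
  -- Full-refresh filter kept from the original model.
  where _meta_inserted_at is not null
{% endif %}
```

Whether failing the run on an empty load is the right behavior depends on the Airflow DAG; the point of the sketch is only that both queries end up with a constant filter on _meta_inserted_at.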


Thank you for your help, Stephen. I modified your code for my use case (below):

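{% macro get_max_meta_inserted_at(table_ref, airflow_info) %}
    {# Only run the lookup when dbt is executing, not while it parses the project. #}
    {% if execute %}
        {# This query filters the partition column against a constant, so it can prune partitions. #}
        {% set query %}
            select max(_meta_inserted_at) 
            from {{ table_ref }}
            where _meta_inserted_at >= safe_cast('{{ airflow_info["ingested_at"] }}' as timestamp)
            --where _meta_inserted_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY)
        {% endset %}
        
        {% set max_meta_inserted_at = dbt_utils.get_single_value(query) %}
        {{ log("Max meta inserted at: " ~ max_meta_inserted_at, info=True) }}
        {# Wrap the value in single quotes so the caller inlines it as a constant string literal. #}
        {{ return("'" ~ max_meta_inserted_at ~ "'") }}
    {% endif %}
{% endmacro %}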
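For reference, here is an untested sketch of how a macro like this might be called from the int_table model. It assumes the macro lives in the project's macros/ folder and that airflow_info is set as in the original model; the config block is unchanged and omitted. Because the macro already wraps the value in single quotes, the WHERE clause receives a constant string literal, which BigQuery coerces to TIMESTAMP for the comparison.

```sql
{% set airflow_info = fromjson(var('airflow_info', '{}')) %}

select *
from {{ ref("raw_table") }}
{% if is_incremental() %}
  -- The macro renders a quoted constant, for example
  -- '2024-05-01 06:00:00+00:00' (illustrative value).
  where _meta_inserted_at = {{ get_max_meta_inserted_at(ref("raw_table"), airflow_info) }}
{% else %}
  where _meta_inserted_at is not null -- full refresh
{% endif %}
```

One caveat: if no rows match the ingested_at filter, max() returns NULL and the macro returns the string 'None', which BigQuery cannot cast to TIMESTAMP, so checking the value with `is none` before returning is worth considering.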
