The problem I’m having
I have an intermediate table, int_table, in BigQuery that is populated with data from the raw table raw_table on every Airflow run.
The context of why I’m trying to do this
raw_table is partitioned by the _meta_inserted_at field, and on every run I want to insert only the records with the latest _meta_inserted_at into int_table. I want to enforce a partition filter on raw_table because it holds the whole history, and I want to minimize the data scanned whenever someone queries it.
What I’ve already tried
I added a condition on _meta_inserted_at to the WHERE clause, but the run still fails with the error below.
Some example code or error messages
Cannot query over table 'my_project.my_dataset.raw_table' without a filter over column(s) '_meta_inserted_at' that can be used for partition elimination
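As far as I can tell from BigQuery's documentation on querying partitioned tables, only filters on the partition column that are constant expressions can be used for partition elimination; comparing the column to a subquery result such as max(_meta_inserted_at) is a dynamic filter and does not count. In the int_table model the inner subquery has a constant bound, but the outer reference to raw_table does not. A minimal sketch in plain BigQuery SQL, using the table name from the error; the cutoff timestamp '2024-01-01 00:00:00+00' is a hypothetical stand-in for the Airflow value:

```sql
-- Expected to fail with require_partition_filter: the outer query's only
-- condition on _meta_inserted_at depends on a subquery result, so BigQuery
-- cannot use it to eliminate partitions before reading the table.
select *
from `my_project.my_dataset.raw_table`
where _meta_inserted_at = (
  select max(_meta_inserted_at)
  from `my_project.my_dataset.raw_table`
  where _meta_inserted_at >= timestamp '2024-01-01 00:00:00+00'
);

-- Expected to pass: the outer reference also gets a constant lower bound,
-- so only partitions from that day onward are candidates. The subquery still
-- selects the latest batch, and any row equal to that max also satisfies
-- the bound, so the result set is the same.
select *
from `my_project.my_dataset.raw_table`
where _meta_inserted_at >= timestamp '2024-01-01 00:00:00+00'
  and _meta_inserted_at = (
    select max(_meta_inserted_at)
    from `my_project.my_dataset.raw_table`
    where _meta_inserted_at >= timestamp '2024-01-01 00:00:00+00'
  );
```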
raw_table config (the data comes from an external table; this model works fine):
{{
config(
materialized="incremental",
on_schema_change="fail",
persist_docs={"relation": true, "columns": true},
partition_by={"field": "_meta_inserted_at", "data_type": "TIMESTAMP", "granularity": "day"},
cluster_by=["_meta_source_file", "_meta_use_case"],
require_partition_filter=True,
)
}}
int_table model:
{{
config(
materialized="incremental",
on_schema_change="fail",
persist_docs={"relation": true, "columns": true},
partition_by={"field": "_meta_inserted_at", "data_type": "TIMESTAMP", "granularity": "day"},
cluster_by=["_meta_source_file", "_meta_use_case", "transaction_date"],
require_partition_filter=True,
)
}}
{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
select *
from {{ ref("raw_table") }}
{% if is_incremental() %}
where _meta_inserted_at = (
select max(_meta_inserted_at)
from {{ ref("raw_table") }}
where _meta_inserted_at >= safe_cast('{{ airflow_info["ingested_at"] }}' as timestamp)
)
{% else %}
where _meta_inserted_at is not null # added so the full-refresh query also has a partition filter
{% endif %}
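A minimal sketch of the same model with a constant lower bound added to the outer WHERE clause, assuming airflow_info["ingested_at"] is always at or before the newest batch (the inner subquery already relies on this). The local variable ingested_at is hypothetical and only there for readability. I haven't run this; running dbt compile and then a BigQuery dry run of the compiled SQL should show whether the filter is accepted.

```sql
{# same config() block as in the int_table model above #}
{% set airflow_info = fromjson(var('airflow_info', '{}')) %}
{# hypothetical helper: the lower bound passed in by Airflow #}
{% set ingested_at = airflow_info["ingested_at"] %}

select *
from {{ ref("raw_table") }}
{% if is_incremental() %}
-- Constant bound on the partition column: this is the part BigQuery can
-- use for partition elimination on the outer reference to raw_table.
where _meta_inserted_at >= safe_cast('{{ ingested_at }}' as timestamp)
  -- The original subquery still picks the latest batch; rows matching the
  -- max already satisfy the bound above, so the output is unchanged.
  and _meta_inserted_at = (
    select max(_meta_inserted_at)
    from {{ ref("raw_table") }}
    where _meta_inserted_at >= safe_cast('{{ ingested_at }}' as timestamp)
  )
{% else %}
-- full refresh: keep a filter on the partition column
where _meta_inserted_at is not null
{% endif %}
```

Another option, not shown here, is to look up the max first with dbt's run_query() inside an {% if execute %} block and inline the result as a timestamp literal, which makes the outer filter itself a constant.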