The problem I’m having
The dbt code runs, but the data in the partitions being replaced (today and the previous 3 days in this example) is missing from the output table.
| date | data status |
|---|---|
| 2023-09-17 | missing |
| 2023-09-16 | missing |
| 2023-09-15 | missing |
| 2023-09-14 | missing |
| 2023-09-13 | present |
| Rest of data | present |
Note: I’m running dbt through the normalization feature of Airbyte (docs reference).
The dbt version I’m using is ghcr.io/dbt-labs/dbt-bigquery:1.5.6.
Some example code or error messages
```sql
{% set partitions_to_replace = [
    'timestamp(current_date)',
    'timestamp(date_sub(current_date, interval 1 day))',
    'timestamp(date_sub(current_date, interval 2 day))',
    'timestamp(date_sub(current_date, interval 3 day))',
] %}

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        'field': 'created_at',
        'data_type': 'timestamp',
        'granularity': 'DAY'
    },
    partitions = partitions_to_replace,
    tags = ["daily"]
) }}

WITH __dbt__cte__1 AS (
    SELECT
        JSON_EXTRACT_SCALAR(_airbyte_data, "$['id']") AS id,
        JSON_EXTRACT_SCALAR(_airbyte_data, "$['amount']") AS amount,
        JSON_EXTRACT_SCALAR(_airbyte_data, "$['status']") AS status,
        JSON_EXTRACT_SCALAR(_airbyte_data, "$['currency']") AS currency,
        JSON_EXTRACT_SCALAR(_airbyte_data, "$['created_at']") AS created_at,
        JSON_EXTRACT_SCALAR(_airbyte_data, "$['modified_at']") AS modified_at,
        _airbyte_ab_id,
        _airbyte_emitted_at,
        CURRENT_TIMESTAMP() AS _airbyte_normalized_at
    FROM project.dataset.table
    {% if is_incremental() %}
    WHERE TIMESTAMP(JSON_EXTRACT_SCALAR(_airbyte_data, "$['created_at']")) IN ({{ partitions_to_replace | join(',') }})
    {% endif %}
)

SELECT
    CAST(id AS int64) AS id,
    CAST(amount AS int64) AS amount,
    CAST(status AS string) AS status,
    CAST(currency AS string) AS currency,
    CAST(modified_at AS timestamp) AS modified_at,
    CAST(created_at AS timestamp) AS created_at,
    _airbyte_ab_id,
    _airbyte_emitted_at
FROM __dbt__cte__1
```
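
One possible cause, offered as a sketch rather than a confirmed diagnosis: each entry in partitions_to_replace evaluates to midnight at the start of a day, while the incremental filter compares the full created_at timestamp against those values with IN. Assuming created_at carries a time-of-day component, only rows created exactly at midnight would pass the filter, and insert_overwrite would then replace the four listed partitions with that near-empty result. Truncating the timestamp to the day before comparing, using BigQuery's TIMESTAMP_TRUNC, is one way to make the filter select whole days. The fragment below would replace only the incremental WHERE clause in the model above:

```sql
{% if is_incremental() %}
-- Assumption: created_at values include a time of day, so they must be
-- truncated to the start of their day before they can match the midnight
-- timestamps listed in partitions_to_replace.
WHERE TIMESTAMP_TRUNC(
    TIMESTAMP(JSON_EXTRACT_SCALAR(_airbyte_data, "$['created_at']")),
    DAY
) IN ({{ partitions_to_replace | join(',') }})
{% endif %}
```

With this filter, an incremental run should select every source row whose created_at falls within the listed days, which is what insert_overwrite needs in order to rebuild those partitions in full. Both TIMESTAMP(current_date) and TIMESTAMP_TRUNC(..., DAY) default to UTC, so the day boundaries line up unless a time zone is specified.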
Any help will be greatly appreciated.