The problem I’m having
I do not know where or how to fit the `is_incremental()` macro inside my current model.
The context of why I’m trying to do this
I have a table where one of the columns is a complex nested JSON data structure that I need to unravel into a tabular format, something like `{"abc": ["123"], "def": [456]}`. The keys are user-generated and there is no enforcement restricting the values, so I am using Jinja/dbt to help extract these dynamic key/value pairs into a tabular format. For example:
source table:
| modified_timestamp | id | json_string |
|---|---|---|
| 2024-01-01 00:01:11 | qwertyfoobar | {"abc": [123], "def": [456, 789]} |
target table:
| modified_timestamp | id | key | value |
|---|---|---|---|
| 2024-01-01 00:01:11 | qwertyfoobar | abc | 123 |
| 2024-01-01 00:01:11 | qwertyfoobar | def | 456 |
| 2024-01-01 00:01:11 | qwertyfoobar | def | 789 |
Because the `json_string` column contains lists, I have run into issues where dbt generates a SQL query that exceeds the maximum size limit. I have read up on the `is_incremental()` macro, but I am stumped on how/where to fit it into my current `model.sql` file shown below. I am thinking of placing it inside the CTE, but I do not know how to call that macro from inside `run_query`.
Source and destination tables are in Redshift.
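For reference, in a plain incremental model (one that does not go through `run_query`), `is_incremental()` normally gates a `where` filter in the model body; the macro returns `false` on the first run or a `--full-refresh`, so the filter is skipped and the whole source is selected. A minimal sketch, reusing the table/column names above:

```sql
{{ config(materialized='incremental') }}

select modified_timestamp, id, json_string
from source_table
{% if is_incremental() %}
-- {{ this }} resolves to the model's own target relation
where modified_timestamp >= (select max(modified_timestamp) from {{ this }})
{% endif %}
```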
What I’ve already tried
My current approach is running `delete` statements in my pre-hook and then grabbing the latest modified records from the source table. The issue is that this approach fails if there are no newly modified records, and the query errors out. I see that `is_incremental()` would help address that.
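As an aside, dbt-redshift's `delete+insert` incremental strategy can replace the hand-written pre-hook delete entirely: dbt deletes rows matching the `unique_key` of the new batch and inserts the fresh rows itself. A sketch of the config (the choice of `unique_key` columns here is an assumption):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key=['modified_timestamp', 'id']
) }}
```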
Some example code or error messages
```sql
{{ config(
    materialized='incremental',
    pre_hook={'sql': 'delete from dbt.target_table where modified_timestamp >= (select max(modified_timestamp) from dbt.target_table)'}
) }}

{# /* grab the latest modified records in the source table */ #}
{% set cte = run_query("select modified_timestamp, id, json_string
                        from source_table
                        where is_valid_json(json_string)
                          and modified_timestamp >= (select max(modified_timestamp)
                                                     from dbt.target_table)
                        order by modified_timestamp, id") %}

{% if execute %}
    {% set results_list = cte.rows %}
{% else %}
    {% set results_list = [] %}
{% endif %}

{# /* unnesting JSON into tabular format */ #}
with stg as (
    {% for row in results_list %}
    {% if not loop.first %}union{% endif %}
    select
    {# /* create union statements to make each key/value
        * pair its own record in the target table */ #}
    {% set json_dict = fromjson(row[2]) %}
    {% for key, value in json_dict.items() %}
    {% set value = value | replace("'", '"') %}
        '{{ row[0] }}' as modified_timestamp,
        {{ row[1] }} as id,
        '{{ key }}' as key,
        '{{ value }}' as value
    {% if not loop.last %}union select{% endif %}
    {% endfor %}
    {% endfor %}
)
select cast(a.modified_timestamp as timestamptz) as modified_timestamp
     , a.id
     , a.key
     , json_extract_array_element_text(a.value, b.num) as value
from stg as a
cross join number.series as b
where b.num < json_array_length(a.value)
```
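One way to fit `is_incremental()` into this model is to assemble the source query with a `{% set %} … {% endset %}` block, so the macro can gate the timestamp filter before the string is handed to `run_query`. On the first run (or a full refresh) `is_incremental()` is `false`, so the filter and its `max(modified_timestamp)` subquery are omitted and the error on an empty or missing target goes away. A sketch against the model above (table names kept as-is):

```sql
{% set src_sql %}
select modified_timestamp, id, json_string
from source_table
where is_valid_json(json_string)
{% if is_incremental() %}
  -- only applied on incremental runs, once the target exists
  and modified_timestamp >= (select max(modified_timestamp) from {{ this }})
{% endif %}
order by modified_timestamp, id
{% endset %}

{% if execute %}
    {% set results_list = run_query(src_sql).rows %}
{% else %}
    {% set results_list = [] %}
{% endif %}
```

The rest of the model (the `stg` CTE and the final unnesting `select`) stays the same.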