Ephemeral models, snapshots and new columns

New columns in the source table cause an error during snapshotting

Expected behaviour: a schema change that adds a new column is recognized, and the new column is appended to the snapshot.

I’m currently running into a problem: whenever a new column appears in the source table, the dbt snapshot job fails for the snapshot in question. The current structure is as follows: source → ephemeral model that uses a macro → snapshot.

Definition of snapshot:

- name: history_closed_product
  relation: ref('ephemeral_monolith_closed_product')
  description: abc
  config:
    database: "{{env_var('DBT_ENV_NAME') ~ '_domain_dwh'}}"
    schema: history
    strategy: check
    unique_key: cpr_id
    check_cols: all
    hard_deletes: new_record

Definition of the ephemeral model:

{{
  config(
    materialized='ephemeral'
  )
}}

{% set all_columns = adapter.get_columns_in_relation(source('monolith', 'closed_product')) %}
{% set except_col_names=[] %}

with staging as (
    select *
    from {{ source('monolith','closed_product')}}
)
select {{ cast_columns(all_columns, except_col_names)}}
from staging 

The macro only serves to cast data types. Here’s its definition:

{% macro cast_columns(all_cols, except_col_names = []) -%}
        
    {% for col in all_cols if col.name | lower not in except_col_names -%} 
        {%- if col.data_type == 'decimal(38,10)' or (col.data_type.startswith('decimal(') and col.data_type.endswith(',0)')) -%}         
            cast({{ col.name|lower }} as INT) as {{ col.name|lower }} /*{{ col.data_type }}*/ {%- if not loop.last %},{{ '\n  ' }}{% endif %}
        {%- elif col.data_type.startswith('varchar') -%}         
            cast({{ col.name|lower }} as STRING) as {{ col.name|lower }} /*{{ col.data_type }}*/ {%- if not loop.last %},{{ '\n  ' }}{% endif %}
        {%- elif col.data_type.startswith('decimal') -%}
            cast({{ col.name|lower }} as DOUBLE) as {{ col.name|lower }} /*{{ col.data_type }}*/ {%- if not loop.last %},{{ '\n  ' }}{% endif %}
        {%- else -%}
            {{ col.name|lower }} as {{ col.name|lower }} /*{{ col.data_type }}*/ {%- if not loop.last %},{{ '\n  ' }}{% endif %}
        {%- endif -%}
    {% endfor %}

{%- endmacro %}

Compiling the code for the ephemeral model shows that it recognizes the new column (in this case cpr_subject) in the source table and casts the data types:

select ...
  cast(cpr_subject as STRING) as cpr_subject /*varchar(3)*/,
  cast(cpr_subject_id as INT) as cpr_subject_id /*decimal(22,0)*/
from staging

The error log for the snapshot shows that the new column is the problem: it cannot be resolved on snapshotted_data:

[UNRESOLVED_COLUMN.WITH_SUGGESTION] 
A column, variable, or function parameter with name `snapshotted_data`.`cpr_subject` cannot be resolved. 
Did you mean one of the following? [`snapshotted_data`.`cpr_seq`, `snapshotted_data`.`cpr_desc`, `snapshotted_data`.`cpr_product`, `snapshotted_data`.`cpr_app_id`, `snapshotted_data`.`cpr_bpr_id`]. SQLSTATE: 42703; line 262 pos 12

Here’s some of the code from the log files where the problem occurs:

create or replace temporary view `history_closed_product__dbt_tmp` as
with snapshot_query as 
    (
    with __dbt__cte__ephemeral_monolith_closed_product as 
        (
        with staging as (
                            select *
                            from `landingzone_dev`.`monolith`.`closed_product`
                        )
            select cast(cpr_id as INT) as cpr_id /*decimal(22,0)*/,
            cast(cpr_app_id as INT) as cpr_app_id /*decimal(22,0)*/,
            cast(cpr_seq as INT) as cpr_seq /*decimal(38,10)*/,
            cast(cpr_person as STRING) as cpr_person /*varchar(10)*/,
            cast(cpr_provider as STRING) as cpr_provider /*varchar(10)*/,
            cast(cpr_product as STRING) as cpr_product /*varchar(10)*/,
            cast(cpr_tariff as STRING) as cpr_tariff /*varchar(100)*/,
            cast(cpr_police as STRING) as cpr_police /*varchar(100)*/,
            cast(cpr_gross_burden as STRING) as cpr_gross_burden /*varchar(20)*/,
            cast(cpr_net_burden as STRING) as cpr_net_burden /*varchar(20)*/,
            cast(cpr_insured_amount as DOUBLE) as cpr_insured_amount /*decimal(12,2)*/,
            cast(cpr_sales_commission as DOUBLE) as cpr_sales_commission /*decimal(12,3)*/,
            cast(cpr_broker_commission as DOUBLE) as cpr_broker_commission /*decimal(12,3)*/,
            cast(cpr_commission_reduction as DOUBLE) as cpr_commission_reduction /*decimal(12,3)*/,
            cpr_start_date as cpr_start_date /*timestamp*/,
            cast(cpr_duration as INT) as cpr_duration /*decimal(3,0)*/,
            cast(cpr_deposit as INT) as cpr_deposit /*decimal(1,0)*/,
            cast(cpr_payment_duration as INT) as cpr_payment_duration /*decimal(3,0)*/,
            cast(cpr_desc as STRING) as cpr_desc /*varchar(400)*/,
            cast(cpr_approved as INT) as cpr_approved /*decimal(1,0)*/,
            cast(cpr_approved_by as STRING) as cpr_approved_by /*varchar(18)*/,
            cast(cpr_all_additional_info as STRING) as cpr_all_additional_info /*varchar(2000)*/,
            cast(cpr_own_commission as DOUBLE) as cpr_own_commission /*decimal(12,3)*/,
            cast(cpr_own_commission_to as STRING) as cpr_own_commission_to /*varchar(18)*/,
            cast(cpr_applicant_lname as STRING) as cpr_applicant_lname /*varchar(50)*/,
            cast(cpr_applicant_fname as STRING) as cpr_applicant_fname /*varchar(50)*/,
            cast(cpr_commission_base as DOUBLE) as cpr_commission_base /*decimal(12,2)*/,
            cast(cpr_prv_id as INT) as cpr_prv_id /*decimal(22,0)*/,
            cast(cpr_auto_approve as INT) as cpr_auto_approve /*decimal(1,0)*/,
            cast(cpr_platform_fee as DOUBLE) as cpr_platform_fee /*decimal(12,3)*/,
            cast(cpr_bpr_id as INT) as cpr_bpr_id /*decimal(38,10)*/,
            cast(cpr_company_commission as DOUBLE) as cpr_company_commission /*decimal(12,3)*/,
            cast(cpr_processor_commission as DOUBLE) as cpr_processor_commission /*decimal(12,3)*/,
            cast(cpr_contract_commission as DOUBLE) as cpr_contract_commission /*decimal(12,3)*/,
            cast(cpr_external_product_id as STRING) as cpr_external_product_id /*varchar(255)*/,
            cast(cpr_external_commission as DOUBLE) as cpr_external_commission /*decimal(12,3)*/,
            cast(cpr_subject as STRING) as cpr_subject /*varchar(3)*/,
            cast(cpr_subject_id as INT) as cpr_subject_id /*decimal(22,0)*/
            from staging
        ) 
        select * from __dbt__cte__ephemeral_monolith_closed_product
    ),

snapshotted_data as 
       (
       select *, 
       cpr_id as dbt_unique_key
       from `dev_domain_dwh`.`history`.`history_closed_product`
       where
           dbt_valid_to is null
       ),

insertions_source_data as 
    (
        select *, 
        cpr_id as dbt_unique_key,
        current_timestamp() as dbt_updated_at,
        current_timestamp() as dbt_valid_from,
        coalesce(nullif(current_timestamp(), current_timestamp()), null) as dbt_valid_to,
        md5(coalesce(cast(cpr_id as string ), '')|| '|' || coalesce(cast(current_timestamp() as string ), '')) as dbt_scd_id
        from snapshot_query
    ),

updates_source_data as 
    (
    select *, 
        cpr_id as dbt_unique_key,
        current_timestamp() as dbt_updated_at, 
        current_timestamp() as dbt_valid_from,
        current_timestamp() as dbt_valid_to
    from snapshot_query
    ),
deletes_source_data as 
    (    
    select *, 
        cpr_id as dbt_unique_key
    from snapshot_query
    ),
insertions as 
    (
    select
       'insert' as dbt_change_type,
        source_data.*,'False' as dbt_is_deleted
    from insertions_source_data as source_data
        left outer join snapshotted_data
            on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
    where 
        snapshotted_data.dbt_unique_key is null or (snapshotted_data.dbt_unique_key is not null and ((TRUE)))
    ),

 updates as (
       select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.dbt_scd_id,
             snapshotted_data.dbt_is_deleted
        from updates_source_data as source_data
            join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where ((TRUE))
     ),
deletes as (
        select
            'delete' as dbt_change_type,
            source_data.*,            
            current_timestamp() as dbt_valid_from,
            current_timestamp() as dbt_updated_at,
            current_timestamp() as dbt_valid_to,
            snapshotted_data.dbt_scd_id
            , snapshotted_data.dbt_is_deleted
        from snapshotted_data
        left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where 
        source_data.dbt_unique_key is null
    )
    ,
    deletion_records as (
        select
            'insert' as dbt_change_type,snapshotted_data.`cpr_id`,
            snapshotted_data.`cpr_app_id`,
            snapshotted_data.`cpr_seq`,
            snapshotted_data.`cpr_person`,
            snapshotted_data.`cpr_provider`,
            snapshotted_data.`cpr_product`,
            snapshotted_data.`cpr_tariff`,
            snapshotted_data.`cpr_police`,
            snapshotted_data.`cpr_gross_burden`,
            snapshotted_data.`cpr_net_burden`,
            snapshotted_data.`cpr_insured_amount`,
            snapshotted_data.`cpr_sales_commission`,
            snapshotted_data.`cpr_broker_commission`,
            snapshotted_data.`cpr_commission_reduction`,
            snapshotted_data.`cpr_start_date`,
            snapshotted_data.`cpr_duration`,
            snapshotted_data.`cpr_deposit`,
            snapshotted_data.`cpr_payment_duration`,
            snapshotted_data.`cpr_desc`,
            snapshotted_data.`cpr_approved`,
            snapshotted_data.`cpr_approved_by`,
            snapshotted_data.`cpr_all_additional_info`,
            snapshotted_data.`cpr_own_commission`,
            snapshotted_data.`cpr_own_commission_to`,
            snapshotted_data.`cpr_applicant_lname`,
            snapshotted_data.`cpr_applicant_fname`,
            snapshotted_data.`cpr_commission_base`,
            snapshotted_data.`cpr_prv_id`,
            snapshotted_data.`cpr_auto_approve`,
            snapshotted_data.`cpr_platform_fee`,
            snapshotted_data.`cpr_bpr_id`,
            snapshotted_data.`cpr_company_commission`,
            snapshotted_data.`cpr_processor_commission`,
            snapshotted_data.`cpr_contract_commission`,
            snapshotted_data.`cpr_external_product_id`,
            snapshotted_data.`cpr_external_commission`,
            snapshotted_data.`cpr_subject`,
            snapshotted_data.`cpr_subject_id`,
            snapshotted_data.dbt_unique_key as dbt_unique_key,
            current_timestamp() as dbt_valid_from,
            current_timestamp() as dbt_updated_at,
            snapshotted_data.dbt_valid_to as dbt_valid_to,
            md5(coalesce(cast(dbt_scd_id as string ), '')|| '|' || coalesce(cast(current_timestamp() as string ), '')) as dbt_scd_id,
            'True' as dbt_is_deleted
        from snapshotted_data
        left join deletes_source_data as source_data
            on 
        snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
        where 
        source_data.dbt_unique_key is null
    )

    select * from insertions
    union all
    select * from updates
    union all
    select * from deletes
    union all
    select * from deletion_records

The problem seems to originate in the snapshotted_data CTE. It selects * from the existing snapshot table, which does not have the new column yet, while the deletion_records CTE explicitly references snapshotted_data.`cpr_subject`:

snapshotted_data as 
       (
       select *, 
       cpr_id as dbt_unique_key
       from `dev_domain_dwh`.`history`.`history_closed_product`
       where
           dbt_valid_to is null
       )
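Condensed to the two pieces involved, the failing pattern can be sketched as the query below. This is a reduced version of the generated SQL above, not something dbt emits as is. I would expect it to raise the same UNRESOLVED_COLUMN error for as long as the snapshot table lacks cpr_subject.

```sql
-- Reduced sketch of the generated snapshot SQL (not dbt's literal output).
with snapshotted_data as (
    -- "select *" only returns columns that already exist in the snapshot table
    select *,
           cpr_id as dbt_unique_key
    from `dev_domain_dwh`.`history`.`history_closed_product`
    where dbt_valid_to is null
)
-- deletion_records lists the source columns explicitly, including the new one
select snapshotted_data.`cpr_subject`
from snapshotted_data
```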

Does anyone have an idea about what is wrong? I would rather not have to manually add the column to the table every time.
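For reference, the manual workaround I would like to avoid looks roughly like this. It is a sketch that assumes the snapshot table accepts ALTER TABLE ... ADD COLUMNS, and it takes the type from the cast in the compiled ephemeral model. Any other new column would have to be listed the same way.

```sql
-- Manual stopgap (assumption: ADD COLUMNS is supported on the snapshot table).
-- STRING matches "cast(cpr_subject as STRING)" in the ephemeral model.
alter table `dev_domain_dwh`.`history`.`history_closed_product`
    add columns (cpr_subject string);
```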