dbt 1.9 snapshots fail in Databricks

The problem I’m having

We have started using dbt Core 1.9 and the new way of defining snapshots in YAML. For our use case we needed to use an ephemeral model as the snapshot relation, so that we could filter out duplicates in the initial run of the snapshot. Some of our models have more than one unique-key column, and it is these that fail. For the ones with only one unique-key column, the snapshot works.

The context

We are running dbt Core 1.9, and our warehouse is Databricks.

Ephemeral model

{{ config(materialized='ephemeral') }}

WITH station AS (
    SELECT *
    FROM {{ source('kappa_kappa', 'station') }}
    {{ latest_unique_rows(['id'], ['to_timestamp(lastupdated)']) }}
),

exploded_array AS (
    SELECT
        station.createddate,
        station.deletedat,
        forecastarea.id AS forecastarea_id,
        forecastarea.name AS forecastarea_name,
        forecastarea.subscriptionlevel AS forecastarea_subscriptionlevel,
        forecastarea.designcapacity AS forecastarea_designcapacity,
        station.id,
        station.isborderpoint,
        station.lastupdated,
        station.maximolocation,
        station.name,
        station.netbasid,
        station.netbasobjecttype,
        station.substationid,
        station.leap_delta_timestamp,
        station.leap_raw_timestamp,
        station.leap_raw_file,
        NOW() AS leap_updated_at,
        EXPLODE(stationvoltagelevels) AS stationvoltagelevels
    FROM station
),

exploded_sub_array AS (
    SELECT
        exploded_array.createddate,
        exploded_array.deletedat,
        exploded_array.forecastarea_id,
        exploded_array.forecastarea_name,
        exploded_array.forecastarea_subscriptionlevel,
        exploded_array.forecastarea_designcapacity,
        exploded_array.id,
        exploded_array.isborderpoint,
        exploded_array.lastupdated,
        exploded_array.maximolocation,
        exploded_array.name,
        exploded_array.netbasid,
        exploded_array.netbasobjecttype,
        exploded_array.substationid,
        exploded_array.leap_delta_timestamp,
        exploded_array.leap_raw_timestamp,
        exploded_array.leap_raw_file,
        exploded_array.leap_updated_at,
        stationvoltagelevels.id AS stationvoltagelevels_id,
        stationvoltagelevels.stationid AS stationvoltagelevels_stationid,
        stationvoltagelevels.name AS stationvoltagelevels_name,
        stationvoltagelevels.voltage AS stationvoltagelevels_voltage,
        stationvoltagelevels.startvalue AS stationvoltagelevels_startvalue,
        stationvoltagelevels.designcapacity AS stationvoltagelevels_designcapacity,
        stationvoltagelevels.subscriptionlevel AS stationvoltagelevels_subscriptionlevel,
        stationvoltagelevels.createddate AS stationvoltagelevels_createddate,
        stationvoltagelevels.deletedat AS stationvoltagelevels_deletedat,
        EXPLODE(stationvoltagelevels.stationvoltagelevelcapacityupgrades) AS stationvoltagelevelcapacityupgrades
    FROM exploded_array
)

SELECT
    exploded_sub_array.createddate,
    exploded_sub_array.deletedat,
    exploded_sub_array.forecastarea_id,
    exploded_sub_array.forecastarea_name,
    exploded_sub_array.forecastarea_subscriptionlevel,
    exploded_sub_array.forecastarea_designcapacity,
    exploded_sub_array.id,
    exploded_sub_array.isborderpoint,
    exploded_sub_array.lastupdated,
    exploded_sub_array.maximolocation,
    exploded_sub_array.name,
    exploded_sub_array.netbasid,
    exploded_sub_array.netbasobjecttype,
    exploded_sub_array.substationid,
    exploded_sub_array.stationvoltagelevels_id,
    exploded_sub_array.stationvoltagelevels_stationid,
    exploded_sub_array.stationvoltagelevels_name,
    exploded_sub_array.stationvoltagelevels_voltage,
    exploded_sub_array.stationvoltagelevels_startvalue,
    exploded_sub_array.stationvoltagelevels_designcapacity,
    exploded_sub_array.stationvoltagelevels_subscriptionlevel,
    exploded_sub_array.stationvoltagelevels_createddate,
    exploded_sub_array.stationvoltagelevels_deletedat,
    stationvoltagelevelcapacityupgrades.id AS stationvoltagelevelcapacityupgrades_id,
    stationvoltagelevelcapacityupgrades.stationvoltagelevelid AS stationvoltagelevelcapacityupgrades_stationvoltagelevelid,
    stationvoltagelevelcapacityupgrades.designcapacity AS stationvoltagelevelcapacityupgrades_designcapacity,
    stationvoltagelevelcapacityupgrades.year AS stationvoltagelevelcapacityupgrades_year,
    stationvoltagelevelcapacityupgrades.pcsid AS stationvoltagelevelcapacityupgrades_pcsid,
    stationvoltagelevelcapacityupgrades.createddate AS stationvoltagelevelcapacityupgrades_createddate,
    stationvoltagelevelcapacityupgrades.deletedat AS stationvoltagelevelcapacityupgrades_deletedat,
    exploded_sub_array.leap_delta_timestamp,
    exploded_sub_array.leap_raw_timestamp,
    exploded_sub_array.leap_raw_file,
    exploded_sub_array.leap_updated_at
FROM exploded_sub_array
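
The latest_unique_rows macro itself is not shown here; judging from the compiled SQL that dbt generates (see the temp view further down), it presumably expands to a QUALIFY clause along these lines. This is a sketch consistent with that output, not the actual macro:

```sql
{% macro latest_unique_rows(partition_cols, order_cols) %}
    {# Keep only the most recent row per key: deduplicate on
       partition_cols, taking the newest row by order_cols. #}
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY {{ partition_cols | join(', ') }}
        ORDER BY {{ order_cols | join(' DESC, ') }} DESC
    ) = 1
{% endmacro %}
```

With the call `latest_unique_rows(['id'], ['to_timestamp(lastupdated)'])`, this renders to the `QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY to_timestamp(lastupdated) DESC) = 1` clause visible in the compiled SQL.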

Snapshot

snapshots:
  - name: bronze_snapshot_station
    relation: ref('bronze_ephemeral_station')
    config:
      schema: kappa_kappa
      unique_key: [id, stationvoltagelevels_id, stationvoltagelevelcapacityupgrades_year]
      strategy: timestamp
      updated_at: lastupdated

The temp view generated by dbt

/* {"app": "dbt", "dbt_version": "1.9.0", "dbt_databricks_version": "1.9.0", "databricks_sql_connector_version": "3.6.0", "profile_name": "leap", "target_name": "operations", "node_id": "snapshot.leap.bronze_snapshot_station"} */
create
or replace view `dev_bronze`.`kappa_kappa`.`snapshot_station__dbt_tmp` as with snapshot_query as (
  with __dbt__cte__ephemeral_station as (
    WITH station AS (
      SELECT
        *
      FROM
        `dev_delta`.`kappa_kappa`.`station` QUALIFY ROW_NUMBER() OVER (
          PARTITION BY id
          ORDER BY
            to_timestamp(lastupdated) DESC
        ) = 1
    ),
    exploded_array AS (
      SELECT
        station.createddate,
        station.deletedat,
        forecastarea.id AS forecastarea_id,
        forecastarea.name AS forecastarea_name,
        forecastarea.subscriptionlevel AS forecastarea_subscriptionlevel,
        forecastarea.designcapacity AS forecastarea_designcapacity,
        station.id,
        station.isborderpoint,
        station.lastupdated,
        station.maximolocation,
        station.name,
        station.netbasid,
        station.netbasobjecttype,
        station.substationid,
        station.leap_delta_timestamp,
        station.leap_raw_timestamp,
        station.leap_raw_file,
        NOW() AS leap_updated_at,
        EXPLODE(stationvoltagelevels) AS stationvoltagelevels
      FROM
        station
    ),
    exploded_sub_array AS (
      SELECT
        exploded_array.createddate,
        exploded_array.deletedat,
        exploded_array.forecastarea_id,
        exploded_array.forecastarea_name,
        exploded_array.forecastarea_subscriptionlevel,
        exploded_array.forecastarea_designcapacity,
        exploded_array.id,
        exploded_array.isborderpoint,
        exploded_array.lastupdated,
        exploded_array.maximolocation,
        exploded_array.name,
        exploded_array.netbasid,
        exploded_array.netbasobjecttype,
        exploded_array.substationid,
        exploded_array.leap_delta_timestamp,
        exploded_array.leap_raw_timestamp,
        exploded_array.leap_raw_file,
        exploded_array.leap_updated_at,
        stationvoltagelevels.id AS stationvoltagelevels_id,
        stationvoltagelevels.stationid AS stationvoltagelevels_stationid,
        stationvoltagelevels.name AS stationvoltagelevels_name,
        stationvoltagelevels.voltage AS stationvoltagelevels_voltage,
        stationvoltagelevels.startvalue AS stationvoltagelevels_startvalue,
        stationvoltagelevels.designcapacity AS stationvoltagelevels_designcapacity,
        stationvoltagelevels.subscriptionlevel AS stationvoltagelevels_subscriptionlevel,
        stationvoltagelevels.createddate AS stationvoltagelevels_createddate,
        stationvoltagelevels.deletedat AS stationvoltagelevels_deletedat,
        EXPLODE(
          stationvoltagelevels.stationvoltagelevelcapacityupgrades
        ) AS stationvoltagelevelcapacityupgrades
      FROM
        exploded_array
    )
    SELECT
      exploded_sub_array.createddate,
      exploded_sub_array.deletedat,
      exploded_sub_array.forecastarea_id,
      exploded_sub_array.forecastarea_name,
      exploded_sub_array.forecastarea_subscriptionlevel,
      exploded_sub_array.forecastarea_designcapacity,
      exploded_sub_array.id,
      exploded_sub_array.isborderpoint,
      exploded_sub_array.lastupdated,
      exploded_sub_array.maximolocation,
      exploded_sub_array.name,
      exploded_sub_array.netbasid,
      exploded_sub_array.netbasobjecttype,
      exploded_sub_array.substationid,
      exploded_sub_array.stationvoltagelevels_id,
      exploded_sub_array.stationvoltagelevels_stationid,
      exploded_sub_array.stationvoltagelevels_name,
      exploded_sub_array.stationvoltagelevels_voltage,
      exploded_sub_array.stationvoltagelevels_startvalue,
      exploded_sub_array.stationvoltagelevels_designcapacity,
      exploded_sub_array.stationvoltagelevels_subscriptionlevel,
      exploded_sub_array.stationvoltagelevels_createddate,
      exploded_sub_array.stationvoltagelevels_deletedat,
      stationvoltagelevelcapacityupgrades.id AS stationvoltagelevelcapacityupgrades_id,
      stationvoltagelevelcapacityupgrades.stationvoltagelevelid AS stationvoltagelevelcapacityupgrades_stationvoltagelevelid,
      stationvoltagelevelcapacityupgrades.designcapacity AS stationvoltagelevelcapacityupgrades_designcapacity,
      stationvoltagelevelcapacityupgrades.year AS stationvoltagelevelcapacityupgrades_year,
      stationvoltagelevelcapacityupgrades.pcsid AS stationvoltagelevelcapacityupgrades_pcsid,
      stationvoltagelevelcapacityupgrades.createddate AS stationvoltagelevelcapacityupgrades_createddate,
      stationvoltagelevelcapacityupgrades.deletedat AS stationvoltagelevelcapacityupgrades_deletedat,
      exploded_sub_array.leap_delta_timestamp,
      exploded_sub_array.leap_raw_timestamp,
      exploded_sub_array.leap_raw_file,
      exploded_sub_array.leap_updated_at
    FROM
      exploded_sub_array
  )
  select
    *
  from
    __dbt__cte__ephemeral_station
),
snapshotted_data as (
  select
    *,
    id as dbt_unique_key_1,
    stationvoltagelevels_id as dbt_unique_key_2,
    stationvoltagelevelcapacityupgrades_year as dbt_unique_key_3
  from
    `dev_bronze`.`kappa_kappa`.`snapshot_station`
  where
    dbt_valid_to is null
),
insertions_source_data as (
  select
    *,
    id as dbt_unique_key_1,
    stationvoltagelevels_id as dbt_unique_key_2,
    stationvoltagelevelcapacityupgrades_year as dbt_unique_key_3,
    lastupdated as dbt_updated_at,
    lastupdated as dbt_valid_from,
    coalesce(nullif(lastupdated, lastupdated), null) as dbt_valid_to,
    md5(
      coalesce(cast(id as string), '') || '|' || coalesce(cast(stationvoltagelevels_id as string), '') || '|' || coalesce(
        cast(
          stationvoltagelevelcapacityupgrades_year as string
        ),
        ''
      ) || '|' || coalesce(cast(lastupdated as string), '')
    ) as dbt_scd_id
  from
    snapshot_query
),
updates_source_data as (
  select
    *,
    id as dbt_unique_key_1,
    stationvoltagelevels_id as dbt_unique_key_2,
    stationvoltagelevelcapacityupgrades_year as dbt_unique_key_3,
    lastupdated as dbt_updated_at,
    lastupdated as dbt_valid_from,
    lastupdated as dbt_valid_to
  from
    snapshot_query
),
insertions as (
  select
    'insert' as dbt_change_type,
    source_data.*
  from
    insertions_source_data as source_data
    left outer join snapshotted_data on snapshotted_data.dbt_unique_key_1 = source_data.dbt_unique_key_1 -- This is where Databricks highlights the error
    and snapshotted_data.dbt_unique_key_2 = source_data.dbt_unique_key_2
    and snapshotted_data.dbt_unique_key_3 = source_data.dbt_unique_key_3
  where
    snapshotted_data.dbt_unique_key_1 is null
    or (
      snapshotted_data.dbt_unique_key_1 is not null
      and (
        (
          snapshotted_data.dbt_valid_from < source_data.lastupdated
        )
      )
    )
),
updates as (
  select
    'update' as dbt_change_type,
    source_data.*,
    snapshotted_data.dbt_scd_id
  from
    updates_source_data as source_data
    join snapshotted_data on snapshotted_data.dbt_unique_key_1 = source_data.dbt_unique_key_1
    and snapshotted_data.dbt_unique_key_2 = source_data.dbt_unique_key_2
    and snapshotted_data.dbt_unique_key_3 = source_data.dbt_unique_key_3
  where
    (
      (
        snapshotted_data.dbt_valid_from < source_data.lastupdated
      )
    )
)
select
  *
from
  insertions
union all
select
  *
from
  updates

The error message

In the code above I have added a comment on the line where Databricks reports the error.
[AMBIGUOUS_REFERENCE] Reference snapshotted_data.dbt_unique_key_1 is ambiguous, could be: [snapshotted_data.dbt_unique_key_1, snapshotted_data.dbt_unique_key_1]. SQLSTATE: 42704; line 166, pos 40

Workaround

In the meantime we will have to concatenate the unique-key columns in the ephemeral model and use that combined column as the sole unique key.
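
A minimal sketch of that workaround, assuming the dbt_utils package is installed (generate_surrogate_key is its macro; the column name unique_row_id is our own choice), would be to add a surrogate key to the ephemeral model's final SELECT:

```sql
    -- Appended to the final SELECT of the ephemeral model:
    -- hash the three unique columns into a single surrogate key.
    {{ dbt_utils.generate_surrogate_key([
        'id',
        'stationvoltagelevels_id',
        'stationvoltagelevelcapacityupgrades_year'
    ]) }} AS unique_row_id
```

and then point the snapshot at that single column instead of the list:

```yaml
snapshots:
  - name: bronze_snapshot_station
    relation: ref('bronze_ephemeral_station')
    config:
      schema: kappa_kappa
      unique_key: unique_row_id  # single concatenated key instead of a list
      strategy: timestamp
      updated_at: lastupdated
```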

This issue has since been resolved by the maintainer of the dbt-databricks package in the following PR: Adding missing 1.9 Snapshot behavior by benc-db · Pull Request #904 · databricks/dbt-databricks · GitHub.
