The problem I’m having
We have started to use dbt Core 1.9 and the new way of defining snapshots in YAML. For our use case we needed to use an ephemeral model as the snapshot relation, in order to filter out duplicates on the initial run of the snapshot. Some of our models have more than one unique column, and it is these that fail; for the models with only one unique column, the snapshot works.
The context
We are running dbt-core 1.9 and our warehouse is in Databricks.
Ephemeral model
-- Ephemeral dbt model: dedupe the raw `station` source and flatten its two
-- nested arrays (stationvoltagelevels -> stationvoltagelevelcapacityupgrades)
-- into one row per (station, voltage level, capacity upgrade).
{{ config(materialized='ephemeral') }}
-- station: latest version of each source row. The latest_unique_rows macro
-- presumably keeps one row per `id` ordered by to_timestamp(lastupdated)
-- DESC (see the compiled QUALIFY ROW_NUMBER() in the generated SQL below) --
-- this is the duplicate filter needed for the snapshot's initial run.
WITH station AS (
SELECT *
FROM {{ source('kappa_kappa', 'station') }}
{{ latest_unique_rows(['id'], ['to_timestamp(lastupdated)']) }}
),
-- exploded_array: one output row per element of the stationvoltagelevels
-- array, carrying the parent station columns alongside.
-- NOTE(review): `forecastarea.*` is referenced but no table or alias named
-- forecastarea is visible in this model -- presumably a struct column on the
-- source table; confirm against the source schema.
exploded_array AS (
SELECT
station.createddate,
station.deletedat,
forecastarea.id AS forecastarea_id,
forecastarea.name AS forecastarea_name,
forecastarea.subscriptionlevel AS forecastarea_subscriptionlevel,
forecastarea.designcapacity AS forecastarea_designcapacity,
station.id,
station.isborderpoint,
station.lastupdated,
station.maximolocation,
station.name,
station.netbasid,
station.netbasobjecttype,
station.substationid,
station.leap_delta_timestamp,
station.leap_raw_timestamp,
station.leap_raw_file,
-- Load timestamp; evaluated at model run time.
NOW() AS leap_updated_at,
EXPLODE(stationvoltagelevels) AS stationvoltagelevels
FROM station
),
-- exploded_sub_array: second level of flattening -- one row per element of
-- stationvoltagelevels.stationvoltagelevelcapacityupgrades.
exploded_sub_array AS (
SELECT
exploded_array.createddate,
exploded_array.deletedat,
exploded_array.forecastarea_id,
exploded_array.forecastarea_name,
exploded_array.forecastarea_subscriptionlevel,
exploded_array.forecastarea_designcapacity,
exploded_array.id,
exploded_array.isborderpoint,
exploded_array.lastupdated,
exploded_array.maximolocation,
exploded_array.name,
exploded_array.netbasid,
exploded_array.netbasobjecttype,
exploded_array.substationid,
exploded_array.leap_delta_timestamp,
exploded_array.leap_raw_timestamp,
exploded_array.leap_raw_file,
exploded_array.leap_updated_at,
stationvoltagelevels.id AS stationvoltagelevels_id,
stationvoltagelevels.stationid AS stationvoltagelevels_stationid,
stationvoltagelevels.name AS stationvoltagelevels_name,
stationvoltagelevels.voltage AS stationvoltagelevels_voltage,
stationvoltagelevels.startvalue AS stationvoltagelevels_startvalue,
stationvoltagelevels.designcapacity AS stationvoltagelevels_designcapacity,
stationvoltagelevels.subscriptionlevel AS stationvoltagelevels_subscriptionlevel,
stationvoltagelevels.createddate AS stationvoltagelevels_createddate,
stationvoltagelevels.deletedat AS stationvoltagelevels_deletedat,
EXPLODE(stationvoltagelevels.stationvoltagelevelcapacityupgrades) AS stationvoltagelevelcapacityupgrades
FROM exploded_array
),
-- Final projection: fully flattened rows. The snapshot below keys on
-- (id, stationvoltagelevels_id, stationvoltagelevelcapacityupgrades_year).
SELECT
exploded_sub_array.createddate,
exploded_sub_array.deletedat,
exploded_sub_array.forecastarea_id,
exploded_sub_array.forecastarea_name,
exploded_sub_array.forecastarea_subscriptionlevel,
exploded_sub_array.forecastarea_designcapacity,
exploded_sub_array.id,
exploded_sub_array.isborderpoint,
exploded_sub_array.lastupdated,
exploded_sub_array.maximolocation,
exploded_sub_array.name,
exploded_sub_array.netbasid,
exploded_sub_array.netbasobjecttype,
exploded_sub_array.substationid,
exploded_sub_array.stationvoltagelevels_id,
exploded_sub_array.stationvoltagelevels_stationid,
exploded_sub_array.stationvoltagelevels_name,
exploded_sub_array.stationvoltagelevels_voltage,
exploded_sub_array.stationvoltagelevels_startvalue,
exploded_sub_array.stationvoltagelevels_designcapacity,
exploded_sub_array.stationvoltagelevels_subscriptionlevel,
exploded_sub_array.stationvoltagelevels_createddate,
exploded_sub_array.stationvoltagelevels_deletedat,
stationvoltagelevelcapacityupgrades.id AS stationvoltagelevelcapacityupgrades_id,
stationvoltagelevelcapacityupgrades.stationvoltagelevelid AS stationvoltagelevelcapacityupgrades_stationvoltagelevelid,
stationvoltagelevelcapacityupgrades.designcapacity AS stationvoltagelevelcapacityupgrades_designcapacity,
stationvoltagelevelcapacityupgrades.year AS stationvoltagelevelcapacityupgrades_year,
stationvoltagelevelcapacityupgrades.pcsid AS stationvoltagelevelcapacityupgrades_pcsid,
stationvoltagelevelcapacityupgrades.createddate AS stationvoltagelevelcapacityupgrades_createddate,
stationvoltagelevelcapacityupgrades.deletedat AS stationvoltagelevelcapacityupgrades_deletedat,
exploded_sub_array.leap_delta_timestamp,
exploded_sub_array.leap_raw_timestamp,
exploded_sub_array.leap_raw_file,
exploded_sub_array.leap_updated_at
FROM exploded_sub_array
Snapshot
# dbt 1.9 YAML-defined snapshot targeting the ephemeral model above via
# `relation:`; dbt inlines the ephemeral model as a CTE in the generated
# temp view (see the compiled SQL below).
snapshots:
- name: bronze_snapshot_station
relation: ref('bronze_ephemeral_station')
config:
schema: kappa_kappa
# Composite key over the flattened grain. The multi-column list is what
# triggers the AMBIGUOUS_REFERENCE failure described in this report;
# snapshots with a single unique column work.
unique_key: [id, stationvoltagelevels_id, stationvoltagelevelcapacityupgrades_year]
# timestamp strategy: a row is considered changed when updated_at advances.
strategy: timestamp
updated_at: lastupdated
The temp view generated by dbt
-- dbt-generated temp-view DDL, reproduced verbatim as evidence of the bug;
-- only review comments have been added.
/* {"app": "dbt", "dbt_version": "1.9.0", "dbt_databricks_version": "1.9.0", "databricks_sql_connector_version": "3.6.0", "profile_name": "leap", "target_name": "operations", "node_id": "snapshot.leap.bronze_snapshot_station"} */
create
or replace view `dev_bronze`.`kappa_kappa`.`snapshot_station__dbt_tmp` as with snapshot_query as (
-- snapshot_query: the ephemeral model, inlined here by dbt as a CTE.
with __dbt__cte__ephemeral_station as (
WITH station AS (
SELECT
*
FROM
`dev_delta`.`kappa_kappa`.`station` QUALIFY ROW_NUMBER() OVER (
PARTITION BY id
ORDER BY
to_timestamp(lastupdated) DESC
) = 1
),
exploded_array AS (
SELECT
station.createddate,
station.deletedat,
forecastarea.id AS forecastarea_id,
forecastarea.name AS forecastarea_name,
forecastarea.subscriptionlevel AS forecastarea_subscriptionlevel,
forecastarea.designcapacity AS forecastarea_designcapacity,
station.id,
station.isborderpoint,
station.lastupdated,
station.maximolocation,
station.name,
station.netbasid,
station.netbasobjecttype,
station.substationid,
station.leap_delta_timestamp,
station.leap_raw_timestamp,
station.leap_raw_file,
NOW() AS leap_updated_at,
EXPLODE(stationvoltagelevels) AS stationvoltagelevels
FROM
station
),
exploded_sub_array AS (
SELECT
exploded_array.createddate,
exploded_array.deletedat,
exploded_array.forecastarea_id,
exploded_array.forecastarea_name,
exploded_array.forecastarea_subscriptionlevel,
exploded_array.forecastarea_designcapacity,
exploded_array.id,
exploded_array.isborderpoint,
exploded_array.lastupdated,
exploded_array.maximolocation,
exploded_array.name,
exploded_array.netbasid,
exploded_array.netbasobjecttype,
exploded_array.substationid,
exploded_array.leap_delta_timestamp,
exploded_array.leap_raw_timestamp,
exploded_array.leap_raw_file,
exploded_array.leap_updated_at,
stationvoltagelevels.id AS stationvoltagelevels_id,
stationvoltagelevels.stationid AS stationvoltagelevels_stationid,
stationvoltagelevels.name AS stationvoltagelevels_name,
stationvoltagelevels.voltage AS stationvoltagelevels_voltage,
stationvoltagelevels.startvalue AS stationvoltagelevels_startvalue,
stationvoltagelevels.designcapacity AS stationvoltagelevels_designcapacity,
stationvoltagelevels.subscriptionlevel AS stationvoltagelevels_subscriptionlevel,
stationvoltagelevels.createddate AS stationvoltagelevels_createddate,
stationvoltagelevels.deletedat AS stationvoltagelevels_deletedat,
EXPLODE(
stationvoltagelevels.stationvoltagelevelcapacityupgrades
) AS stationvoltagelevelcapacityupgrades
FROM
exploded_array
)
SELECT
exploded_sub_array.createddate,
exploded_sub_array.deletedat,
exploded_sub_array.forecastarea_id,
exploded_sub_array.forecastarea_name,
exploded_sub_array.forecastarea_subscriptionlevel,
exploded_sub_array.forecastarea_designcapacity,
exploded_sub_array.id,
exploded_sub_array.isborderpoint,
exploded_sub_array.lastupdated,
exploded_sub_array.maximolocation,
exploded_sub_array.name,
exploded_sub_array.netbasid,
exploded_sub_array.netbasobjecttype,
exploded_sub_array.substationid,
exploded_sub_array.stationvoltagelevels_id,
exploded_sub_array.stationvoltagelevels_stationid,
exploded_sub_array.stationvoltagelevels_name,
exploded_sub_array.stationvoltagelevels_voltage,
exploded_sub_array.stationvoltagelevels_startvalue,
exploded_sub_array.stationvoltagelevels_designcapacity,
exploded_sub_array.stationvoltagelevels_subscriptionlevel,
exploded_sub_array.stationvoltagelevels_createddate,
exploded_sub_array.stationvoltagelevels_deletedat,
stationvoltagelevelcapacityupgrades.id AS stationvoltagelevelcapacityupgrades_id,
stationvoltagelevelcapacityupgrades.stationvoltagelevelid AS stationvoltagelevelcapacityupgrades_stationvoltagelevelid,
stationvoltagelevelcapacityupgrades.designcapacity AS stationvoltagelevelcapacityupgrades_designcapacity,
stationvoltagelevelcapacityupgrades.year AS stationvoltagelevelcapacityupgrades_year,
stationvoltagelevelcapacityupgrades.pcsid AS stationvoltagelevelcapacityupgrades_pcsid,
stationvoltagelevelcapacityupgrades.createddate AS stationvoltagelevelcapacityupgrades_createddate,
stationvoltagelevelcapacityupgrades.deletedat AS stationvoltagelevelcapacityupgrades_deletedat,
exploded_sub_array.leap_delta_timestamp,
exploded_sub_array.leap_raw_timestamp,
exploded_sub_array.leap_raw_file,
exploded_sub_array.leap_updated_at
FROM
exploded_sub_array
)
select
*
from
__dbt__cte__ephemeral_station
),
-- snapshotted_data: currently-open rows from the existing snapshot table.
-- NOTE(review): likely root cause of the failure -- the insertions CTE below
-- selects source_data.*, which includes the dbt_unique_key_1..3 aliases, so
-- the first run persists those columns into the snapshot table. On the next
-- run `select *` here already returns dbt_unique_key_1..3, and the re-alias
-- below produces duplicate column names; this matches the
-- [AMBIGUOUS_REFERENCE] error quoted at the end of this report. Confirm by
-- inspecting the columns of `dev_bronze`.`kappa_kappa`.`snapshot_station`.
snapshotted_data as (
select
*,
id as dbt_unique_key_1,
stationvoltagelevels_id as dbt_unique_key_2,
stationvoltagelevelcapacityupgrades_year as dbt_unique_key_3
from
`dev_bronze`.`kappa_kappa`.`snapshot_station`
where
dbt_valid_to is null
),
-- insertions_source_data: fresh source rows with snapshot metadata columns.
-- Note: coalesce(nullif(x, x), null) is always null -- dbt_valid_to starts
-- open for inserted rows.
insertions_source_data as (
select
*,
id as dbt_unique_key_1,
stationvoltagelevels_id as dbt_unique_key_2,
stationvoltagelevelcapacityupgrades_year as dbt_unique_key_3,
lastupdated as dbt_updated_at,
lastupdated as dbt_valid_from,
coalesce(nullif(lastupdated, lastupdated), null) as dbt_valid_to,
md5(
coalesce(cast(id as string), '') || '|' || coalesce(cast(stationvoltagelevels_id as string), '') || '|' || coalesce(
cast(
stationvoltagelevelcapacityupgrades_year as string
),
''
) || '|' || coalesce(cast(lastupdated as string), '')
) as dbt_scd_id
from
snapshot_query
),
-- updates_source_data: same source rows, shaped for closing out old
-- versions (dbt_valid_to = the new lastupdated).
updates_source_data as (
select
*,
id as dbt_unique_key_1,
stationvoltagelevels_id as dbt_unique_key_2,
stationvoltagelevelcapacityupgrades_year as dbt_unique_key_3,
lastupdated as dbt_updated_at,
lastupdated as dbt_valid_from,
lastupdated as dbt_valid_to
from
snapshot_query
),
-- insertions: rows that are new (anti-join) or changed (updated_at advanced)
-- relative to the open snapshot rows. Databricks reports the ambiguity on
-- the join condition below because snapshotted_data exposes two columns
-- named dbt_unique_key_1 (see NOTE above).
insertions as (
select
'insert' as dbt_change_type,
source_data.*
from
insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key_1 = source_data.dbt_unique_key_1 -- This is where Databricks highlights the error
and snapshotted_data.dbt_unique_key_2 = source_data.dbt_unique_key_2
and snapshotted_data.dbt_unique_key_3 = source_data.dbt_unique_key_3
where
snapshotted_data.dbt_unique_key_1 is null
or (
snapshotted_data.dbt_unique_key_1 is not null
and (
(
snapshotted_data.dbt_valid_from < source_data.lastupdated
)
)
)
),
-- updates: matching open rows whose source version is newer; these close
-- out the existing snapshot rows (by dbt_scd_id).
updates as (
select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id
from
updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key_1 = source_data.dbt_unique_key_1
and snapshotted_data.dbt_unique_key_2 = source_data.dbt_unique_key_2
and snapshotted_data.dbt_unique_key_3 = source_data.dbt_unique_key_3
where
(
(
snapshotted_data.dbt_valid_from < source_data.lastupdated
)
)
)
-- Final change set: inserts and updates combined (disjoint sets, so
-- union all is correct here).
select
*
from
insertions
union all
select
*
from
updates
The error message:
In the code above I have added a comment on the line where Databricks reports the error.
[AMBIGUOUS_REFERENCE] Reference snapshotted_data.dbt_unique_key_1 is ambiguous, could be: [snapshotted_data.dbt_unique_key_1, snapshotted_data.dbt_unique_key_1]. SQLSTATE: 42704; line 166, pos 40
Workaround
In the meantime we will have to concatenate the unique columns in the ephemeral model and use that concatenated column as the sole unique key.