Incremental loads of unioned sources

Question about this… My situation is we have 3 sources synced at different times but I’m creating an events table that runs dbt.union_relations to union all of these related tables. If I know the events that are synced at different times, is it possible to account for this by running something like this?

{{ config(schema='website_events', materialized='incremental', full_refresh=false) }}

with cte as (

  {{
    dbt_utils.union_relations(
      relations=[
          ref('table_one'),
          ref('table_two'),
          ref('table_three')
      include=[
        'timestamp',
        'event',
        'anonymous_id'
        ]
    )
  }}
)

-- website
select
  timestamp,
  event,
  anonymous_id

from
  cte

where
  event not in ('event_one', 'event_two')

  {% if is_incremental() %}
    and timestamp > (
      select max(timestamp) from {{ this }}
      where event not in ('event_one', 'event_two')
    )
  {% endif %}

union all

-- server event
select
  timestamp,
  event,
  anonymous_id

from
  cte

where
  event in ('event_one', 'event_two')

  {% if is_incremental() %}
    and timestamp > (
      select max(timestamp) from {{ this }}
      where event in ('event_one', 'event_two')
    )
  {% endif %}

order by 1
1 Like

I had the same need, I would like to union tables coming from queries, but unfortunately union_relations only allow for relations indeed.

To solve this, for my knowledge, the only way is to manually write the SQL incremental queries to load the tables and make them have the same structure and order, and then use UNION ALL to reach your result.
In your case should not be difficult since you just have 3 columns to include, so it should be pretty fast and easy.

Something like:

with table_one as (
    select 'timestamp', 'event', 'anonymous_id'
    from ref('table_one ')
    {% if is_incremental() %}
    where timestamp>= {{ dbt_date.n_days_ago(n=90) }}
    {% endif %}
),
table_two as (
    select 'timestamp', 'event', 'anonymous_id'
    from ref('table_one ')
    {% if is_incremental() %}
    where timestamp>= {{ dbt_date.n_days_ago(n=90) }}
    {% endif %}
),
table_three as (
    select 'timestamp', 'event', 'anonymous_id'
    from ref('table_three ')
    {% if is_incremental() %}
    where timestamp>= {{ dbt_date.n_days_ago(n=90) }}
    {% endif %}
)

select * from table_one 
union all
select * from table_two
union all
select * from table_three

Set default values accordingly if you don’t have that column in the source query, like with a NULL value.
In case instead you have many tables and columns it could be pretty annoying.
Cheers