What is the best way to use the Cosmos library with tags given this architecture?

At my workplace, we use dbt to transform data and make it available in a data lake. From the beginning, we have kept our workspace as standardized as possible so that it is intuitive for anyone who joins the team or uses our models for insights. To that end, we organized the project into three tiers:

  • Bronze
    • Staging
    • Production
  • Silver
    • Staging
    • Production
  • Gold

With this separation, we integrate the models and decide where each one belongs based on its semantic layer:

  • Bronze: data consumed from the landing zone, which we process to standardize the information the way we use it in our database.
  • Silver: once the data is processed, we apply our core business rules, which every model in this layer must satisfy.
  • Gold: where we build models for insights, such as datamarts, facts, and dimensions.

In addition, the bronze and silver layers each contain a data preparation layer (staging), which uses ephemeral materialization, and a final layer (production), which uses incremental materialization. An ephemeral model is a ‘virtual’ table that is never persisted in the database; dbt inlines its SQL as a CTE into the models that reference it. Our file tree is therefore arranged as follows:

│ ├── staging
│ │ ├── stg_model_example.sql
│ ├── production 
│ │ └── model_example.sql

Following this organization, we tag each folder in dbt_project.yml so the files are better separated and easier to understand.
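In dbt, tags set on a folder in dbt_project.yml apply to every model under that folder, and tags are additive: a model collects the tags of each enclosing folder instead of replacing them. The sketch below mimics that inheritance in plain Python; the folder paths and tag names in FOLDER_TAGS are hypothetical stand-ins for the real project config, not our actual setup.

```python
from pathlib import PurePosixPath

# Hypothetical folder-level tags, as they might be declared under `models:` in dbt_project.yml.
FOLDER_TAGS = {
    "bronze": ["bronze"],
    "bronze/staging": ["stg"],
    "bronze/production": ["prod"],
    "silver": ["silver"],
    "silver/staging": ["stg"],
    "silver/production": ["prod"],
}


def tags_for(model_path: str) -> set[str]:
    """Collect tags from every enclosing folder of a model file (tags accumulate)."""
    tags: set[str] = set()
    folder = PurePosixPath(model_path).parent
    # Walk from the top-level folder down to the model's own folder.
    for depth in range(1, len(folder.parts) + 1):
        key = "/".join(folder.parts[:depth])
        tags.update(FOLDER_TAGS.get(key, []))
    return tags


print(sorted(tags_for("silver/staging/stg_silver.sql")))  # ['silver', 'stg']
```

With this layout, a staging model in the silver folder ends up tagged both ‘silver’ and ‘stg’, which is what makes layer-wide and staging-wide selections possible.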

The tags keep our data organized and let us identify each model's dependencies more clearly and easily with commands such as ‘dbt ls’.
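For context, ‘dbt ls -s tag:<name>’ returns only the models carrying that tag, while prefixing the selector with ‘+’ (for example ‘+tag:silver’) also returns all of their upstream models. The sketch below reproduces those two behaviors on a tiny in-memory graph; MODELS, its tags, and the select helper are hypothetical and only illustrate the selection logic, not dbt's own implementation.

```python
# Hypothetical project: model name -> (tags, direct parents).
MODELS = {
    "stg_bronze": ({"stg", "sales"}, []),
    "bronze": ({"sales"}, ["stg_bronze"]),
    "stg_silver": ({"stg", "sales", "silver"}, ["bronze"]),
    "silver": ({"sales", "silver"}, ["stg_silver"]),
}


def ancestors(name: str) -> set[str]:
    """Return every upstream model of `name` (what the '+' prefix adds)."""
    found: set[str] = set()
    stack = list(MODELS[name][1])
    while stack:
        parent = stack.pop()
        if parent not in found:
            found.add(parent)
            stack.extend(MODELS[parent][1])
    return found


def select(selector: str) -> set[str]:
    """Handle only 'tag:<name>' and '+tag:<name>' selectors."""
    upstream = selector.startswith("+")
    tag = selector.lstrip("+").removeprefix("tag:")
    chosen = {m for m, (tags, _) in MODELS.items() if tag in tags}
    if upstream:
        for m in list(chosen):  # iterate over a copy while growing the set
            chosen |= ancestors(m)
    return chosen


print(sorted(select("tag:silver")))   # ['silver', 'stg_silver']
print(sorted(select("+tag:silver")))  # ['bronze', 'silver', 'stg_bronze', 'stg_silver']
```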

However, we also need to keep our database complete and up to date, and, with so many models, we want an approach that not only serves the business but also ensures that no model is forgotten and left out of the daily processing.
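One way to check that no model is silently left out of the daily run is to compare the models each DAG selects against the full project and flag whatever remains. The sketch below does that with plain sets; the model names, tags, and the DAG_SELECTIONS mapping are hypothetical, and in a real project the model list would come from something like ‘dbt ls’ output.

```python
# Hypothetical model -> tags mapping for a small project.
MODEL_TAGS = {
    "sales_bronze": {"sales"},
    "sales_silver": {"sales"},
    "snapshot_orders": {"snapshots"},
    "old_report": {"sales", "deprecated"},
    "inventory_gold": {"gold"},
}

# Hypothetical (include tags, exclude tags) for each Airflow DAG.
DAG_SELECTIONS = {
    "daily_sales": ({"sales"}, {"deprecated"}),
    "daily_snapshots": ({"snapshots"}, {"deprecated"}),
}


def selected_by(include: set[str], exclude: set[str]) -> set[str]:
    """Models that have at least one include tag and no exclude tag."""
    return {m for m, tags in MODEL_TAGS.items() if (tags & include) and not (tags & exclude)}


def forgotten_models() -> set[str]:
    """Models that no DAG runs and that are not intentionally deprecated."""
    covered: set[str] = set()
    for include, exclude in DAG_SELECTIONS.values():
        covered |= selected_by(include, exclude)
    deprecated = {m for m, tags in MODEL_TAGS.items() if "deprecated" in tags}
    return set(MODEL_TAGS) - covered - deprecated


print(forgotten_models())  # {'inventory_gold'}
```

A check like this could run in CI so that a new model without any scheduled tag fails loudly instead of going stale.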

For that, we use Airflow to orchestrate our data workflows, with a pipeline that processes our models daily. In it, we use the Cosmos library, which simplifies running dbt projects with Airflow: Cosmos renders a dbt project as a group of Airflow tasks, so each dbt model automatically becomes a task (or task group) that runs its transformation and tests. In our Airflow DAG, we configure Cosmos to select models by tag: we include a tag, and the DAG runs every model that carries it. If a model is no longer used but is still kept in the project for storage reasons, we give it a ‘deprecated’ tag and list that tag in the exclude list, as shown below:

INCLUDE_TABLES = ["tag:snapshots"]
EXCLUDE_TABLES = ["tag:deprecated"]
....
render_config=RenderConfig(load_method=LoadMode.DBT_LS, select=INCLUDE_TABLES, exclude=EXCLUDE_TABLES)

With this configuration, our DAG runs only the models with the included tag and skips those tagged ‘deprecated’. If one day we want a deprecated model to be processed again, we remove EXCLUDE_TABLES and keep only the include list; because the deprecated model still carries the include tag, it is selected again.
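The select/exclude interplay described above reduces to set arithmetic: the models matched by any select entry, minus the models matched by any exclude entry. A minimal sketch, assuming a hypothetical snapshot_legacy model that carries both the ‘snapshots’ and ‘deprecated’ tags; the matches and resolve helpers are illustrative, not Cosmos or dbt functions.

```python
from typing import List, Optional

# Hypothetical models and their tags.
MODEL_TAGS = {
    "snapshot_orders": {"snapshots"},
    "snapshot_legacy": {"snapshots", "deprecated"},
    "sales": {"sales"},
}


def matches(selectors: List[str]) -> set:
    """Union of all models matched by 'tag:<name>' selectors."""
    wanted = {s.removeprefix("tag:") for s in selectors}
    return {m for m, tags in MODEL_TAGS.items() if tags & wanted}


def resolve(select: List[str], exclude: Optional[List[str]] = None) -> set:
    """Selected models minus excluded models."""
    return matches(select) - matches(exclude or [])


INCLUDE_TABLES = ["tag:snapshots"]
EXCLUDE_TABLES = ["tag:deprecated"]
print(sorted(resolve(INCLUDE_TABLES, EXCLUDE_TABLES)))  # ['snapshot_orders']
print(sorted(resolve(INCLUDE_TABLES)))                  # ['snapshot_legacy', 'snapshot_orders']
```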

The problem is that, in the bronze and silver layers, each model depends on the previous one, following this lineage:

[stg_bronze] --> [bronze] --> [stg_silver] --> [silver]

  • stg_bronze: tags stg, sales
  • bronze: tag sales
  • stg_silver: tags stg, sales
  • silver: tag sales
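Because each model reads from the previous one, this lineage admits only one valid execution order. Python's standard graphlib.TopologicalSorter (Python 3.9+) can derive it from the parent links; the LINEAGE mapping below is transcribed from the lineage above and is only an illustration of the ordering constraint.

```python
from graphlib import TopologicalSorter

# model -> set of direct parents, following stg_bronze -> bronze -> stg_silver -> silver.
LINEAGE = {
    "stg_bronze": set(),
    "bronze": {"stg_bronze"},
    "stg_silver": {"bronze"},
    "silver": {"stg_silver"},
}

order = list(TopologicalSorter(LINEAGE).static_order())
print(order)  # ['stg_bronze', 'bronze', 'stg_silver', 'silver']
```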

When we run ‘dbt ls -s tag:silver’, it returns:

stg_bronze
bronze
stg_silver
silver

However, when this lineage is processed, Cosmos does not avoid processing the same model twice. For example, stg_silver is an ephemeral model that reads from bronze and is processed in its own task; then, when the production silver model runs, it calls stg_silver and processes that same model again.

If we apply:

INCLUDE_TABLES = ["tag:sales"]
EXCLUDE_TABLES = ["tag:stg"]
....
render_config=RenderConfig(load_method=LoadMode.DBT_LS, select=INCLUDE_TABLES, exclude=EXCLUDE_TABLES)

The lineage looks like this:

[bronze] (tag: sales)
[silver] (tag: sales)

And what is the problem with this? The hierarchy of the models is no longer respected: bronze and silver are processed in parallel, which is wrong, because silver can only be updated after bronze has been processed. On the other hand, if we do not exclude the stg models, each of them is processed twice, which adds a lot of processing time.
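The parallelism appears because, once the stg models are filtered out, the only edges left are those whose two endpoints both survived the filter, and here there are none. Keeping the order would require rewiring each surviving model to its nearest surviving ancestors through the removed nodes. The sketch below contrasts both behaviors on the lineage above; naive_filter and contract are hypothetical helpers that illustrate the graph operation, not Cosmos functions or settings.

```python
from graphlib import TopologicalSorter

# model -> direct parents, following stg_bronze -> bronze -> stg_silver -> silver.
PARENTS = {
    "stg_bronze": set(),
    "bronze": {"stg_bronze"},
    "stg_silver": {"bronze"},
    "silver": {"stg_silver"},
}


def naive_filter(keep: set[str]) -> dict[str, set[str]]:
    """Drop removed nodes together with every edge that touches them."""
    return {m: PARENTS[m] & keep for m in keep}


def contract(keep: set[str]) -> dict[str, set[str]]:
    """Drop removed nodes but link each kept node to its nearest kept ancestors."""
    result: dict[str, set[str]] = {}
    for model in keep:
        nearest: set[str] = set()
        seen: set[str] = set()
        stack = list(PARENTS[model])
        while stack:
            parent = stack.pop()
            if parent in seen:
                continue
            seen.add(parent)
            if parent in keep:
                nearest.add(parent)  # stop here: this ancestor becomes a task
            else:
                stack.extend(PARENTS[parent])  # look through the removed node
        result[model] = nearest
    return result


keep = {"bronze", "silver"}  # what remains after excluding tag:stg
print(naive_filter(keep))  # neither model has a parent, so both may run in parallel
print(contract(keep))      # silver still depends on bronze
print(list(TopologicalSorter(contract(keep)).static_order()))  # ['bronze', 'silver']
```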

I would like to know whether there is a way to map the staging models so that they are processed only when their main model is called, without losing the lineage, or whether the best solution is to change how we organize the project and stop splitting models into stg and production. Am I using the concept of staging incorrectly? Even though it is a ‘virtual’ table, I still use the data it produces to load an incremental table in my DW.

Our example models:

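Staging model, stg_sales (ephemeral):

{{ config(materialized='ephemeral') }}
SELECT * FROM {{ source('public', 'sales') }}

Production model (incremental):

{{ config(materialized='incremental') }}
-- date_atualizacao must be selected too, otherwise the incremental filter below cannot read it from {{ this }}
SELECT product_id, date_atualizacao FROM {{ ref('stg_sales') }}
{% if is_incremental() %}
WHERE date_atualizacao > (SELECT max(date_atualizacao) FROM {{ this }})
{% endif %}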
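For reference, dbt does not build an ephemeral model in the database: when another model refs it, dbt compiles the ephemeral SQL into a CTE (named with a __dbt__cte__ prefix) at the top of the downstream query. The sketch below imitates that compilation step with plain string templating so the resulting statement is visible. compile_with_ephemeral is a hypothetical helper, the source() call is assumed to render as public.sales, the is_incremental() block is left out (as on a first run), and the real compiled SQL will differ in whitespace and quoting.

```python
def compile_with_ephemeral(ephemeral_name: str, ephemeral_sql: str, model_sql: str) -> str:
    """Inline an ephemeral model as a CTE and point the downstream query at it."""
    cte_name = f"__dbt__cte__{ephemeral_name}"
    # Replace the ref placeholder with the CTE name, then prepend the CTE definition.
    body = model_sql.replace(f"{{{{ ref('{ephemeral_name}') }}}}", cte_name)
    return f"with {cte_name} as (\n    {ephemeral_sql}\n)\n{body}"


compiled = compile_with_ephemeral(
    "stg_sales",
    "SELECT * FROM public.sales",  # assumed rendering of source('public', 'sales')
    "SELECT product_id FROM {{ ref('stg_sales') }}",
)
print(compiled)
```

The printed statement is a single query whose CTE holds the staging logic, which is the form in which the incremental model actually reads stg_sales.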