Clean your warehouse of old and deprecated models

dbt models persist in your production database even after they’ve been deleted from your project. This clutters the warehouse and can slow down operations like database or schema cloning, which are used frequently in CI/CD for Snowflake dbt projects.

I wrote a macro that executes DROP statements for all the tables and views in your target schema that don’t exist in your project. How does it work? First, it collects all the models, seeds, and snapshots in your project. Then, it queries the information schema for all the relations in your target schema, excluding the ones collected above. Finally, it formats (and then executes) a DROP statement for each table or view that isn’t in your project.
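For example, if the schema still holds two relations that were deleted from the project, the macro would generate and run statements like these (relation names hypothetical):

DROP VIEW ANALYTICS_DB.ANALYTICS.OLD_REVENUE_VIEW;
DROP TABLE ANALYTICS_DB.ANALYTICS.DEPRECATED_ORDERS;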

To invoke from the command line, use: dbt run-operation drop_old_relations

Warning: This will delete everything in your target schema that isn’t in your dbt project.

Warning: This does not cover aliases (see the discussion further down).

I strongly recommend running dbt run-operation drop_old_relations --args '{"dryrun": True}' first to see the DROP commands before they are executed.

Thanks a lot to Randy of Hashmap for providing much of the code! You can find his original version here; it drops models based on the last_altered date in the information schema using a pre-set cutoff.

{% macro drop_old_relations(dryrun=False) %}

{% if execute %}
  {% set current_models=[] %}

  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) %}
    {% do current_models.append(node.name) %}
  {% endfor %}
{% endif %}

{% set cleanup_query %}

      WITH MODELS_TO_DROP AS (
        SELECT
          CASE 
            WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
            WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
          END AS RELATION_TYPE,
          CONCAT_WS('.', TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME) AS RELATION_NAME
        FROM 
          {{ target.database }}.INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = '{{ target.schema }}'
          AND TABLE_NAME NOT IN
            ({%- for model in current_models -%}
                '{{ model.upper() }}'
                {%- if not loop.last -%}
                    ,
                {% endif %}
            {%- endfor -%}))
      SELECT 
        'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' as DROP_COMMANDS
      FROM 
        MODELS_TO_DROP

  {% endset %}

{% do log(cleanup_query, info=True) %}
{% set drop_commands = run_query(cleanup_query).columns[0].values() %}

{% if drop_commands %}
  {% if dryrun | as_bool == False %}
    {% do log('Executing DROP commands...', True) %}
  {% else %}
    {% do log('Printing DROP commands...', True) %}
  {% endif %}
  {% for drop_command in drop_commands %}
    {% do log(drop_command, True) %}
    {% if dryrun | as_bool == False %}
      {% do run_query(drop_command) %}
    {% endif %}
  {% endfor %}
{% else %}
  {% do log('No relations to clean.', True) %}
{% endif %}

{%- endmacro -%}

EDIT: Added dryrun feature


I love this! I modified the script slightly so that it:

  1. Provides the option of a dry run, where the commands are printed but not executed.
  2. Matches any schema prefixed with the target schema, rather than only the exact target schema (we use multiple schemas of the form <target_schema>_<schema_name>; the exact target schema is still matched). This introduces some risk if a target schema could be a prefix of another, unrelated schema’s name, so use with caution. It only matches prefixes, though, not a full wildcard, so if you’re consistent with prefixing schemas this shouldn’t be an issue. See the example after this list.
  3. Explicitly excludes non-standard table types (e.g. we don’t want this to delete our external tables, which are managed through dbt slightly outside of these parameters).
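For instance, with a hypothetical target schema of analytics, the ilike predicate behaves like this:

-- table_schema ilike 'analytics%' matches:
--   ANALYTICS            (the exact target schema)
--   ANALYTICS_STAGING    (prefixed schemas)
-- but it does not match:
--   RAW_ANALYTICS        (prefix match only, not a full wildcard)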

Modified version:

{% macro drop_old_relations(dry_run='false') %}
{% if execute %}
  {% set current_models=[] %}
  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
    {% do current_models.append(node.name) %}
  {% endfor %}
{% endif %}
{% set cleanup_query %}
      with models_to_drop as (
        select
          case 
            when table_type = 'BASE TABLE' then 'TABLE'
            when table_type = 'VIEW' then 'VIEW'
          end as relation_type,
          concat_ws('.', table_catalog, table_schema, table_name) as relation_name
        from 
          {{ target.database }}.information_schema.tables
        where table_schema ilike '{{ target.schema }}%'
          and table_name not in
            ({%- for model in current_models -%}
                '{{ model.upper() }}'
                {%- if not loop.last -%}
                    ,
                {% endif %}
            {%- endfor -%}))
      select 
        'drop ' || relation_type || ' ' || relation_name || ';' as drop_commands
      from 
        models_to_drop
      
      -- intentionally exclude unhandled table_types, including 'external table'
      where drop_commands is not null
  {% endset %}
{% do log(cleanup_query, info=True) %}
{% set drop_commands = run_query(cleanup_query).columns[0].values() %}
{% if drop_commands %}
  {% for drop_command in drop_commands %}
    {% do log(drop_command, True) %}
    {% if dry_run == 'false' %}
      {% do run_query(drop_command) %}
    {% endif %}
  {% endfor %}
{% else %}
  {% do log('No relations to clean.', True) %}
{% endif %}
{%- endmacro -%}
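Note that dry_run here is a string, not a boolean, so a dry run is invoked along these lines:

dbt run-operation drop_old_relations --args '{"dry_run": "true"}'

Any value other than the string 'false' prints the DROP commands without executing them; running the macro with no args executes them.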

Thanks for sharing! Have you considered how to extend this to account for aliases?

What do you mean by aliases?

Where the name of the table created in the database is different from the model name.

Why alias model names?

The names of schemas and tables are effectively the “user interface” of your data warehouse. Well-named schemas and tables can help provide clarity and direction for consumers of this data. In combination with custom schemas, model aliasing is a powerful mechanism for designing your warehouse.
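For context, an alias is set in the model’s config; a minimal example (the model and alias names here are hypothetical):

-- models/dim_customers_v2.sql
-- builds <schema>.dim_customers instead of <schema>.dim_customers_v2
{{ config(alias='dim_customers') }}
select 1 as id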

Interesting, I didn’t know about that feature. You could probably extend this by grabbing the alias name in the initial loop over the dbt graph. Something like this, maybe?

  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) %}
    {% if node.alias %}
      {% do current_models.append(node.alias) %}
    {% else %}
      {% do current_models.append(node.name) %}
    {% endif %}
  {% endfor %}
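Equivalently, the fallback can be collapsed with an inline conditional, as the later versions in this thread do:

{% do current_models.append(node.alias if node.alias else node.name) %}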

Here’s my take with support for multiple databases and aliases:

{% macro drop_old_tables(dry_run='false') %}
    {% if execute %}
        {% set current_model_locations={} %}
        {% for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
            {% if not node.database in current_model_locations %}
                {% do current_model_locations.update({node.database: {}}) %}
            {% endif %}
            {% if not node.schema.upper() in current_model_locations[node.database] %}
                {% do current_model_locations[node.database].update({node.schema.upper(): []}) %}
            {% endif %}
            {% set table_name = node.alias if node.alias else node.name %}
            {% do current_model_locations[node.database][node.schema.upper()].append(table_name.upper()) %}
        {% endfor %}
    {% endif %}
    {% set cleanup_query %}

        with models_to_drop as (
            {% for database in current_model_locations.keys() %}
            {% if loop.index > 1 %}
            union all
            {% endif %}
            select
                table_type
                , table_catalog
                , table_schema
                , table_name
                , case
                    when table_type = 'BASE TABLE' then 'TABLE'
                    when table_type = 'VIEW' then 'VIEW'
                end as relation_type
                , concat_ws('.', table_catalog, table_schema, table_name) as relation_name
            from {{ database }}.information_schema.tables
            where table_schema in ('{{ "', '".join(current_model_locations[database].keys()) }}')
            and not (
                {% for schema in current_model_locations[database].keys() %}
                {% if loop.index > 1 %}or {% endif %}table_schema = '{{ schema }}' and table_name in ('{{ "', '".join(current_model_locations[database][schema]) }}')
                {% endfor %}
            )
            {% endfor %}
        )

        select 'drop ' || relation_type || ' ' || relation_name || ';' as drop_commands
        from models_to_drop
        -- intentionally exclude unhandled table_types, including 'external table'
        where drop_commands is not null

    {% endset %}
    {% do log(cleanup_query, info=True) %}
    {% set drop_commands = run_query(cleanup_query).columns[0].values() %}
    {% if drop_commands %}
        {% for drop_command in drop_commands %}
            {% do log(drop_command, True) %}
            {% if dry_run == 'false' %}
                {% do run_query(drop_command) %}
            {% endif %}
        {% endfor %}
    {% else %}
        {% do log('No relations to clean.', True) %}
    {% endif %}
{%- endmacro -%}
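As with the earlier version, dry_run is a string, so a preview run looks something like:

dbt run-operation drop_old_tables --args '{"dry_run": "true"}'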

Building on the solution from @NiallRees. Here’s my take for BigQuery:

To execute a dry run use: dbt run-operation cleanup_dataset --args '{"dry_run": True}'

-- Removes tables and views from the given run configuration
-- Usage in production:
--    dbt run-operation cleanup_dataset
-- To only see the commands that it is about to perform:
--    dbt run-operation cleanup_dataset --args '{"dry_run": True}'
{% macro cleanup_dataset(dry_run=False) %}
    {% if execute %}
        {% set current_model_locations={} %}

        {% for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
            {% if not node.database in current_model_locations %}
                {% do current_model_locations.update({node.database: {}}) %}
            {% endif %}
            {% if not node.schema in current_model_locations[node.database] %}
                {% do current_model_locations[node.database].update({node.schema: []}) %}
            {% endif %}
            {% set table_name = node.alias if node.alias else node.name %}
            {% do current_model_locations[node.database][node.schema].append(table_name) %}
        {% endfor %}
    {% endif %}

    {% set cleanup_query %}

        with models_to_drop as (
            {% for database in current_model_locations.keys() %}
                {% if loop.index > 1 %}union all{% endif %}
                {% for dataset, tables  in current_model_locations[database].items() %}
                    {% if loop.index > 1 %}union all{% endif %}
                    select
                        table_type,
                        table_catalog,
                        table_schema,
                        table_name,
                        case
                            when table_type = 'BASE TABLE' then 'TABLE'
                            when table_type = 'VIEW' then 'VIEW'
                        end as relation_type,
                        array_to_string([table_catalog, table_schema, table_name], '.') as relation_name
                    from `{{ database }}`.{{ dataset }}.INFORMATION_SCHEMA.TABLES
                    where not (table_name in ('{{ "', '".join(tables) }}'))
                {% endfor %}
            {% endfor %}
        ),
        drop_commands as (
            select 'drop ' || relation_type || ' `' || relation_name || '`;' as command
            from models_to_drop
        )

        select command
        from drop_commands
        -- intentionally exclude unhandled table_types, including 'external table'
        where command is not null

    {% endset %}
    {% set drop_commands = run_query(cleanup_query).columns[0].values() %}
    {% if drop_commands %}
        {% for drop_command in drop_commands %}
            {% do log(drop_command, True) %}
            {% if dry_run | as_bool == False %}
                {% do run_query(drop_command) %}
            {% endif %}
        {% endfor %}
    {% else %}
        {% do log('No relations to clean.', True) %}
    {% endif %}
{%- endmacro -%}

On Redshift, the drop query wasn’t throwing an error, but it also wasn’t dropping the relations. Once I wrapped the drop statement(s) in a begin/commit block, it worked for me.

Sharing that section of my code FWIW.

...
{# to_delete holds (schema, relation_name, relation_type) tuples gathered earlier in the macro #}
{% set drop_query = ['begin;'] %}
{%- for item in to_delete %}
  {%- set drop_statement %}  drop {{ item[2] }} if exists "{{ item[0] }}"."{{ item[1] }}" cascade;{%- endset %}
  {%- do drop_query.append(drop_statement) %}
{%- endfor %} 
{%- do drop_query.append('commit;') %}
{%- set drop_query = drop_query|join('\n') %}
{%- do log(
    modules.datetime.datetime.now().strftime('%H:%M:%S') 
    ~ ' | Executing the following statements:',
    info=true) %}
{%- do log(drop_query, info=true) %}
{%- do run_query(drop_query) %}
...
Running with dbt=0.21.0
22:39:23 | Finding vestigial tables
22:39:24 | Executing the following statements:
begin;
  drop view if exists "schema"."vestigial_table_1" cascade;
  drop view if exists "another_schema"."vestigial_table_2" cascade;
commit;

Works perfectly, thanks!
One thing: I have my development models in a different BQ project than my production tables, so it looks like this macro only works for me in my development project.
How can I change the macro so it will also work on my production project when I run it in a job?
All my code is currently ‘transferred’ to the right project via a macro, generate_database_name() (see the code below).
I think I need to call this macro somewhere from your macro to make it happen, but I don’t know how.

{% macro generate_database_name(custom_database_name=none, node=none) -%}

    {{ log("Running generate database name macro") }}
    {% set default_database = target.database %}
    {% set production_database = 'analytics-prod' %}

    {% if custom_database_name %}
        {% if target.name == "prod" %}
            {{ production_database }}
        {% else %}
            {{ custom_database_name }}
        {% endif %}
    {% else %}
        {{ default_database }}
    {% endif %}

{%- endmacro %}

Awesome stuff, thanks for putting this out here!

We made one small tweak to the conditional logic to enable support for dropping BigQuery external table definitions in addition to views and tables:

SELECT
          CASE 
            WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
            WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
            WHEN TABLE_TYPE = 'EXTERNAL' THEN 'EXTERNAL TABLE'
          END AS RELATION_TYPE,

I created another macro that’s helpful for debugging the drop_old_relations macro. It prints the nodes (models, seeds, and snapshots) in your dbt graph:

{% macro print_graph() %}

    {% if execute %}
        {% set query %}

            {%- for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                            + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list
                            + graph.nodes.values() | selectattr("resource_type", "equalto", "snapshot")  | list %}
                SELECT
                '{{node.config.schema}}' AS schema_name
                , '{{node.name}}' AS ref_name
                , '{{node.alias}}' AS alias
                , '{{node.resource_type}}' AS resource_type
                {% if not loop.last %} UNION ALL {% endif %}
            {%- endfor %}
        {% endset %}

        {%- set result = run_query(query) -%}
        {% if result %}
            {%- for node in result -%}
                {% do log(node, True) %}
            {%- endfor -%}
        {% endif %}
    {% endif %}

{% endmacro %}

Runnable with dbt run-operation print_graph