Clean your warehouse of old and deprecated models

dbt models persist in your production database even after they’re deleted from your project. This clutters the warehouse and can slow down operations like database or schema cloning, which are used frequently in CI/CD for Snowflake dbt projects.

I wrote a macro that executes DROP statements for all the tables and views in your target schema that don’t exist in your project. How does it work? First, it fetches all the models, seeds, and snapshots in your project. Then, it queries the information schema for all the relations in your target schema, excluding the ones fetched above. Finally, it formats (and then executes) a DROP statement for each table or view that isn’t in your project.
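The three steps can be sketched in plain Python (the model names, relation list, and function are invented for illustration; the real macro does this with Jinja and the information schema):

```python
# Illustrative sketch of the macro's three steps, in plain Python.
# The names and warehouse contents here are made up for the example.

def build_drop_commands(project_models, warehouse_relations):
    """project_models: names dbt knows about (models, seeds, snapshots).
    warehouse_relations: (name, type) pairs found in the target schema."""
    keep = {m.upper() for m in project_models}
    commands = []
    for name, relation_type in warehouse_relations:
        if name.upper() not in keep:  # step 2: exclude relations the project still owns
            commands.append(f"DROP {relation_type} {name};")  # step 3: format DROPs
    return commands

# Step 1 would come from the dbt graph; hard-coded here.
project = ["dim_customers", "fct_orders"]
warehouse = [("DIM_CUSTOMERS", "TABLE"), ("FCT_ORDERS", "TABLE"), ("OLD_MODEL", "VIEW")]
print(build_drop_commands(project, warehouse))  # only OLD_MODEL gets a DROP
```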

To invoke from the command line, use: dbt run-operation drop_old_relations

Warning: This will delete everything in your target schema that isn’t in your dbt project.

Warning: This does not cover aliases.

I strongly recommend running dbt run-operation drop_old_relations --args '{"dryrun": True}' first to see the DROP commands it would run.

Thanks a lot to Randy of Hashmap for providing much of the code! You can find his original version here; it drops models based on the last_altered date in the information schema using a pre-set cutoff.
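For comparison, that cutoff-based approach can be sketched like this (the dates, relation list, and 30-day threshold are invented for illustration, not taken from Randy’s code):

```python
from datetime import datetime, timedelta

# Hypothetical illustration of a last_altered cutoff: drop anything not
# modified in the last 30 days. The relations and dates are made up.
NOW = datetime(2024, 1, 31)
CUTOFF = NOW - timedelta(days=30)

relations = [
    ("STALE_VIEW", datetime(2023, 11, 1)),
    ("FRESH_TABLE", datetime(2024, 1, 25)),
]

stale = [name for name, last_altered in relations if last_altered < CUTOFF]
print(stale)  # ['STALE_VIEW']
```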

{% macro drop_old_relations(dryrun=False) %}

{% if execute %}
  {% set current_models=[] %}

  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
    {% do current_models.append(node.name) %}

  {% endfor %}
{% endif %}

{% set cleanup_query %}

      WITH MODELS_TO_DROP AS (
        SELECT
          CASE 
            WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
            WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
          END AS RELATION_TYPE,
          CONCAT_WS('.', TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME) AS RELATION_NAME
        FROM 
          {{ target.database }}.INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = '{{ target.schema }}'
          AND TABLE_NAME NOT IN
            ({%- for model in current_models -%}
                '{{ model.upper() }}'
                {%- if not loop.last -%}
                    ,
                {% endif %}
            {%- endfor -%}))
      SELECT 
        'DROP ' || RELATION_TYPE || ' ' || RELATION_NAME || ';' as DROP_COMMANDS
      FROM 
        MODELS_TO_DROP

  {% endset %}

{% do log(cleanup_query, info=True) %}
{% set drop_commands = run_query(cleanup_query).columns[0].values() %}

{% if drop_commands %}
  {% if dryrun | as_bool == False %}
    {% do log('Executing DROP commands...', True) %}
  {% else %}
    {% do log('Printing DROP commands...', True) %}
  {% endif %}
  {% for drop_command in drop_commands %}
    {% do log(drop_command, True) %}
    {% if dryrun | as_bool == False %}
      {% do run_query(drop_command) %}
    {% endif %}
  {% endfor %}
{% else %}
  {% do log('No relations to clean.', True) %}
{% endif %}

{%- endmacro -%}

EDIT: Added dryrun feature


I love this! I modified the script slightly. The modified version below:

  1. Provides the option of a dry run where the commands are printed, but not executed
  2. Will match any schema prefixed with the target schema, rather than only the exact target schema (we use multiple schemas of the form <target_schema>_<schema_name>; the exact target schema is still matched). This introduces some risk if a target schema could be a prefix of another schema’s name, so use with caution. It only matches prefixes, not a full wildcard, so if you’re consistent with schema prefixes this shouldn’t be an issue.
  3. Is clear about excluding non-standard table types (e.g. for us, we don’t want this to delete our external tables, which are managed slightly outside of these parameters through dbt).
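The prefix matching in point 2, and the risk it carries, can be illustrated in Python; `ilike '<target_schema>%'` behaves like a case-insensitive startswith (schema names below are invented for the example):

```python
# Illustration of the `ilike '<target_schema>%'` prefix match and its risk.
target_schema = "analytics"
schemas = ["ANALYTICS", "ANALYTICS_STAGING", "ANALYTICS_MARTS", "ANALYTICSV2", "FINANCE"]

matched = [s for s in schemas if s.lower().startswith(target_schema.lower())]
print(matched)
# ANALYTICSV2 is matched too, even though it may belong to someone else:
# that is the collision risk the author warns about.
```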

Modified version:

{% macro drop_old_relations(dry_run='false') %}
{% if execute %}
  {% set current_models=[] %}
  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
    {% do current_models.append(node.name) %}
  {% endfor %}
{% endif %}
{% set cleanup_query %}
      with models_to_drop as (
        select
          case 
            when table_type = 'BASE TABLE' then 'TABLE'
            when table_type = 'VIEW' then 'VIEW'
          end as relation_type,
          concat_ws('.', table_catalog, table_schema, table_name) as relation_name
        from 
          {{ target.database }}.information_schema.tables
        where table_schema ilike '{{ target.schema }}%'
          and table_name not in
            ({%- for model in current_models -%}
                '{{ model.upper() }}'
                {%- if not loop.last -%}
                    ,
                {% endif %}
            {%- endfor -%}))
      select 
        'drop ' || relation_type || ' ' || relation_name || ';' as drop_commands
      from 
        models_to_drop
      
      -- intentionally exclude unhandled table_types, including 'external table'
      where drop_commands is not null
  {% endset %}
{% do log(cleanup_query, info=True) %}
{% set drop_commands = run_query(cleanup_query).columns[0].values() %}
{% if drop_commands %}
  {% for drop_command in drop_commands %}
    {% do log(drop_command, True) %}
    {% if dry_run == 'false' %}
      {% do run_query(drop_command) %}
    {% endif %}
  {% endfor %}
{% else %}
  {% do log('No relations to clean.', True) %}
{% endif %}
{%- endmacro -%}

Thanks for sharing! Have you considered how to extend this to account for aliases?

What do you mean by aliases?

Where the table created in the database has a different name from the model.

Why alias model names?

The names of schemas and tables are effectively the “user interface” of your data warehouse. Well-named schemas and tables can help provide clarity and direction for consumers of this data. In combination with custom schemas, model aliasing is a powerful mechanism for designing your warehouse.

Interesting, I didn’t know about that feature. You could probably extend this by grabbing the alias name in the initial loop over the dbt graph. Something like this, maybe?

  {% for node in graph.nodes.values()
     | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
    {% if node.alias %}
    {% do current_models.append(node.alias) %}
    {% else %}
    {% do current_models.append(node.name) %}
    {% endif %}
  {% endfor %}
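In plain Python, that fallback amounts to preferring `alias` when it is set (the node dicts below are stand-ins for dbt graph nodes):

```python
# Sketch of the alias-or-name fallback; the node dicts are invented
# stand-ins for dbt graph nodes.
nodes = [
    {"name": "orders_model", "alias": "orders"},
    {"name": "customers", "alias": None},
]

current_models = [n["alias"] if n["alias"] else n["name"] for n in nodes]
print(current_models)  # ['orders', 'customers']
```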

Here’s my take with support for multiple databases and aliases:

{% macro drop_old_tables(dry_run='false') %}
    {% if execute %}
        {% set current_model_locations={} %}
        {% for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
            {% if not node.database in current_model_locations %}
                {% do current_model_locations.update({node.database: {}}) %}
            {% endif %}
            {% if not node.schema.upper() in current_model_locations[node.database] %}
                {% do current_model_locations[node.database].update({node.schema.upper(): []}) %}
            {% endif %}
            {% set table_name = node.alias if node.alias else node.name %}
            {% do current_model_locations[node.database][node.schema.upper()].append(table_name.upper()) %}
        {% endfor %}
    {% endif %}
    {% set cleanup_query %}

        with models_to_drop as (
            {% for database in current_model_locations.keys() %}
            {% if loop.index > 1 %}
            union all
            {% endif %}
            select
                table_type
                , table_catalog
                , table_schema
                , table_name
                , case
                    when table_type = 'BASE TABLE' then 'TABLE'
                    when table_type = 'VIEW' then 'VIEW'
                end as relation_type
                , concat_ws('.', table_catalog, table_schema, table_name) as relation_name
            from {{ database }}.information_schema.tables
            where table_schema in ('{{ "', '".join(current_model_locations[database].keys()) }}')
            and not (
                {% for schema in current_model_locations[database].keys() %}
                {% if loop.index > 1 %}or {% endif %}table_schema = '{{ schema }}' and table_name in ('{{ "', '".join(current_model_locations[database][schema]) }}')
                {% endfor %}
            )
            {% endfor %}
        )

        select 'drop ' || relation_type || ' ' || relation_name || ';' as drop_commands
        from models_to_drop
        -- intentionally exclude unhandled table_types, including 'external table'
        where drop_commands is not null

    {% endset %}
    {% do log(cleanup_query, info=True) %}
    {% set drop_commands = run_query(cleanup_query).columns[0].values() %}
    {% if drop_commands %}
        {% for drop_command in drop_commands %}
            {% do log(drop_command, True) %}
            {% if dry_run == 'false' %}
                {% do run_query(drop_command) %}
            {% endif %}
        {% endfor %}
    {% else %}
        {% do log('No relations to clean.', True) %}
    {% endif %}
{%- endmacro -%}
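The `current_model_locations` structure this version builds is a nested dict keyed by database, then upper-cased schema, holding upper-cased table names. A Python sketch of that build step (node values are invented for illustration):

```python
# Sketch of building current_model_locations from graph nodes.
# The node dicts are invented stand-ins for dbt graph nodes.
nodes = [
    {"database": "ANALYTICS", "schema": "marts", "name": "orders", "alias": None},
    {"database": "ANALYTICS", "schema": "marts", "name": "customers_model", "alias": "customers"},
    {"database": "RAW", "schema": "seeds", "name": "country_codes", "alias": None},
]

current_model_locations = {}
for node in nodes:
    db = node["database"]
    schema = node["schema"].upper()
    table = (node["alias"] or node["name"]).upper()
    current_model_locations.setdefault(db, {}).setdefault(schema, []).append(table)

print(current_model_locations)
# {'ANALYTICS': {'MARTS': ['ORDERS', 'CUSTOMERS']}, 'RAW': {'SEEDS': ['COUNTRY_CODES']}}
```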


Building on the solution from @NiallRees. Here’s my take for BigQuery:

To execute a dry run use: dbt run-operation cleanup_dataset --args '{"dry_run": True}'

-- Removes tables and views from the given run configuration
-- Usage in production:
--    dbt run-operation cleanup_dataset
-- To only see the commands that it is about to perform:
--    dbt run-operation cleanup_dataset --args '{"dry_run": True}'
{% macro cleanup_dataset(dry_run=False) %}
    {% if execute %}
        {% set current_model_locations={} %}

        {% for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"])%}
            {% if not node.database in current_model_locations %}
                {% do current_model_locations.update({node.database: {}}) %}
            {% endif %}
            {% if not node.schema in current_model_locations[node.database] %}
                {% do current_model_locations[node.database].update({node.schema: []}) %}
            {% endif %}
            {% set table_name = node.alias if node.alias else node.name %}
            {% do current_model_locations[node.database][node.schema].append(table_name) %}
        {% endfor %}
    {% endif %}

    {% set cleanup_query %}

        with models_to_drop as (
            {% for database in current_model_locations.keys() %}
                {% if loop.index > 1 %}union all{% endif %}
                {% for dataset, tables  in current_model_locations[database].items() %}
                    {% if loop.index > 1 %}union all{% endif %}
                    select
                        table_type,
                        table_catalog,
                        table_schema,
                        table_name,
                        case
                            when table_type = 'BASE TABLE' then 'TABLE'
                            when table_type = 'VIEW' then 'VIEW'
                        end as relation_type,
                        array_to_string([table_catalog, table_schema, table_name], '.') as relation_name
                    from {{ database }}.{{ dataset }}.INFORMATION_SCHEMA.TABLES
                    where not (table_name in ('{{ "', '".join(tables) }}'))
                {% endfor %}
            {% endfor %}
        ),
        drop_commands as (
            select 'drop ' || relation_type || ' `' || relation_name || '`;' as command
            from models_to_drop
        )

        select command
        from drop_commands
        -- intentionally exclude unhandled table_types, including 'external table'
        where command is not null

    {% endset %}
    {% set drop_commands = run_query(cleanup_query).columns[0].values() %}
    {% if drop_commands %}
        {% for drop_command in drop_commands %}
            {% do log(drop_command, True) %}
            {% if dry_run | as_bool == False %}
                {% do run_query(drop_command) %}
            {% endif %}
        {% endfor %}
    {% else %}
        {% do log('No relations to clean.', True) %}
    {% endif %}
{%- endmacro -%}

On Redshift, the drop query wasn’t throwing an error but it also wasn’t dropping the relations. Once I wrapped the drop statement(s) in a begin/commit, it worked for me.

Sharing that section of my code FWIW.

...
{% set drop_query = ['begin;'] %}
{%- for item in to_delete %}
  {%- set drop_statement %}  drop {{ item[2] }} if exists "{{ item[0] }}"."{{ item[1] }}" cascade;{%- endset %}
  {%- do drop_query.append(drop_statement) %}
{%- endfor %} 
{%- do drop_query.append('commit;') %}
{%- set drop_query = drop_query|join('\n') %}
{%- do log(
    modules.datetime.datetime.now().strftime('%H:%M:%S') 
    ~ ' | Executing the following statements:',
    info=true) %}
{%- do log(drop_query, info=true) %}
{%- do run_query(drop_query) %}
...
Running with dbt=0.21.0
22:39:23 | Finding vestigial tables
22:39:24 | Executing the following statements:
begin;
  drop view if exists "schema"."vestigial_table_1" cascade;
  drop view if exists "another_schema"."vestigial_table_2" cascade;
commit;

Works perfectly, thanks!
One thing: my development models live in a different BQ project than my production tables, so this macro only works for me in my development project.
How can I change the macro so it also works on my production project when I run it in a job?
All my code is currently ‘transferred’ to the right project via a macro, generate_database_name() (see code below).
I think I need to call this macro somewhere from your macro to make it happen, but I don’t know how.

{% macro generate_database_name(custom_database_name=none, node=none) -%}

{{ log("Running generate database name macro") }}
{% set default_database = target.database %}
{% set production_database = 'analytics-prod' %}

{% if custom_database_name %}
    {% if target.name == "prod" %}
        {{ production_database }}
    {% else %}
        {{ custom_database_name }}
    {% endif %}
{% else %}
    {{ default_database }}
{% endif %}

{% endmacro %}

Awesome stuff, thanks for putting this out here!

We made one small tweak to the conditional logic to enable support for dropping BigQuery external table definitions in addition to views and tables:

SELECT
          CASE 
            WHEN TABLE_TYPE = 'BASE TABLE' THEN 'TABLE'
            WHEN TABLE_TYPE = 'VIEW' THEN 'VIEW'
            WHEN TABLE_TYPE = 'EXTERNAL' THEN 'EXTERNAL TABLE'
          END AS RELATION_TYPE,

I created another macro that’s helpful for debugging this drop_old_relations macro. It prints the nodes (models, seeds, snapshots) in your dbt graph:

{% macro print_graph() %}

    {% if execute %}
        {% set query %}

            {%- for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                            + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list
                            + graph.nodes.values() | selectattr("resource_type", "equalto", "snapshot")  | list %}
                SELECT
                '{{node.config.schema}}' AS schema_name
                , '{{node.name}}' AS ref_name
                , '{{node.alias}}' AS alias
                , '{{node.resource_type}}' AS resource_type
                {% if not loop.last %} UNION ALL {% endif %}
            {%- endfor %}
        {% endset %}

        {%- set result = run_query(query) -%}
        {% if result %}
            {%- for node in result -%}
                {% do log(node, True) %}
            {%- endfor -%}
        {% endif %}
    {% endif %}

{% endmacro %}

Runnable with dbt run-operation print_graph


I just adapted some things, added comments, and formatted it a little.

Thanks to all previous contributors for making our life easier!


{%- macro drop_orphanate_tables(dry_run='false') -%}

    {%- if execute -%}

        -- Create empty dictionary that will contain the hierarchy of the models in dbt
        {%- set current_model_locations = {} -%}

        -- Insert the hierarchy database.schema.table in the dictionary above
        {%- for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) -%}

            {%- set database_name = node.database.upper() -%}
            {%- set schema_name = node.schema.upper() -%}
            {%- set table_name = node.alias if node.alias else node.name -%}

            -- Add db name if it does not exist in the dict
            {%- if not database_name in current_model_locations -%}
                {% do current_model_locations.update({database_name: {}}) -%}
            {%- endif -%}

            -- Add schema name if it does not exist in the dict
            {%- if not schema_name in current_model_locations[database_name] -%}
                {% do current_model_locations[database_name].update({schema_name: []}) -%}
            {%- endif -%}

            -- Add the tables for the db and schema selected
            {%- do current_model_locations[database_name][schema_name].append(table_name.upper()) -%}

        {%- endfor -%}

    {%- endif -%}

    -- Query to retrieve the models to drop
    {%- set cleanup_query -%}

        WITH models_to_drop AS (
            {%- for database in current_model_locations.keys() -%}
                {%- if loop.index > 1 %}
                UNION ALL
                {% endif %}

                SELECT
                    CASE
                        WHEN table_type = 'BASE TABLE' THEN 'TABLE'
                        WHEN table_type = 'VIEW' THEN 'VIEW'
                        ELSE NULL
                    END AS relation_type,
                    table_catalog,
                    table_schema,
                    table_name,
                    concat_ws('.', table_catalog, table_schema, table_name) as relation_name
                FROM {{ database }}.information_schema.tables
                WHERE
                    table_schema IN ('{{ "', '".join(current_model_locations[database].keys()) }}')
                    AND NOT (
                        {%- for schema in current_model_locations[database].keys() -%}
                            {% if loop.index > 1 %}
                            OR {% endif %} table_schema = '{{ schema }}' AND table_name IN ('{{ "', '".join(current_model_locations[database][schema]) }}')
                        {%- endfor %}
                    )
            {%- endfor -%}
        )
        -- Create the DROP statements to be executed in the database
        SELECT 'DROP ' || relation_type || ' IF EXISTS ' || table_catalog || '.' ||table_schema || '.' || table_name || ';' AS drop_commands
        FROM models_to_drop
        WHERE relation_type IS NOT NULL

    {%- endset -%}

    -- Execute the DROP statements above

    {%- set drop_commands = run_query(cleanup_query).columns[0].values() -%}

    {%- if drop_commands -%}

        {%- for drop_command in drop_commands -%}

            {%- do log(drop_command, True) -%}

            {%- if dry_run.upper() == 'FALSE' -%}
                {%- do run_query(drop_command) -%}
                {%- do log('Executed', True) -%}
            {%- endif -%}

        {%- endfor -%}

    {%- else -%}

        {%- do log('No relations to clean', True) -%}

    {%- endif -%}

{%- endmacro -%}


Building on @rcoll’s version above, I added case-insensitive string matching.

Thanks to all contributors!

{%- macro drop_orphanate_tables(dry_run='false') -%}

    {%- if execute -%}

        -- Create empty dictionary that will contain the hierarchy of the models in dbt
        {%- set current_model_locations = {} -%}

        -- Insert the hierarchy database.schema.table in the dictionary above
        {%- for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) -%}

            {%- set database_name = node.database.upper() -%}
            {%- set schema_name = node.schema.upper() -%}
            {%- set table_name = node.alias if node.alias else node.name -%}

            -- Add db name if it does not exist in the dict
            {%- if not database_name in current_model_locations -%}
                {% do current_model_locations.update({database_name: {}}) -%}
            {%- endif -%}

            -- Add schema name if it does not exist in the dict
            {%- if not schema_name in current_model_locations[database_name] -%}
                {% do current_model_locations[database_name].update({schema_name: []}) -%}
            {%- endif -%}

            -- Add the tables for the db and schema selected
            {%- do current_model_locations[database_name][schema_name].append(table_name.upper()) -%}

        {%- endfor -%}

        {{ log(current_model_locations) }}

    {%- endif -%}

    -- Query to retrieve the models to drop
    {%- set cleanup_query -%}

        WITH models_to_drop AS (
            {%- for database in current_model_locations.keys() -%}

                SELECT
                    CASE
                        WHEN table_type = 'BASE TABLE' THEN 'TABLE'
                        WHEN table_type = 'VIEW' THEN 'VIEW'
                        ELSE NULL
                    END AS relation_type,
                    table_catalog,
                    table_schema,
                    table_name,
                    table_catalog || '.' || table_schema || '.' || table_name as relation_name
                FROM {{ database }}.information_schema.tables
                WHERE
                    LOWER(table_schema) IN ('{{ "', '".join(current_model_locations[database].keys())|lower }}')
                    AND NOT (
                        {%- for schema in current_model_locations[database].keys() -%}
                            LOWER(table_schema) = LOWER('{{ schema }}') AND LOWER(table_name) IN ('{{ "', '".join(current_model_locations[database][schema])|lower }}')
                            {% if not loop.last %} OR {% endif %}
                        {%- endfor %}
                    )

                {% if not loop.last -%} UNION ALL {%- endif %}
            {%- endfor -%}
        )
        -- Create the DROP statements to be executed in the database
        SELECT 'DROP ' || relation_type || ' IF EXISTS ' || table_catalog || '.' ||table_schema || '.' || table_name || ';' AS drop_commands
        FROM models_to_drop
        WHERE relation_type IS NOT NULL

    {%- endset -%}

    -- Execute the DROP statements above

    {%- set drop_commands = run_query(cleanup_query).columns[0].values() -%}

    {%- if drop_commands -%}

        {%- for drop_command in drop_commands -%}

            {%- do log(drop_command, True) -%}

            {%- if dry_run.upper() == 'FALSE' -%}
                {%- do run_query(drop_command) -%}
                {%- do log('Executed', True) -%}
            {%- endif -%}

        {%- endfor -%}

    {%- else -%}

        {%- do log('No relations to clean', True) -%}

    {%- endif -%}

{%- endmacro -%}

Execute via:
dbt run-operation drop_orphanate_tables --args '{dry_run: "true"}'
(The argument is optional and defaults to "false", which will drop the tables; "true" will only list the tables that would be dropped.)
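Because `dry_run` arrives as a string here, it is worth normalising it explicitly; a naive boolean check on a string is a trap, since any non-empty string is truthy. A hypothetical helper (not part of the macro) illustrates this:

```python
# Hypothetical helper showing why string-valued dry_run flags need care:
# any non-empty string, even "false", is truthy in a naive boolean check.
def is_dry_run(value):
    return str(value).strip().lower() in ("true", "1", "yes")

print(is_dry_run("true"))   # True
print(is_dry_run("false"))  # False
print(bool("false"))        # True, the trap a naive check falls into
```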

A colleague and I built on @rcoll’s version and configured it to work when combining dbt with Databricks.

We added an optional except list and used Jinja instead of SQL to build the drop statements, and added some logging to keep track of what is happening.

Thanks for sharing all of this. It really makes life easier.

{%- macro drop_outdated_tables(dry_run=False, except=[]) -%}

    {%- if execute -%}

    {%- set current_model_locations = {} -%}

        {%- for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) -%}

            {%- set schema_name = node.schema.upper() -%}
            {%- set table_name = node.alias if node.alias else node.name -%}

            -- Add schema name if it does not exist in the dict
            {%- if not schema_name in current_model_locations -%}
                {% do current_model_locations.update({schema_name: []}) -%}
            {%- endif -%}

            -- Add table name if it does not exist in the dict
            {%- if not table_name in current_model_locations[schema_name] -%}
                {% do current_model_locations[schema_name].append(table_name) -%}
            {%- endif -%}

        {%- endfor -%}

    {%- endif -%}

    -- Query to retrieve the models to drop
    {%- set cleanup_query -%}

        {%- for schema in current_model_locations.keys() -%}

                {% set all_tables_query = "show tables from " + schema %}
                {% set result = run_query(all_tables_query) %}
                {% for row in result %}

                    {% set table = row[1] %}
                    {% set type = [] %}
                    {% set describe_query = "describe extended " + schema + "." + table %}
                    {% set describe_result = run_query(describe_query) %}

                    {% for type_row in describe_result %}
                        {% if type_row[0] == "Type" %}
                            {% do type.append("TABLE" if type_row[1] == "EXTERNAL" else "VIEW") %}
                        {% endif %}
                    {% endfor %}

                    {% if not table in current_model_locations[schema] and not table in except %}
                        {% do log("Dropping " + type[0] + " " + table + " from " + schema, True) %}
                        SELECT 'DROP {{ type[0] }} IF EXISTS {{ schema }}.{{ table }};' as drop_command
                        union
                    {% else %}
                        {% do log(".", True) %}
                    {% endif %}

                {% endfor %}

        {% endfor %}
        SELECT 'SELECT 1;' as dummy_for_union

    {%- endset -%}

    {%- do log(cleanup_query, True) -%}
    {%- do log("Done setting cleanup_query", True) -%}

    -- Create a list of the DROP statements above
    {%- set drop_commands = run_query(cleanup_query) -%}
    {%- set commands_list = drop_commands.columns[0].values() -%}

    {%- do log("Drop statements turned into list", True) -%}


    -- Execute DROP statements if applicable
    {%- if commands_list -%}

        {%- for command in commands_list -%}
            {%- if dry_run == False -%}
                {%- do run_query(command) -%}
                {%- do log('Executed: ' + command, True) -%}
            {%- else -%}
                {%- do log('Not executed: ' + command, True) -%}
            {%- endif -%}

        {%- endfor -%}

    {%- else -%}

        {%- do log('No relations to clean', True) -%}

    {%- endif -%}

{%- endmacro -%}

Execute as follows:

dbt run-operation drop_outdated_tables --args '{"dry_run": False, "except": ["model"]}' [--target]


Here’s a gist of the macro we’ve been using in production (on Redshift) for a long while with good results.
