I agree, this should be an option on dbt run or something. Want to submit a feature request?
Seeing as we used this as a base, I thought I’d throw our hat into the ring and share our iteration on this macro for Snowflake. We’ve added some features like table/schema exclusions, drops based on last modified time, max operation limits per run, and more.
/*
Description: This macro identifies and drops tables and views in Snowflake
that are no longer used in dbt. It can either log the drop
commands (dry_run=true) or execute them directly.
Parameters:
dry_run (Boolean): If true, only log the drop commands without executing them.
include_all_schemas (Boolean): If true, check all schemas rather than just those containing active dbt models.
schemas_to_exclude (list): List of schema names to exclude from being dropped.
tables_to_exclude (list): List of table names to exclude from being dropped.
last_altered (int): Only drop objects that have not been modified in this many days (0 means drop regardless).
max_operations (int): Maximum number of objects to drop in a single run.
*/
{%- macro drop_orphaned_tables(
dry_run=false,
include_all_schemas=false,
schemas_to_exclude=[],
tables_to_exclude=[],
last_altered=0,
max_operations=5
) -%}
-- Add default exclusions for system schemas and common utility tables
{%- set schemas_to_exclude = schemas_to_exclude + ['INFORMATION_SCHEMA', 'PUBLIC', 'MONITORING'] -%}
{%- set tables_to_exclude = tables_to_exclude + ['CONFIG_TABLE', 'MAPPING_TABLE'] -%}
{%- if execute -%}
-- Create dictionary to track all dbt-managed models
{%- set current_model_locations = {} -%}
-- Map all dbt models to their locations (database → schema → table)
{%- for node in graph.nodes.values() | selectattr(
"resource_type", "in", ["model", "seed", "snapshot"]
) -%}
{%- set database_name = node.database.upper() -%}
{%- set schema_name = node.schema.upper() -%}
{%- set table_name = node.alias if node.alias else node.name -%}
-- Add database and schema if they don't exist
{%- if database_name not in current_model_locations -%}
{%- do current_model_locations.update({database_name: {}}) -%}
{%- endif -%}
{%- if schema_name not in current_model_locations[database_name] -%}
{%- do current_model_locations[database_name].update({schema_name: []}) -%}
{%- endif -%}
-- Add table to schema
{%- do current_model_locations[database_name][schema_name].append(table_name.upper()) -%}
{%- endfor -%}
{%- do log("DBT models found: " ~ current_model_locations, True) -%}
{%- endif -%}
{%- do log("\nGenerating cleanup queries...\n", True) -%}
-- Query to identify orphaned tables and views. This finds all objects in the
-- database that are not managed by dbt and aren't in the specified
-- exclusion lists.
{%- set cleanup_query -%}
WITH models_to_drop AS (
{%- for database in current_model_locations.keys() -%}
SELECT
CASE
WHEN table_type IN ('BASE TABLE', 'MANAGED') THEN 'TABLE'
WHEN table_type = 'VIEW' THEN 'VIEW'
ELSE NULL
END AS relation_type,
table_catalog,
table_schema,
table_name,
table_catalog || '.' || table_schema || '.' || table_name as relation_name,
{% if last_altered > 0 %}last_altered,{% endif %}
TO_CHAR(ROW_COUNT) as row_count
FROM {{ database }}.information_schema.tables
WHERE 1=1
-- Schema filtering logic: only include schemas we want
{% if not include_all_schemas %}
AND UPPER(table_schema) IN ('{{ "', '".join(current_model_locations[database].keys()) }}')
{% endif %}
{% if schemas_to_exclude|length > 0 %}
AND UPPER(table_schema) NOT IN ('{{ "', '".join(schemas_to_exclude) }}')
{% endif %}
-- Only drop tables that aren't dbt models
AND NOT (
{%- for schema in current_model_locations[database].keys() -%}
UPPER(table_schema) = '{{ schema }}' AND
UPPER(table_name) IN ('{{ "', '".join(current_model_locations[database][schema]) }}')
{% if not loop.last %} OR {% endif %}
{%- endfor %}
)
-- Exclude specific tables by name
{% if tables_to_exclude|length > 0 %}
AND NOT (
{%- for table in tables_to_exclude -%}
UPPER(table_name) = '{{ table|upper }}'
{% if not loop.last %} OR {% endif %}
{%- endfor %}
)
{% endif %}
-- Filter out tables based on last modified time
{% if last_altered > 0 %}
AND last_altered < DATEADD('days', -{{ last_altered }}, CURRENT_TIMESTAMP)
{% endif %}
{% if not loop.last -%} UNION ALL {%- endif %}
{%- endfor -%}
)
-- Generate DROP statements for each orphaned object
SELECT
'DROP ' || relation_type || ' IF EXISTS ' || table_catalog || '.' || table_schema || '.' || table_name || ';' AS drop_commands,
{% if last_altered > 0 %}last_altered,{% endif %}
row_count
FROM models_to_drop
WHERE relation_type IS NOT NULL
ORDER BY table_schema, table_name
LIMIT {{ max_operations }}
{%- endset -%}
-- Execute or log the DROP statements
{%- set drop_results = run_query(cleanup_query) -%}
{%- set drop_commands = drop_results.columns[0].values() -%}
{%- if drop_commands|length > 0 -%}
{%- do log("Found " ~ drop_commands|length ~ " relations to drop:", True) -%}
{%- for drop_command in drop_commands -%}
{% if last_altered > 0 %}
{%- set last_altered_date = drop_results.columns[1].values()[loop.index0] -%}
{%- set formatted_date = last_altered_date.strftime("%d-%b-%Y %H:%M") if last_altered_date else "N/A" -%}
{%- set row_count_value = drop_results.columns[2].values()[loop.index0] -%}
{%- do log(drop_command ~ " -- Last altered: " ~ formatted_date ~
" -- Row count: " ~ row_count_value, True) -%}
{% else %}
{%- set row_count_value = drop_results.columns[1].values()[loop.index0] -%}
{%- do log(drop_command ~ " -- Row count: " ~ row_count_value, True) -%}
{% endif %}
{%- if not dry_run -%}
{%- do run_query(drop_command) -%}
{%- do log("Executed", True) -%}
{%- endif -%}
{%- endfor -%}
{%- if dry_run -%}
{%- do log("\nPlease review the above commands before running with dry_run=false to execute them.\n", True) -%}
{%- endif -%}
{%- else -%}
{%- do log("No relations to clean", True) -%}
{%- endif -%}
-- Log a summary of what has/would be dropped
{%- do log("Operation complete: " ~ drop_commands|length ~ " objects processed", True) -%}
{%- endmacro -%}
1 Like
Thanks dobsontom and all the others above for building the code base. I needed to also clean up unlinked schemas, so I exanded upon it to also include them in the cleanup. Been running it on snowflake and has worked without issue there so far.
/*
Description: This macro identifies and drops tables and views in Snowflake
that are no longer used in dbt. It can either log the drop
commands (dry_run=true) or execute them directly.
Parameters:
dry_run (Boolean): If true, only log the drop commands without executing them.
include_all_schemas (Boolean): If true, check all schemas rather than just those containing active dbt models.
schemas_to_exclude (list): List of schema names to exclude from being dropped.
tables_to_exclude (list): List of table names to exclude from being dropped.
last_altered (int): Only drop objects that have not been modified in this many days (0 means drop regardless).
max_operations (int): Maximum number of objects to drop in a single run.
*/
{%- macro delete_orphaned_tables(
dry_run=false,
include_all_schemas=true,
schemas_to_exclude=[],
tables_to_exclude=[],
last_altered=0,
max_operations=20
) -%}
-- Add default exclusions for system schemas and common utility tables
{%- set schemas_to_exclude = schemas_to_exclude + ['INFORMATION_SCHEMA', 'PUBLIC', 'MONITORING'] -%}
{%- set tables_to_exclude = tables_to_exclude + ['CONFIG_TABLE', 'MAPPING_TABLE'] -%}
{%- if execute -%}
{%- set current_model_locations = {} -%}
-- Group dbt-managed models by database + schema
{%- for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) -%}
{%- set database_name = node.database.upper() -%}
{%- set schema_name = node.schema.upper() -%}
{%- set table_name = node.alias if node.alias else node.name -%}
-- Add database and schema if they don't exist
{%- if database_name not in current_model_locations -%}
{%- do current_model_locations.update({database_name: {}}) -%}
{%- endif -%}
{%- if schema_name not in current_model_locations[database_name] -%}
{%- do current_model_locations[database_name].update({schema_name: []}) -%}
{%- endif -%}
-- Add table to schema
{%- do current_model_locations[database_name][schema_name].append(table_name.upper()) -%}
{%- endfor -%}
-- -- Logs all models read in by the macro. (Uncomment for dev)
{#
{%- do log("DBT-managed models grouped by database:\n" ~ current_model_locations | string, True) -%}
#}
-- Loop through each database and perform clean-up
{%- for database, schemas in current_model_locations.items() -%}
{%- do log("Processing database: " ~ database, True) -%}
-- DROP orphaned tables/views not part of dbt-managed models
{%- set cleanup_query -%}
WITH models_to_drop AS (
SELECT
CASE
WHEN table_type IN ('BASE TABLE', 'MANAGED') THEN 'TABLE'
WHEN table_type = 'VIEW' THEN 'VIEW'
ELSE NULL
END AS relation_type,
table_catalog,
table_schema,
table_name,
table_catalog || '.' || table_schema || '.' || table_name as relation_name,
{% if last_altered > 0 %}last_altered,{% endif %}
TO_CHAR(ROW_COUNT) as row_count
FROM {{ database }}.information_schema.tables
WHERE 1=1
{% if not include_all_schemas %}
AND UPPER(table_schema) IN ('{{ "', '".join(schemas.keys()) }}')
{% endif %}
-- Schema filtering logic: only include schemas we want
{% if schemas_to_exclude|length > 0 %}
AND UPPER(table_schema) NOT IN ('{{ "', '".join(schemas_to_exclude) }}')
{% endif %}
-- Only drop tables that aren't dbt models
AND NOT (
{%- for schema, tables in schemas.items() -%}
(UPPER(table_schema) = '{{ schema }}' AND
UPPER(table_name) IN ('{{ "', '".join(tables) }}'))
{% if not loop.last %} OR {% endif %}
{%- endfor %}
)
-- Exclude specific tables by name
{% if tables_to_exclude|length > 0 %}
AND UPPER(table_name) NOT IN ('{{ "', '".join(tables_to_exclude | map('upper')) }}')
{% endif %}
-- Filter out tables based on last modified time
{% if last_altered > 0 %}
AND last_altered < DATEADD('days', -{{ last_altered }}, CURRENT_TIMESTAMP)
{% endif %}
)
SELECT
'DROP ' || relation_type || ' IF EXISTS ' || table_catalog || '.' || table_schema || '.' || table_name || ';' AS drop_commands,
{% if last_altered > 0 %}last_altered,{% endif %}
row_count
FROM models_to_drop
WHERE relation_type IS NOT NULL
ORDER BY table_schema, table_name
LIMIT {{ max_operations }}
{%- endset -%}
-- Generate DROP statements for each orphaned object
{%- set drop_results = run_query(cleanup_query) -%}
{%- set drop_commands = drop_results.columns[0].values() -%}
{%- if drop_commands|length > 0 -%}
{%- do log("Found " ~ drop_commands|length ~ " orphaned objects to drop in " ~ database ~ ":\n", True) -%}
{%- for drop_command in drop_commands -%}
{% if last_altered > 0 %}
{%- set last_altered_date = drop_results.columns[1].values()[loop.index0] -%}
{%- set formatted_date = last_altered_date.strftime("%d-%b-%Y %H:%M") if last_altered_date else "N/A" -%}
{%- set row_count_value = drop_results.columns[2].values()[loop.index0] -%}
{%- do log(drop_command ~ " -- Last altered: " ~ formatted_date ~
" -- Row count: " ~ row_count_value, True) -%}
{% else %}
{%- set row_count_value = drop_results.columns[1].values()[loop.index0] -%}
{%- do log(drop_command ~ " -- Row count: " ~ row_count_value, True) -%}
{% endif %}
{%- if not dry_run -%}
{%- do run_query(drop_command) -%}
{%- do log("Executed", True) -%}
{%- endif -%}
{%- endfor -%}
{%- if dry_run -%}
{%- do log("Dry run complete for " ~ database ~ " — no changes made. Set dry_run=false to execute deletions.\n", True) -%}
{%- endif -%}
{%- else -%}
{%- do log("No orphaned relations to clean in " ~ database, True) -%}
{%- endif -%}
{%- do log("Operation complete for " ~ database ~ ": " ~ drop_commands|length ~ " objects processed", True) -%}
-- Drop empty schemas not touched by dbt
{% set managed_schemas = schemas.keys() | map('upper') | list %}
{% set excluded_schemas = schemas_to_exclude | map('upper') | list %}
{% set all_schemas_query %}
SELECT DISTINCT UPPER(schema_name) AS schema_name
FROM {{ database }}.information_schema.schemata
WHERE UPPER(schema_name) NOT IN ('{{ excluded_schemas | join("', '") }}')
{% endset %}
{% set all_schemas_result = run_query(all_schemas_query) %}
{% set all_schemas = all_schemas_result.columns[0].values() | list %}
{% set unmanaged_schemas = all_schemas | reject("in", managed_schemas) | list %}
{%- if unmanaged_schemas|length > 0 -%}
{% do log("Unmanaged schemas found in " ~ database ~ ": " ~ unmanaged_schemas | join(', '), True) %}
{%- endif -%}
{% set drop_commands = [] %}
{% for schema in unmanaged_schemas %}
{% set check_empty_query %}
SELECT COUNT(*) AS table_count
FROM {{ database }}.information_schema.tables
WHERE UPPER(table_schema) = '{{ schema }}'
{% endset %}
{% set count_result = run_query(check_empty_query).columns[0].values()[0] | int %}
{% if count_result == 0 %}
{% set drop_schema_stmt = "DROP SCHEMA IF EXISTS " ~ database ~ "." ~ schema ~ " CASCADE;" %}
{% do drop_commands.append(drop_schema_stmt) %}
{% if dry_run %}
{% do log(drop_schema_stmt, True) %}
{% else %}
{% do run_query(drop_schema_stmt) %}
{% do log("Dropped empty unmanaged schema: " ~ database ~ "." ~ schema, True) %}
{% endif %}
{% else %}
{% do log("Skipping schema: " ~ database ~ "." ~ schema ~ " — contains " ~ count_result ~ " tables/views", True) %}
{% endif %}
{% endfor %}
{%- if dry_run -%}
{% do log("Completed schema cleanup for " ~ database ~ ". Empty unmanaged schemas to drop: " ~ drop_commands | length, True) %}
{%- do log("Dry run for " ~ database ~ " — no changes made. Set dry_run=false to execute deletions. \n", True) -%}
{% else %}
{% do log("Completed schema cleanup for " ~ database ~ ". Empty unmanaged schemas dropped: " ~ drop_commands | length ~ "\n", True) %}
{%- endif -%}
{%- endfor -%}
{%- do log("Clean run completed. If more than " ~ max_operations ~ " tables per database needed cleaning, run script again.", True) -%}
{%- else -%}
{%- do log("Macro executed in compile mode - skipping run logic.", True) -%}
{%- endif -%}
{%- endmacro -%}