FAQ: Cleaning up removed models from your production schema

If you remove a model from your dbt project, dbt does not automatically drop the relation (table/view) from your schema. This means that users can end up with extra objects in their production schema, which can reduce the reliability of your prod data.

As a result, we’ve seen a few questions on Slack about how to clean up these objects in your production schema.

Option 1: Periodically drop and rebuild the entire schema.
dbt is built on the assumption that everything can be rebuilt at any given time, so this is a really easy way to remove objects.
However, it’s not for the faint of heart, and may cause some downtime.

Option 2: Query the information schema to find extra objects in prod.
I have the following query in my analysis directory. When run against my database, it finds objects (tables, views, functions) that exist in a prod schema but do not exist in the related dev schema.
Note that this works for me because I routinely drop my dev database, so I don’t have any extra objects in it. This query has been tested against Redshift and Postgres.

-- set pairs of schemas up as a cte for maximum flexibility
with dbt_schemas as (
  select
  'analytics' as prod_schema,
  'analytics_claire' as dev_schema
  union all
  select
  'foo' as prod_schema,
  'foo_claire' as dev_schema
)

, objects as (
  select
  lower(coalesce(nullif(table_type, 'BASE TABLE'), 'table')) as obj_type,
  table_schema as obj_schema,
  table_name as obj_name
  from information_schema.tables

  union all

  select
  'function' as obj_type,
  specific_schema as obj_schema,
  routine_name as obj_name
  from information_schema.routines
  where routine_type = 'FUNCTION'
)
, prod_objects as (
  select
  objects.*
  , dbt_schemas.dev_schema as expected_dev_schema
  from objects
  inner join dbt_schemas on dbt_schemas.prod_schema = objects.obj_schema
)

, prod_only_objects as (
  select prod_objects.*
  from prod_objects
  left join objects on objects.obj_schema = prod_objects.expected_dev_schema
    and objects.obj_name = prod_objects.obj_name
    and objects.obj_type = prod_objects.obj_type
  where objects.obj_name is null
)
select
  '-- drop ' ||  obj_type || ' if exists "' || obj_schema || '"."' || obj_name || '" cascade;' as drop_statement
from prod_only_objects

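The query returns a set of SQL statements that can then be copied to a SQL console and executed to drop the extra objects. Note that each statement comes back prefixed with "--", so it has to be uncommented before it will run.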

Ideally I’d return the first part of the query as a dbt statement and output the drop statements themselves to a sql file in compiled/analysis/. However, analyses are built when you execute dbt compile, and dbt statements are only executed on run (rather than compile). As such, it’s not possible to return the results of the dbt statement to your Jinja context.

Option 3: ???
There’s probably also a third way to do this, which would involve iterating through your dbt project to find the expected objects and comparing them to the state of your prod schemas. Let me know if anyone investigates this option!
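
For anyone who wants to try this outside the warehouse, here is a rough Python sketch of the comparison. It assumes you have already loaded target/manifest.json into a dict and fetched the existing (schema, name) pairs from your prod schemas yourself. The function names and the sample data are made up for illustration, it only looks at models and seeds, and it lowercases everything on the assumption that your warehouse folds unquoted identifiers to lowercase (as Postgres and Redshift do).

```python
def expected_relations(manifest):
    """Collect (schema, name) pairs for every model and seed in a manifest dict."""
    expected = set()
    for node in manifest.get("nodes", {}).values():
        if node.get("resource_type") in ("model", "seed"):
            # Assumption: the relation is built under its alias, falling back to the name.
            name = node.get("alias") or node["name"]
            expected.add((node["schema"].lower(), name.lower()))
    return expected


def orphaned_relations(existing, manifest):
    """Return relations that exist in the warehouse but not in the project."""
    existing_lower = {(s.lower(), n.lower()) for s, n in existing}
    return sorted(existing_lower - expected_relations(manifest))


# Hypothetical sample data standing in for manifest.json and a catalog query.
sample_manifest = {
    "nodes": {
        "model.proj.orders": {"resource_type": "model", "schema": "analytics", "name": "orders"},
        "seed.proj.countries": {"resource_type": "seed", "schema": "analytics", "name": "countries"},
        "test.proj.not_null": {"resource_type": "test", "schema": "analytics", "name": "not_null"},
    }
}
sample_existing = [("analytics", "orders"), ("analytics", "countries"), ("analytics", "old_orders")]

print(orphaned_relations(sample_existing, sample_manifest))
```

With the sample data, only analytics.old_orders is reported, since it is the one relation with no matching model or seed.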


A similar approach to option 2 is to query the information schema for objects that haven’t been updated recently. We run dbt 3x a day (roughly every 8 hours), so these queries return objects that should be dropped (we use Snowflake).

SELECT *
FROM information_schema.tables
WHERE last_altered < DATEADD('hours', -8, CURRENT_TIMESTAMP);
SELECT *
FROM information_schema.views
WHERE last_altered < DATEADD('hours', -8, CURRENT_TIMESTAMP);
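
The 8-hour window follows from the run schedule: with three runs a day, every live object should have been touched within the last 24 / 3 = 8 hours. Here is a small Python sketch of the same filter applied client-side. The function names and sample rows are invented, and it assumes the runs are evenly spaced.

```python
from datetime import datetime, timedelta


def stale_cutoff(now, runs_per_day):
    """Anything last altered before this timestamp missed the most recent run."""
    return now - timedelta(hours=24 / runs_per_day)


def stale_objects(rows, now, runs_per_day=3):
    """rows: iterable of (name, last_altered) pairs, e.g. from information_schema."""
    cutoff = stale_cutoff(now, runs_per_day)
    return [name for name, last_altered in rows if last_altered < cutoff]


# Hypothetical sample rows.
now = datetime(2024, 1, 1, 12, 0)
rows = [
    ("orders", datetime(2024, 1, 1, 8, 0)),        # rebuilt 4 hours ago
    ("old_orders", datetime(2023, 12, 25, 8, 0)),  # untouched for a week
]
print(stale_objects(rows, now))
```

In practice you would probably want some slack on top of the cutoff to allow for late or long-running jobs.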

Option 3??!!

Decided to tackle this as a bunch of refactoring recently resulted in a lot of models being renamed. Thankfully these days there is run-operation.

{% macro delete_outdated_tables(schema) %} 
  {% if (schema is not string and schema is not iterable) or schema is mapping or schema|length <= 0 %}
    {% do exceptions.raise_compiler_error('"schema" must be a string or a list') %}
  {% endif %}

  {% call statement('get_outdated_tables', fetch_result=True) %}
    select current.schema_name,
           current.ref_name,
           current.ref_type
    from (
      select schemaname as schema_name, 
             tablename  as ref_name, 
             'table'    as ref_type
      from pg_catalog.pg_tables pt 
      where schemaname in (
        {%- if schema is iterable and (schema is not string and schema is not mapping) -%}
          {%- for s in schema -%}
            '{{ s }}'{% if not loop.last %},{% endif %}
          {%- endfor -%}
        {%- elif schema is string -%}
          '{{ schema }}'
        {%- endif -%}
      )
      union all
      select schemaname as schema_name, 
             viewname   as ref_name, 
             'view'     as ref_type
      from pg_catalog.pg_views
        where schemaname in (
        {%- if schema is iterable and (schema is not string and schema is not mapping) -%}
          {%- for s in schema -%}
            '{{ s }}'{% if not loop.last %},{% endif %}
          {%- endfor -%}
        {%- elif schema is string -%}
          '{{ schema }}'
        {%- endif -%}
      )) as current
    left join (values
      {%- for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                    + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list %} 
        ('{{node.schema}}', '{{node.name}}'){% if not loop.last %},{% endif %}
      {%- endfor %}
    ) as desired (schema_name, ref_name) on desired.schema_name = current.schema_name
                                        and desired.ref_name    = current.ref_name
    where desired.ref_name is null
  {% endcall %}

  {%- for to_delete in load_result('get_outdated_tables')['data'] %} 
    {% call statement() -%}
      {% do log('dropping ' ~ to_delete[2] ~ ' "' ~ to_delete[0] ~ '"."' ~ to_delete[1] ~ '"', info=true) %}
      drop {{ to_delete[2] }} if exists "{{ to_delete[0] }}"."{{ to_delete[1] }}" cascade;
    {%- endcall %}
  {%- endfor %}

{% endmacro %}

To use the macro run the following, substituting your schemas:

dbt run-operation delete_outdated_tables --args "{schema: [foo, bar, baz]}"

What does it do?
Queries the Postgres catalog (pg_tables and pg_views) to find all tables and views within the specified schemas, removes any from the list that exist in the current graph, and attempts to drop the rest. Also handles seeds.

Limitations
Doesn’t handle sources. Ephemeral models may prevent something from being deleted. Currently only works for Postgres (and maybe Redshift).


To find disabled models, you can run jq against target/manifest.json. This prints a DELETE FROM statement for each one:

cat target/manifest.json | jq -r '.nodes | to_entries | map(select((.value.config.enabled|not) and (.value.resource_type == "model"))) | map("DELETE FROM " + .value.database + "." + .value.schema + "." + .value.alias + ";") | join("\n")'
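
If jq isn’t available, roughly the same filter can be sketched in Python. This assumes the manifest has already been loaded into a dict and, like the jq filter above, that disabled models still appear under "nodes" with config.enabled set to false, which may not hold for every dbt version. It returns the fully qualified names rather than generating statements. The function name and sample data are made up.

```python
def disabled_models(manifest):
    """Return database.schema.alias for each disabled model under manifest['nodes']."""
    names = []
    for node in manifest.get("nodes", {}).values():
        if node.get("resource_type") != "model":
            continue
        # Like jq's `.value.config.enabled | not`, a missing or null flag counts as disabled.
        if not node.get("config", {}).get("enabled"):
            names.append(f'{node["database"]}.{node["schema"]}.{node["alias"]}')
    return names


# Hypothetical sample data standing in for target/manifest.json.
sample_manifest = {
    "nodes": {
        "model.proj.orders": {
            "resource_type": "model", "database": "analytics_db",
            "schema": "analytics", "alias": "orders", "config": {"enabled": True},
        },
        "model.proj.old_orders": {
            "resource_type": "model", "database": "analytics_db",
            "schema": "analytics", "alias": "old_orders", "config": {"enabled": False},
        },
        "seed.proj.countries": {
            "resource_type": "seed", "database": "analytics_db",
            "schema": "analytics", "alias": "countries", "config": {"enabled": False},
        },
    }
}

print(disabled_models(sample_manifest))
```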

Thanks @elexisvenator for this! I was able to modify your solution to make it work for Redshift. I also added a “dry_run” parameter to the macro as a precaution. I thought I’d share my solution here since I benefited from your work. Thanks!

{% macro delete_orphaned_tables(schema, dry_run=False) %}
  {% if (schema is not string and schema is not iterable) or schema is mapping or schema|length <= 0 %}
    {% do exceptions.raise_compiler_error('"schema" must be a string or a list') %}
  {% endif %}

  {% call statement('get_orphaned_tables', fetch_result=True) %}
    SELECT current.schema_name,
           current.ref_name,
           current.ref_type
    FROM (
      SELECT schemaname AS schema_name,
             tablename  AS ref_name,
             'table'    AS ref_type
      FROM pg_catalog.pg_tables pt
      WHERE schemaname IN (
        {%- if schema is iterable and (schema is not string and schema is not mapping) -%}
          {%- for s in schema -%}
            '{{ s }}'{% if not loop.last %},{% endif %}
          {%- endfor -%}
        {%- elif schema is string -%}
          '{{ schema }}'
        {%- endif -%}
      )
      UNION ALL
      SELECT schemaname AS schema_name,
             viewname   AS ref_name,
             'view'     AS ref_type
      FROM pg_catalog.pg_views
        WHERE schemaname IN (
        {%- if schema is iterable and (schema is not string and schema is not mapping) -%}
          {%- for s in schema -%}
            '{{ s }}'{% if not loop.last %},{% endif %}
          {%- endfor -%}
        {%- elif schema is string -%}
          '{{ schema }}'
        {%- endif -%}
      )) AS current
    LEFT JOIN (
      {%- for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                    + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list %}
        SELECT
        '{{node.schema}}' AS schema_name
         ,'{{node.name}}' AS ref_name
        {% if not loop.last %} UNION ALL {% endif %}
      {%- endfor %}
    ) AS desired on desired.schema_name = current.schema_name
                and desired.ref_name    = current.ref_name
    WHERE desired.ref_name is null
  {% endcall %}
  {% set result = load_result('get_orphaned_tables')['data'] %}
  {% if result %}
      {%- for to_delete in result %}
        {% call statement() -%}
            {% if dry_run %}
                {% do log('To be dropped: ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) %}
                SELECT
                    '{{ to_delete[2] }}'
                    , '{{ to_delete[0] }}'
                    , '{{ to_delete[1] }}';
            {% else %}
                {% do log('Dropping ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) %}
                DROP {{ to_delete[2] }} IF EXISTS "{{ to_delete[0] }}"."{{ to_delete[1] }}" CASCADE;
                {% do log('Dropped ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) %}
            {% endif %}
        {%- endcall %}
      {%- endfor %}
  {% else %}
    {% do log('No orphan tables to clean.', True) %}
  {% endif %}
{% endmacro %}

I also had to change '{{ node.schema }}' to '{{ node.config.schema }}' when running in our production environment. I’m not entirely sure why, but my guess is that it has to do with the dbt version. Either way, I hope someone finds this useful.


In the end I couldn’t execute the drop commands on a non-dry run: whitespace from my Jinja templating was causing silent errors in Redshift. At dbt’s suggestion, I also switched from call statement blocks to run_query(), which makes the macro more readable and easier to execute. Here’s the final product:

{% macro delete_orphaned_tables(schema, dry_run=True) %}
  {% if (schema is not string and schema is not iterable) or schema is mapping or schema|length <= 0 %}
    {% do exceptions.raise_compiler_error('"schema" must be a string or a list') %}
  {% endif %}

  {% set query %}
    SELECT current.schema_name,
           current.ref_name,
           current.ref_type
    FROM (
      SELECT schemaname AS schema_name,
             tablename  AS ref_name,
             'table'    AS ref_type
      FROM pg_catalog.pg_tables pt
      WHERE schemaname IN (
        {%- if schema is iterable and (schema is not string and schema is not mapping) -%}
          {%- for s in schema -%}
            '{{ s }}'{% if not loop.last %},{% endif %}
          {%- endfor -%}
        {%- elif schema is string -%}
          '{{ schema }}'
        {%- endif -%}
      )
      UNION ALL
      SELECT schemaname AS schema_name,
             viewname   AS ref_name,
             'view'     AS ref_type
      FROM pg_catalog.pg_views
        WHERE schemaname IN (
        {%- if schema is iterable and (schema is not string and schema is not mapping) -%}
          {%- for s in schema -%}
            '{{ s }}'{% if not loop.last %},{% endif %}
          {%- endfor -%}
        {%- elif schema is string -%}
          '{{ schema }}'
        {%- endif -%}
      )) AS current
    LEFT JOIN (
      {%- for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                    + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list %}
        SELECT
        '{{node.config.schema}}' AS schema_name
         ,'{{node.name}}' AS ref_name
        {% if not loop.last %} UNION ALL {% endif %}
      {%- endfor %}
    ) AS desired on desired.schema_name = current.schema_name
                and desired.ref_name    = current.ref_name
    WHERE desired.ref_name is null
  {% endset %}
  {%- set result = run_query(query) -%}
  {% if result %}
      {%- for to_delete in result -%}
        {%- if dry_run -%}
            {%- do log('To be dropped: ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) -%}
        {%- else -%}
            {%- do log('Dropping ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) -%}
            {% set drop_command = 'DROP ' ~ to_delete[2] ~ ' IF EXISTS ' ~ to_delete[0] ~ '.' ~ to_delete[1] ~ ' CASCADE;' %}
            {% do run_query(drop_command) %}
            {%- do log('Dropped ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) -%}
        {%- endif -%}
      {%- endfor -%}
  {% else %}
    {% do log('No orphan tables to clean.', True) %}
  {% endif %}
{% endmacro %}
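
The dry-run branch and the single-line drop command in the macro above can be sketched outside Jinja like this. It is only an illustration: the function names are made up, and execute is a stand-in for whatever actually runs SQL against your warehouse.

```python
def build_drop_command(ref_type, schema_name, ref_name):
    """Build one compact DROP statement with no stray whitespace or newlines."""
    parts = [
        "DROP",
        ref_type.strip().upper(),
        "IF EXISTS",
        f"{schema_name.strip()}.{ref_name.strip()}",
        "CASCADE;",
    ]
    return " ".join(parts)


def delete_orphans(orphans, execute, dry_run=True, log=print):
    """orphans: (schema_name, ref_name, ref_type) rows; execute: callable that runs SQL."""
    for schema_name, ref_name, ref_type in orphans:
        if dry_run:
            # Only report what would happen; nothing is sent to the warehouse.
            log(f"To be dropped: {ref_type} {schema_name}.{ref_name}")
        else:
            execute(build_drop_command(ref_type, schema_name, ref_name))
            log(f"Dropped {ref_type} {schema_name}.{ref_name}")


# Hypothetical usage: collect the statements instead of running them.
executed = []
delete_orphans([("analytics", "old_orders", "table")], executed.append, dry_run=False)
print(executed)
```

Defaulting dry_run to True means a drop only happens when it is asked for explicitly.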

I just had to do a similar process, with a focus on dropping source tables that we had added to our sync job in the past but weren’t using.

I approached this with a combo of the Jinja graph variable, the information_schema and the venerable spreadsheet.

To build a list of sources we did use, I used the compiled results of this snippet:

{% for node in graph.sources.values() -%}
  select '{{ node.schema }}' as s_name, '{{ node.name }}' as t_name
  {%- if not loop.last %} union all {% endif %}
{% endfor %}

And pasted their results into this query:

with defined_sources as (
    select 'users' as s_name, 'users' as t_name union all
    select 'users' as s_name, 'contactroles' as t_name union all
    ...
    select 'ft_hubspot' as s_name, 'ticket' as t_name
),
     
relevant_tables as (
    select 
        table_schema as s_name, 
        table_name as t_name
    from information_schema.tables
    where table_schema not ilike 'dev_%' --our dbt dev schemas
    and table_schema not ilike 'analytics%' --our output schemas
    and table_schema not in ('pg_catalog', 'information_schema', 'admin', 'public', 'census', 'inbound__timesheets_and_quality_control') --Redshift internal tables and tables built by using get_relations_by_pattern instead of being defined
)

select * from relevant_tables
except
select * from defined_sources

This gave me all the tables in the warehouse that weren’t defined in dbt. Since we weren’t using them, we could consider dropping them to save disk usage and reduce load on the source databases.

To focus on the biggest tables first, I wanted to combine it with the metadata from SVV_TABLE_INFO:

select
   schema as s_name,
   "table" as t_name,
   size as size_mb,
   tbl_rows as total_rows
from SVV_TABLE_INFO

However, I found that information_schema queries always run against the leader node, which meant I couldn’t join the results of this query with my first query. Instead, I dumped the results to a spreadsheet and worked it out there.
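
The spreadsheet step amounts to a client-side join of the two result sets. A minimal Python sketch of that join is below, assuming both query results have been exported as lists of tuples. The function name and the sample rows are hypothetical.

```python
def rank_unused_tables(unused, table_info):
    """
    unused: iterable of (schema, table) pairs not defined in dbt.
    table_info: iterable of (schema, table, size_mb, total_rows) rows.
    Returns the unused tables with their sizes, biggest first.
    """
    info = {(s, t): (size_mb, total_rows) for s, t, size_mb, total_rows in table_info}
    ranked = []
    for schema, table in unused:
        size_mb, total_rows = info.get((schema, table), (None, None))
        ranked.append((schema, table, size_mb, total_rows))
    # Tables with no matching size row sort last.
    return sorted(ranked, key=lambda r: (r[2] is None, -(r[2] or 0)))


# Hypothetical sample rows standing in for the two exported query results.
unused = [("raw_crm", "deals"), ("raw_crm", "notes"), ("raw_app", "events_old")]
table_info = [
    ("raw_crm", "deals", 120, 1000),
    ("raw_app", "events_old", 5400, 9000000),
    ("raw_crm", "contacts", 30, 500),  # defined in dbt, so not in `unused`
]

for row in rank_unused_tables(unused, table_info):
    print(row)
```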

I also adapted @elexisvenator’s code to work with Snowflake and dbt 1.1.0 - figured I would add my adaptation to the discussion for any future lost souls! Some notable changes I made to that source:

  • Re-set the schema variable to always be a list to prevent duplicate logic checking the schema type repeatedly
  • Always use uppercase schemas to match Snowflake consistently
  • Join the Snowflake information_schema data to values for both the current dbt node’s schema and the provided target schemas, to handle cases where a user is in a target environment whose schema differs from the desired schema
  • Insert an exception to exit the script if a BRONZE schema is about to be dropped, to prevent dropping raw source data in an automated CI/CD job (assumption being that dropping calculated silver/gold views and tables is less risky, since we can always recreate them from the raw bronze data instead)
{% macro delete_outdated_tables(schema) %} 
  {% if (schema is not string and schema is not iterable) or schema is mapping or schema|length <= 0 %}
    {% do exceptions.raise_compiler_error('"schema" must be a string or a list') %}
  {% endif %}
  {% if schema is string %}
    {% set schema = [schema] %}
  {% endif %}

  {% call statement('get_outdated_tables', fetch_result=True) %}
    select c.schema_name,
           c.ref_name,
           c.ref_type
    from (
        select table_schema as schema_name, 
           table_name  as ref_name, 
             'table'    as ref_type
      from information_schema.tables 
      where table_schema in (
        {%- for s in schema -%}
        UPPER('{{ s }}'){% if not loop.last %},{% endif %}
        {%- endfor -%}
      )
    union all
    select table_schema as schema_name, 
           table_name  as ref_name, 
             'view'     as ref_type
      from information_schema.views
        where table_schema in (
        {%- for s in schema -%}
        UPPER('{{ s }}'){% if not loop.last %},{% endif %}
        {%- endfor -%}
      )) as c
    left join (values
      {%- for node in graph['nodes'].values() | selectattr("resource_type", "equalto", "model") | list
                    + graph['nodes'].values() | selectattr("resource_type", "equalto", "seed")  | list %} 
        {% for s in schema %}
            (UPPER('{{ s }}'), UPPER('{{node.name}}')),
        {% endfor %}
        (UPPER('{{node.schema}}'), UPPER('{{node.name}}')){% if not loop.last %},{% endif %}
      {%- endfor %}
    ) as desired (schema_name, ref_name) on desired.schema_name = c.schema_name
                                        and desired.ref_name    = c.ref_name
    where desired.ref_name is null
  {% endcall %}

  {%- for to_delete in load_result('get_outdated_tables')['data'] %} 
    {% set fqn = target.database + '.' + to_delete[0] + '.' + to_delete[1] %}
    {% if 'BRONZE' in fqn %}
      {% do exceptions.raise_compiler_error('Was asked to drop a bronze table, will not proceed. Table: ' + fqn) %}
    {% endif %}
    {% call statement() -%}
      {% do log('dropping ' ~ to_delete[2] ~ ': ' ~ fqn, info=true) %}
      drop {{ to_delete[2] }} if exists {{ fqn }} cascade;
    {%- endcall %}
  {%- endfor %}

{% endmacro %}
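
Two of the changes in the macro above, always treating schema as a list and refusing to touch BRONZE, can be sketched in Python as follows. The function names are made up for illustration. Note that the string check has to come before any generic iterable handling, otherwise 'foo' would be treated as the three schemas 'f', 'o', 'o'.

```python
def normalize_schemas(schema):
    """Accept a non-empty string or list of strings; always return an uppercase list."""
    if not isinstance(schema, (str, list, tuple)) or len(schema) == 0:
        raise ValueError('"schema" must be a string or a list')
    # Wrap a single string so later code only ever deals with a list.
    if isinstance(schema, str):
        schema = [schema]
    return [s.upper() for s in schema]


def guard_bronze(database, schema_name, ref_name):
    """Return the fully qualified name, refusing anything that mentions BRONZE."""
    fqn = f"{database}.{schema_name}.{ref_name}"
    if "BRONZE" in fqn:
        raise RuntimeError("Was asked to drop a bronze table, will not proceed. Table: " + fqn)
    return fqn


print(normalize_schemas("silver"))
print(normalize_schemas(["silver", "gold"]))
print(guard_bronze("ANALYTICS", "SILVER", "ORDERS"))
```

As in the macro, the guard is a plain substring match on the whole name, so a silver or gold relation whose name happens to contain BRONZE would also stop the run.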

hey, does this work also for simple incremental runs or do you need to fully refresh those tables in order to enable Snowflake to pick that up in the ‘last_altered’ field?

snowflake’s docs say it is

Date and time when the table was last altered by a DDL or DML operation

So any normal insert will also work


For what it’s worth a colleague and I took the above and developed it a little - to iterate over the available schemas automatically (so no need to pass in as an arg), and to provide a bit clearer output on what is happening. This is for Snowflake.

--instructions
--
--If you want to simply see which tables/views exist in Snowflake, that are not in the dbt graph:
--dbt run-operation delete_orphaned_tables --args "{dry_run: True}"
--
--if you want to actually drop those tables/views:
--dbt run-operation delete_orphaned_tables --args "{dry_run: False}"
--
--Note by default this macro will look in the database specified in your default target, as defined in your profiles.yml.
--you can be explicit about the database, by specifying a different target (add --target argument), 
--but you will need to set up targets per database, i.e. not just dev/prod, but for example:
--  dev
--  dev-raw
--  dev-analytics
--  prod
--
--so then you might run for example:
--dbt run-operation delete_orphaned_tables --args "{dry_run: True}" --target dev-analytics

{% macro delete_orphaned_tables(dry_run=False) %}
  
  {% do log("", True) %} 
  {% do log("Searching for orphaned tables/views...", True) %}
  {% do log("Using target profile: " ~ target.name ~ " (database: " ~ target.database ~ ").", True) %} 

  {% set schema_query %}
      SELECT distinct table_schema
      from (
        SELECT distinct table_schema
        FROM information_schema.tables
        UNION ALL
        SELECT distinct table_schema
        FROM information_schema.views
      ) u
      where table_schema <> 'INFORMATION_SCHEMA'
  {% endset %}

  {#
    {% do log(schema_query, True) %} 
  #}
  {%- set result = run_query(schema_query) -%}
  {% if result %}
    {%- for row in result -%}
      {% set schema = row[0] %}
      
      {% do log("", True) %} 
      {% do log("schema: " ~ schema, True) %} 
    
      {% set query %}
        SELECT UPPER(c.schema_name) AS schema_name,
                UPPER(c.ref_name) AS ref_name,
                UPPER(c.ref_type) AS ref_type
        FROM (
          SELECT table_schema AS schema_name,
                  table_name  AS ref_name,
                  'table'    AS ref_type
          FROM information_schema.tables pt
          WHERE table_schema = '{{ schema }}'      
          AND TABLE_TYPE = 'BASE TABLE'
          UNION ALL
          SELECT table_schema AS schema_name,
                  table_name   AS ref_name,
                  'view'     AS ref_type
          FROM information_schema.views
            WHERE table_schema = '{{ schema }}'
                ) AS c
          LEFT JOIN (
            {%- for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                          + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list %}
              SELECT
              upper('{{node.config.schema}}') AS schema_name
                ,upper('{{node.name}}') AS ref_name
              {% if not loop.last %} UNION ALL {% endif %}
            {%- endfor %}
          ) AS desired on desired.schema_name = c.schema_name
                      and desired.ref_name    = c.ref_name
          WHERE desired.ref_name is null
      {% endset %}

      {#
      {% do log(query, True) %} 
      #}
      
      {%- set result = run_query(query) -%}
      {% if result %}
          {%- for to_delete in result -%}
            {%- if dry_run -%}
                {%- do log('To be dropped: ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) -%}
            {%- else -%}
                {% set drop_command = 'DROP ' ~ to_delete[2] ~ ' IF EXISTS ' ~ to_delete[0] ~ '.' ~ to_delete[1] ~ ' CASCADE;' %}
                {% do run_query(drop_command) %}
                {%- do log('Dropped ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], True) -%}
            {%- endif -%}
          {%- endfor -%}
      {% else %}
        {% do log('No orphan tables to clean.', True) %}
      {% endif %}

    {%- endfor -%}
  {% endif %}

{% endmacro %}

I was working on tweaking the macro from deegeebee for our Snowflake implementation which uses multiple database destinations for models. If you have a lot of models this does not work well and maybe not at all due to the query text size.

We’re still migrating some of our pre-dbt relations so I added sources.

I started receiving the max compilation size error from Snowflake due to the number of models we have. I abandoned my iteration below and added elementary to our project. If you store dbt artifacts, finding orphaned relations can be very easy. It may not be appropriate for automating a drop of these objects, but I only need to surface them.

with cte_existing_relations as
(
    select
        upper(table_catalog) as database_name
      , upper(table_schema)  as schema_name
      , upper(table_name)    as ref_name
      , case
            when table_type = 'VIEW'
                then 'view'
            when table_type = 'BASE TABLE'
                then 'table'
            else null
            end              as ref_type
    from YOURDATABASE.information_schema.tables
    where table_schema not in ( 'INFORMATION_SCHEMA' )

    -- union all...

)
,cte_dbt_relations as
(
    select
        upper(database_name) as database_name
        , upper(schema_name) as schema_name
        , upper(alias) as ref_name
        , materialization
    from dbt.elementary.dbt_models

    union all

    select
        upper(database_name) as database_name
        , upper(schema_name) as schema_name
        , upper(alias) as ref_name
        , null::text as materialization
    from dbt.elementary.dbt_seeds

    union all

    select
        upper(database_name) as database_name
        , upper(schema_name) as schema_name
        , upper(name) as ref_name
        , null::text as materialization
    from dbt.elementary.dbt_sources
)

SELECT   found.database_name as database_name
        ,found.schema_name   as schema_name
        ,found.ref_name      as ref_name
        ,found.ref_type      as ref_type
FROM cte_existing_relations found
LEFT JOIN cte_dbt_relations desired
    on found.database_name = desired.database_name
    and found.schema_name = desired.schema_name
    and found.ref_name = desired.ref_name
WHERE desired.ref_name is null

The abandoned macro iteration, for reference:

--instructions
--
--If you want to simply see which tables/views exist in Snowflake, that are not in the dbt graph:
--dbt run-operation delete_orphaned_relations --args "{dry_run: True}"
--
--if you want to actually drop those tables/views:
--dbt run-operation delete_orphaned_relations --args "{dry_run: False}"
--
--Note by default this macro will look in the database specified in your default target, as defined in your profiles.yml.
--you can be explicit about the database, by specifying a different target (add --target argument),
--but you will need to set up targets per database, i.e. not just dev/prod, but for example:
--  dev
--  dev-raw
--  dev-analytics
--  prod
--
--so then you might run for example:
--dbt run-operation delete_orphaned_relations --args "{dry_run: True}" --target dev-analytics

{% macro delete_orphaned_relations(dry_run=True) %}
  {% do log("", True) %}
  {% do log("Searching for orphaned tables/views...", True) %}
  {% do log("Using target profile: " ~ target.name ~ " (database: " ~ target.database ~ ").", True) %}

  {# Get all databases #}
  {% set models = [] %}
  {% set databases = {} %}
  {%- for node in (graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list
                + graph.sources.values() | selectattr("resource_type", "equalto", "source") | list) %}
    {%- set database = node.database -%}
    {%- set schema = node.schema -%}
    {%- if node.resource_type == 'source' -%}
        {%- set alias = node.name -%}
    {%- else -%}
        {%- set alias = node.alias -%}
    {%- endif -%}

    {%- set relation = {'database': database, 'schema': schema, 'alias': alias} -%}
    
    {%- do models.append(relation) -%}

    {%- if database in databases %}
        {% if schema not in databases[database] %}
            {% do databases.update({database: databases[database] + [schema]}) %}
        {% endif %}
    {%- else %}
        {# Add database and schema because neither exist #}
        {{ dbt_utils.log_info('Adding database ' ~ database ~ ' to dict') }}
        {% do databases.update({database: [schema]}) %}
    {%- endif -%}

  {%- endfor %}

  {#
    {% do log(schema_query, True) %}
  #}

{% do log("", True) %}

{% set query %}
with cte_all_existing_relations as
(
    {% for key, value in databases.items() %}
    SELECT
         upper(table_catalog)   as database_name
        ,upper(table_schema)    as schema_name
        ,upper(table_name)      as ref_name
        ,case
            when table_type = 'VIEW'
                then 'view'
            when table_type = 'BASE TABLE'
                then 'table'
            else null
            end          as ref_type
    FROM {{key}}.information_schema.tables
    where table_schema not in ('INFORMATION_SCHEMA')

    {% if not loop.last -%} UNION ALL {% endif -%}

    {% endfor %}
)
,cte_dbt_relations as 
(
    {%- for node in models-%}
        {%- set db = "upper('" ~ node.database ~ "')" -%}
        {%- set sch = "upper('" ~ node.schema ~ "')" -%}
        {%- set rel = "upper('" ~ node.alias ~ "')" %}
        SELECT
              {{db.ljust(49)}}  as database_name
            , {{sch.ljust(50)}} as schema_name
            , {{rel.ljust(50)}} as ref_name
        {% if not loop.last %} UNION ALL {% endif %}
    {%- endfor %}
)

SELECT   c.database_name as database_name
        ,c.schema_name   as schema_name
        ,c.ref_name      as ref_name
        ,c.ref_type      as ref_type
FROM cte_all_existing_relations c
LEFT JOIN cte_dbt_relations desired
    on c.database_name = desired.database_name
    and c.schema_name = desired.schema_name
    and c.ref_name = desired.ref_name
WHERE desired.ref_name is null
{% endset %}


{{ dbt_utils.log_info(query) }}

{#
{%- set result = run_query(query) -%}
{% if result %}
    {%- for to_delete in result -%}
    {%- if dry_run -%}
        {%- do log('To be dropped: ' ~ to_delete[3] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1] ~ '.' ~ to_delete[2], True) -%}
    {%- else -%}
        {% set drop_command = 'DROP ' ~ to_delete[3] ~ ' IF EXISTS ' ~ to_delete[0] ~ '.' ~ to_delete[1] ~ '.' ~ to_delete[2] ~ ' CASCADE;' %}
        {% do run_query(drop_command) %}
        {%- do log('Dropped ' ~ to_delete[3] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1] ~ '.' ~ to_delete[2], True) -%}
    {%- endif -%}
    {%- endfor -%}
{% else %}
{% do log('No orphan tables to clean.', True) %}
{% endif %}
#}

{% endmacro %}

We are using Databricks without Unity Catalog enabled, so we don’t have access to the INFORMATION_SCHEMA objects.

The commands SHOW VIEWS IN [Schema] and SHOW TABLES IN [Schema] do work, however. We built some macros around them to do a weekly cleanup of all orphan tables/views that are no longer referenced by dbt.

The macros called from our cleanup job are:

  • drop_unused_tables_and_views
  • drop_empty_schemas

The other macros are utility functions…

Gets a list of database tables and views from Databricks:

{% macro get_all_tables_and_views() %}

  {# 
  This macro returns a list of tables and views from the databases as well as the schema name
  
  Note:
  For some reason Databricks will sometimes list the same schema/name in both SHOW TABLES and SHOW VIEWS.
  It seems to work better when views are loaded first, and therefore also dropped first, in the drop_unused_tables_and_views macro.
  #}

  
  {{ log( 'get_all_tables_and_views: Loading views and tables from database' , info=true) }}
  {% set schema_results = run_query('SHOW DATABASES') %}
  {% set all_schemas = schema_results.columns[0].values() %}
  {% set all_tables_and_views = [] %}

  {% for schema in all_schemas %}
    
    {% set view_results = run_query('SHOW VIEWS IN ' ~ schema) %}
    {% set views_in_schema = view_results.columns[1].values() %}
    {% for name in views_in_schema %}
      {% set view = {'schema': schema, 'name': name, 'type': 'view'} %}
      {{ log( view , info=true) }}
      {% do all_tables_and_views.append(view) %}
    {% endfor %}

    {% set table_results = run_query('SHOW TABLES IN ' ~ schema) %}
    {% set tables_in_schema = table_results.columns[1].values() %}
    {% for name in tables_in_schema %}
      {% set table = {'schema': schema, 'name': name, 'type': 'table'} %}
      {{ log( table , info=true) }}
      {% do all_tables_and_views.append(table) %}
    {% endfor %}

  {% endfor %}

  {{ return(all_tables_and_views) }}
{% endmacro %}

Gets a list of all models from the dbt manifest that are materialized as views or tables (incl. incremental):

{% macro get_dbt_nodes() %}
  
  {# 
  This macro parses the dbt model graph and returns all tables and views
  
  Note:
  Only models materialized as view, table or incremental are considered. 
  Incremental models are treated as tables. 
  #}

  {{ log( 'get_dbt_nodes: Loading views and tables from dbt graph' , info=true) }}  
  {% set dbt_tables_and_views = [] %}
  {% set table_materializations = ["table", "incremental"] %}
  {% set all_materializations = ["view", "table", "incremental"] %}
  
  {% for node in graph.nodes.values() %}    
    {% if node.resource_type == 'model' and node.config.get('materialized', 'none') in all_materializations %}
      
      {# Set the node_type type as either table or view. #}
      {% if node.config.get('materialized', 'table') in table_materializations %}
        {% set node_type = "table" %}
      {% else %}
        {% set node_type = "view" %}
      {% endif %}
      
      {% set node = {'schema': node.schema, 'name': node.name, 'type': node_type} %}
      {{ log( node , info=true) }}
      {% do dbt_tables_and_views.append(node) %}
    {% endif %}
  {% endfor %}

  {{ return(dbt_tables_and_views) }}
{% endmacro %}
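
If you want to sanity-check which relations the macro above treats as "known to dbt" without touching the warehouse, the same filter can be sketched in Python against a manifest-shaped dict (the kind of structure dbt writes to target/manifest.json). This is only a rough sketch: the helper name `expected_relations` and the sample data are made up for illustration, and unlike the macro it falls back from `alias` to `name`, on the assumption that a custom alias is what ends up as the relation name.

```python
TABLE_MATERIALIZATIONS = {"table", "incremental"}
ALL_MATERIALIZATIONS = {"view", "table", "incremental"}


def expected_relations(manifest):
    """Return the relations dbt is expected to build (hypothetical helper)."""
    relations = []
    for node in manifest.get("nodes", {}).values():
        if node.get("resource_type") != "model":
            continue  # seeds, tests, snapshots etc. are ignored, as in the macro
        materialized = node.get("config", {}).get("materialized")
        if materialized not in ALL_MATERIALIZATIONS:
            continue  # e.g. ephemeral models create no relation
        relations.append({
            "schema": node["schema"],
            # Assumption: a custom alias, when set, is the name used in the database.
            "name": node.get("alias") or node["name"],
            # Incremental models are treated as tables.
            "type": "table" if materialized in TABLE_MATERIALIZATIONS else "view",
        })
    return relations


if __name__ == "__main__":
    sample_manifest = {
        "nodes": {
            "model.demo.orders": {
                "resource_type": "model",
                "schema": "analytics",
                "name": "orders",
                "config": {"materialized": "incremental"},
            },
            "model.demo.stg_orders": {
                "resource_type": "model",
                "schema": "analytics",
                "name": "stg_orders",
                "config": {"materialized": "view"},
            },
        }
    }
    for relation in expected_relations(sample_manifest):
        print(relation)
```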

Get a list of all schemas that are referenced by dbt sources (we want to skip cleanup of those):

{% macro get_dbt_sources_schemas() %}
  
  {# 
  This macro returns a list of all schema names (lower-cased) that are referenced by dbt sources
  #}
  
  {% set dbt_sources_schemas = [] %}
  {% for node in graph.sources.values() %}
    {% if node.schema is not none %}
      {# Compare in lower case so the same schema is not added twice #}
      {% set source_schema = node.schema.lower() %}
      {% if source_schema not in dbt_sources_schemas %}
        {% set msg = 'Found source schema: ' ~ source_schema %}
        {{ log( msg , info=true) }}
        {% do dbt_sources_schemas.append(source_schema) %}
      {% endif %}
    {% endif %}
  {% endfor %}

  {{ return(dbt_sources_schemas) }}
{% endmacro %}

Delete all database tables and views not referenced by any dbt model (objects in schemas that contain dbt sources are skipped):

{% macro drop_unused_tables_and_views(dry_run=true) %}
  
  {# 
  This macro will drop all tables and views from the database that are not referenced by dbt. 
  Note: objects belonging to schemas containing dbt sources will remain untouched by this operation.
  #}
  
  {{ log( 'Dropping unknown tables and views. dry_run: ' ~ dry_run , info=true) }}
  
  {% set all_tables_and_views = get_all_tables_and_views() %}
  {% set dbt_tables_and_views = get_dbt_nodes() %}
  {% set ns = namespace(existsInDbt=false, query='') %}
  {% set dbt_sources_schemas = get_dbt_sources_schemas() %}

  {{ log( 'Starting to drop non-dbt tables and views. dry_run: ' ~ dry_run , info=true) }}
  {% for relation in all_tables_and_views %}
    {# Source schemas are stored in lower case, so compare in lower case #}
    {% if relation.schema.lower() not in dbt_sources_schemas %}

      {% set ns.existsInDbt = false %}
      {% for node in dbt_tables_and_views %}
        {% if relation.schema == node.schema and relation.name == node.name %}
          {% set ns.existsInDbt = true %}
        {% endif %}
      {% endfor %}

      {# Only drop objects that do not exist in dbt #}
      {% if not ns.existsInDbt %}
        
        {% if relation.type == 'view' %}
          {% set ns.query = 'DROP VIEW IF EXISTS ' + relation.schema + '.' + relation.name %}
        {% else %}
          {% set ns.query = 'DROP TABLE IF EXISTS ' + relation.schema + '.' + relation.name %}
        {% endif %}

        {{ log( ns.query , info=true) }}        
        {% if not dry_run %}
          {% do run_query(ns.query) %}
        {% endif %}

      {% endif %}
    {% endif %}
  {% endfor %}

{% endmacro %}
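
The matching logic in the macro above (skip source schemas, then drop whatever dbt does not know about) can also be sketched outside dbt, which makes it easy to try on sample data before pointing anything at a real database. This is a minimal Python sketch, not the macro itself: `plan_drops` is a made-up name, it uses a set lookup instead of the nested loop, and it assumes schema and object names can be compared case-insensitively, which may not hold on every warehouse.

```python
def plan_drops(existing, expected, source_schemas):
    """Return DROP statements for relations that dbt does not know about.

    existing / expected are lists of dicts with 'schema', 'name' and 'type'
    keys, mirroring what the macros in this thread return.
    """
    # Assumption: identifiers are case-insensitive, so compare in lower case.
    known = {(r["schema"].lower(), r["name"].lower()) for r in expected}
    protected = {s.lower() for s in source_schemas}

    statements = []
    for rel in existing:
        schema, name = rel["schema"].lower(), rel["name"].lower()
        if schema in protected or (schema, name) in known:
            continue  # source schema or a relation dbt still builds: keep it
        keyword = "VIEW" if rel["type"] == "view" else "TABLE"
        statements.append(f"DROP {keyword} IF EXISTS {rel['schema']}.{rel['name']}")
    return statements


if __name__ == "__main__":
    existing = [
        {"schema": "analytics", "name": "orders", "type": "table"},
        {"schema": "analytics", "name": "old_report", "type": "view"},
        {"schema": "raw", "name": "events", "type": "table"},
    ]
    expected = [{"schema": "analytics", "name": "orders", "type": "table"}]
    for statement in plan_drops(existing, expected, source_schemas=["raw"]):
        print(statement)
```

Printing the statements first, as the macro does with dry_run=true, gives you a list to review before anything is actually dropped.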

Delete all database schemas that no longer contain any tables/views:

{% macro drop_empty_schemas(dry_run=true) %}
  
  {# 
  This macro checks all current database schemas and drops those that do not 
  contain any views or tables 
  #}

  {{ log( 'drop_empty_schemas: Dropping all schemas from database that contain no tables/views' , info=true) }}
  {% set schema_results = run_query('SHOW DATABASES') %}
  {% set all_schemas = schema_results.columns[0].values() %}
  {% set all_tables_and_views = [] %}

  {% for schema in all_schemas %}
    
    {# Can't drop the default DB! #}
    {% if schema != "default" %}
      {% set view_results = run_query('SHOW VIEWS IN ' ~ schema) %}
      {% set table_results = run_query('SHOW TABLES IN ' ~ schema) %}

      {# Only drop if schema is empty #}
      {% if view_results|length == 0 and table_results|length == 0 %}
        {% set query = 'DROP DATABASE ' + schema %}
        {{ log( query , info=true) }}
        {% if not dry_run %}
          {% do run_query(query) %}
        {% endif %}
      {% endif %}
    {% endif %}

  {% endfor %}

{% endmacro %}

I took a bunch of the above suggestions and, after some moderate swearing :melting_face:, have something for BigQuery.

A few notes:

  1. Kept the schema arg. I want to be able to pass in the dbt-managed list of datasets (the arg is still named schema because I like Snowflake better :smirk: ).
  2. Instead of the BRONZE schema check, added a donot_drop_list seed with just schema name and table name columns (table_schema and table_name in the query), because I once had a client with some non-dbt tables sprinkled in their dbt-managed schemas. Uh, don’t do this. But if they are already there, you need a way to skip them.
  3. stale_days: added an argument so you can query/drop tables that haven’t been touched in a specific number of days. (I couldn’t find “last_altered” type info for views in BQ. Anyone know where that’d be?)
  4. Added a few extra details (row count/last altered) to the output, because I found this info helpful when a long-standing client had a lot of orphaned objects to review.
  5. Schema name case could be an issue depending on your settings if you name tHingS_WeIRd.

{% macro drop_orphaned_tables_bigquery(schema, stale_days = 60, dry_run = true) %} 
{% if execute %}
  {% if (schema is not string and schema is not iterable) or schema is mapping or schema|length <= 0 %}
    {% do exceptions.raise_compiler_error('"schema" must be a string or a list') %}
  {% endif %}
  {% if schema is string %}
  {% set schema = [schema] %}
  {% endif %}
  {% call statement('get_outdated_tables', fetch_result=True) %}
    select c.schema_name,
           c.ref_name,
           c.ref_type,
           c.last_altered,
           c.row_count
    from (
        select dataset_id as schema_name,
               table_id as ref_name,
               'table' as ref_type,
               cast(TIMESTAMP_MILLIS(last_modified_time) as string) as last_altered,
               cast(row_count as string) as row_count
      from (

        {%- for s in schema %}
         select * from {{ s }}.__TABLES__
        {% if not loop.last %}union all {% endif %}
        {%- endfor -%}
      )
      where TIMESTAMP_MILLIS(last_modified_time) < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {{stale_days}} DAY)
        and dataset_id||'.'||table_id not in (select table_schema||'.'||table_name from {{ref('donot_drop_list')}})
      
    union all
    select table_schema as schema_name, 
           table_name  as ref_name, 
           'view'     as ref_type,
           'NA' as last_altered, --can't find last modified date in BQ for views 
           'NA'  as row_count
      from (
        {%- for s in schema %}
         select * from {{ s }}.INFORMATION_SCHEMA.VIEWS
        {% if not loop.last %}union all {% endif %}
        {%- endfor -%}
      )
        where table_schema||'.'||table_name not in (select table_schema||'.'||table_name from {{ref('donot_drop_list')}})
      ) as c
    left join ( select * FROM UNNEST([STRUCT<schema_name STRING, ref_name STRING>
      {%- for node in graph['nodes'].values() | selectattr("resource_type", "equalto", "model") | list
                    + graph['nodes'].values() | selectattr("resource_type", "equalto", "seed")  | list 
                    + graph['nodes'].values() | selectattr("resource_type", "equalto", "snapshot")  | list%} 
            {%- if (node.schema in schema) and node.alias and node.alias != None %}
                ('{{ node.schema }}', '{{node.alias}}'),
            {%- endif -%}
            {%- if loop.last %}  {# this is to avoid the issue of the last node in graph having alias = 'None' or being in a different schema causing compile error due to the "," if node.alias is None here it doesn't really hurt anything#}
                ('{{ node.schema }}','{{node.alias}}')
                ])
            {%- endif %}
      {%- endfor %}
    ) as desired on desired.schema_name = c.schema_name
                 and desired.ref_name    = c.ref_name
    where desired.ref_name is null
  {% endcall %}
  {%- for to_delete in load_result('get_outdated_tables')['data'] %} 
    {% set fqn = target.database + '.' + to_delete[0] + '.' + to_delete[1] %}
    {% if dry_run == false %}
    {% call statement() -%}
      {% do log('dropping ' ~ to_delete[2] ~ ': ' ~ fqn, info=true) %}
      drop {{ to_delete[2] }} if exists {{ fqn }};
    {%- endcall %}
    {% elif dry_run == true %}
    {# last_altered is already a string here (cast in the query, 'NA' for views), so it is logged as-is #}
    {% do log('drop ' ~ to_delete[2] ~ ' if exists ' ~ fqn ~ '; -- last_altered: ' ~ to_delete[3] ~ '; row_count: ' ~ to_delete[4], info=true) %}
    {% endif %}
    {%- if loop.last and dry_run == true %}
        {% do log('Please drop the above objects or move them to the proper backup schema.\n', info=true) %}
    {% endif %}
  {%- endfor %}
   {%- set response = load_result('get_outdated_tables')['response']  %}
   {% do log('Query Status + # of results found: '~ response, info=true) %}
    {% endif %}
{% endmacro %}
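
For anyone unsure what the stale_days filter does: the query compares `last_modified_time` (epoch milliseconds in `__TABLES__`, hence TIMESTAMP_MILLIS) with "now minus stale_days". A rough Python equivalent of that comparison is below; `is_stale` is a made-up name and the fixed `now` is only there so the example is reproducible.

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_modified_ms, stale_days, now=None):
    """True if the table was last modified more than stale_days before now."""
    now = now or datetime.now(timezone.utc)
    last_modified = datetime.fromtimestamp(last_modified_ms / 1000, tz=timezone.utc)
    return last_modified < now - timedelta(days=stale_days)


if __name__ == "__main__":
    now = datetime(2024, 3, 1, tzinfo=timezone.utc)
    old = int(datetime(2023, 12, 1, tzinfo=timezone.utc).timestamp() * 1000)
    recent = int(datetime(2024, 2, 15, tzinfo=timezone.utc).timestamp() * 1000)
    print(is_stale(old, 60, now))     # modified 91 days before `now`
    print(is_stale(recent, 60, now))  # modified 15 days before `now`
```

Only tables go through this check. Views are listed regardless of age, because the macro has no last-modified value for them.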

Adapted it to work with SQL Server 2016 (yes, we’re using it!). Use at your own risk.

{% macro delete_outdated_tables(list_only=False) %} 

    {% call statement('outdated_tables', fetch_result=True) %}

        with currently_existing as (
           select
                table_schema as schema_name,
                table_name as ref_name,
                lower(coalesce(nullif(table_type, 'BASE TABLE'), 'table')) as ref_type
            from information_schema.tables
        )
        select curr.schema_name,
                curr.ref_name,
                curr.ref_type
        from currently_existing as curr
        left join (values
            {%- for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list
                        + graph.nodes.values() | selectattr("resource_type", "equalto", "seed")  | list %} 
            ('{{node.schema}}', '{{node.name}}'){% if not loop.last %},{% endif %}
            {%- endfor %}
        ) as desired (schema_name, ref_name) 
            on desired.schema_name = curr.schema_name
            and desired.ref_name    = curr.ref_name
        
        where desired.ref_name is null

    {% endcall %}
    {%- for to_delete in load_result('outdated_tables')['data'] %} 

        {% if list_only %}

            {{ log('would drop ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], info=true) }}

        {% else %}

            {{ log('dropping ' ~ to_delete[2] ~ ' ' ~ to_delete[0] ~ '.' ~ to_delete[1], info=true) }}
            {% call statement() -%}    
            drop {{ to_delete[2] }} if exists "{{ to_delete[0] }}"."{{ to_delete[1] }}" 
            {%- endcall %}

        {%- endif %}

    {%- endfor %}

{% endmacro %}

I wish there were a dbt cleanup command for this…


This is something that should have been included from day 1, to be honest…