Using dbt artifacts to track project performance

For those who attended Coalesce, I gave a talk called “Analytics on your analytics” (watch here) where I discussed how Drizly is using dbt metadata to improve our dbt run operations and processes. I wanted to provide a more in-depth guide to how we implemented this using run_results with Dagster and Snowflake (more than I could cover in a 25-minute talk). I’d love to hear from others who have implemented a similar approach, or how you’re accomplishing the same thing with other tools. Definitely comment below with any examples!

Tracking dbt model performance

As our dbt DAG became more complicated, we realized that we needed a more sophisticated orchestration tool to handle the runs, macros, and tests we execute with each production run. We decided to use Dagster because of its flexibility, strong emphasis on parameterization, and ability to support users across our data team (analysts, data scientists, data engineers). Dagster offers a dbt library for executing dbt commands, coupled with assets to track the dbt artifacts produced with each run. However, for more detailed reporting on our runs, we wanted the ability to layer our dbt run metadata with all the other data in our warehouse. To do that, we needed to ingest the metadata for each run.

dbt audits V1: Inserting records

Our first attempt to solve the logging problem was simply to include pre- and post-hooks in dbt that performed inserts into an audits table before and after each model ran on production (inspired by the event-logging package). This ensured that our models and the metadata ingested were tightly coupled, whether the run was performed by an automated process or by an analyst performing an ad hoc run via the CLI.

But there were downsides:

  • The process was inefficient for the database: With about 472 models, a START and an END insert per model equated to at least 944 inserts per dbt run, which significantly slowed down our runs. One of the reasons for this project was to prioritize models for optimization so that we could speed up our runs, so this took us further away from that goal.

  • The data was not as rich as we wanted: we were missing details about model errors, error messages, models that skipped, model tags, and more.

  • We weren’t able to capture errors: If the model failed, the post-hook wouldn’t execute, and we’d have no record of the model failure.

dbt_project.yml


models:
  project:
    pre-hook:
      - "{{ audit_model_logging(event_type='START') }}"
    post-hook:
      - "{{ audit_model_logging(event_type='END') }}"

macro.sql


{% macro audit_model_logging(event_type) %}

{% if target.name == "prod" or var('override_logging') == True %}

{% call statement('audit', fetch_result=False) %}

-- Meta insert

INSERT INTO {{target.database}}.audits.model_logging

SELECT

'{{ event_type }}',

'{{ model.name }}',

'{{ model.alias }}',

'{{ this.schema }}',

'{{ this.table }}',

'{{ model.config.materialized }}',

'{{ flags.FULL_REFRESH }}',

-- These fields should only be populated for end runs

(CASE WHEN '{{ event_type }}' = 'START' THEN NULL ELSE info_schema.table_type END),

(CASE WHEN '{{ event_type }}' = 'START' THEN NULL ELSE info_schema.is_transient END),

(CASE WHEN '{{ event_type }}' = 'START' THEN NULL ELSE info_schema.clustering_key END),

(CASE WHEN '{{ event_type }}' = 'START' THEN NULL ELSE info_schema.row_count END),

(CASE WHEN '{{ event_type }}' = 'START' THEN NULL ELSE info_schema.bytes END),

(CASE WHEN '{{ event_type }}' = 'START' THEN NULL ELSE info_schema.retention_time END),

CURRENT_TIMESTAMP,

(SELECT MAX(run_id) FROM {{ target.database }}.audits.run_logging)

-- Hack to allow for start queries if model has never been created before

FROM VALUES ('placeholder')

LEFT JOIN (

SELECT * FROM information_schema.tables

WHERE table_catalog = UPPER('{{ target.database }}')

AND table_schema = UPPER('{{ this.schema }}')

AND table_name = UPPER('{{ this.table }}')

) info_schema

ON 1 = 1

{%- endcall %}

{% endif %}

{% endmacro %}

dbt audits V2: Using run_results.json

After talking with Claire Carroll and Jeremy Cohen (teamwork!) about our process, they gave us the wonderful advice to try using run_results.json. We moved away from individual post-hooks and instead uploaded the dbt artifact to S3 after every run, where it is then ingested into our warehouse with Snowpipe. Now that we were using Dagster, it was easy to add one additional node to our DAG to run immediately after the dbt run. The Dagster UI also became a better interface for analysts to perform ad hoc runs. Using presets, we could define a preconfigured template for analysts to run specific models on production while still ensuring that subsequent steps, such as ingesting the metadata, always occurred.

presets.py


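from dagster import PresetDefinition

# ad_hoc_run_config (not shown here) holds the run config used for ad hoc runs.
ad_hoc = PresetDefinition(
    name="ad_hoc",
    run_config=ad_hoc_run_config,
    mode="prod",
    solid_selection=["run_model+"],
)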

Step 1: Upload run_results to S3

As I described above, we created an extra step (or solid) in Dagster that uploads run_results.json to S3 once our dbt run is complete.

  • If anyone knows how to do this with other orchestration tools, let us know in the comments.
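For readers who want a starting point, here is a minimal sketch of what such an upload step could look like in plain Python. It is not our production solid: the function names, the bucket and prefix arguments, and the timestamped key layout are all assumptions made for illustration. The client is expected to expose an upload_file(filename, bucket, key) method, which is the shape of the boto3 S3 client, so the same function could be called from a Dagster solid or from any other orchestrator.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def build_s3_key(prefix: str, run_started_at: datetime) -> str:
    """Build a unique, time-sortable object key for one run's artifact.

    The key layout is a hypothetical convention, not something dbt or S3 requires.
    """
    stamp = run_started_at.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{prefix.strip('/')}/run_results_{stamp}.json"


def upload_run_results(
    s3_client,
    bucket: str,
    prefix: str,
    target_dir: str = "target",
    run_started_at: Optional[datetime] = None,
) -> str:
    """Upload <target_dir>/run_results.json and return the object key used.

    s3_client is any object with upload_file(filename, bucket, key),
    for example the client returned by boto3.client("s3").
    """
    artifact = Path(target_dir) / "run_results.json"
    if not artifact.is_file():
        raise FileNotFoundError(f"{artifact} not found; did the dbt run finish?")
    key = build_s3_key(prefix, run_started_at or datetime.now(timezone.utc))
    s3_client.upload_file(str(artifact), bucket, key)
    return key
```

Writing each run to its own key, rather than overwriting a single object, means every upload is a new file, so a file-triggered loader sees one event per run and the history is preserved.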

Step 2: Load S3 files into your data warehouse (in our case, Snowflake)

In Snowflake, we accomplish this with a Snowpipe that triggers when a new file lands in our S3 bucket. Each time it is triggered, the new data is appended to the table, so we retain the history of our runs.

  • This should also be possible with other data warehouses, such as BigQuery and Redshift. Let us know how you accomplish this with those tools.

Step 3: Create dbt models

Once we had run_results in Snowflake, we could take advantage of Snowflake’s support for semi-structured data and easily create a dbt model that flattens the JSON and parses out the data points we were interested in. run_results.json contains a lot of rich metadata that allows us to track model run times, errors, error messages, tags, and more!

dbt_modeling.sql


{{

config(

materialized='view'

)

}}

WITH run_results AS (

SELECT

  created_at::DATE AS run_date,

  parse_json(event):results AS contents

FROM {{ ref('run_results') }}

)

SELECT

  run_date,

  r.value:node:resource_type::STRING AS node_type,

  r.value:node:unique_id::STRING AS node_unique_id,

  r.value:node:alias::STRING AS model_name,

  r.value:node:database::STRING AS database_name,

  r.value:node:schema::STRING AS schema_name,

  r.value:node:full_refresh::STRING AS was_full_refresh,

  r.value:node:tags AS model_tags,

  r.value:node:refs AS model_references,

  r.value:node:depends_on:macros AS dependent_macros,

  r.value:node:depends_on:nodes AS dependent_nodes,

  r.value:node:config:materialized::STRING AS materialization,

  r.value:node:config:enabled::STRING AS is_enabled,

  r.value:execution_time::FLOAT AS model_run_time_sec,

  r.value:status::STRING AS run_status,

  run_status = 'ERROR' AS was_error,

  CASE

    WHEN was_error = TRUE

    THEN r.value:error::STRING

  END AS error_message,

  r.value:skip::BOOLEAN AS was_skipped,

  r.value:warn::BOOLEAN AS was_warning,

  r.value:timing[1]:started_at::TIMESTAMP AS execute_started_at,

  r.value:timing[1]:completed_at::TIMESTAMP AS execute_completed_at,

  r.value AS json_blob

FROM run_results,

  LATERAL FLATTEN(input => contents) r

WHERE r.value:node:resource_type::STRING = 'model'
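To make the shape of the JSON that this model expects easier to see, here is a rough Python equivalent of the flattening logic. It reads the same field paths as the SQL above (results, node, status, error, timing[1]) and keeps only model nodes. The sample payload is invented for illustration and is heavily trimmed, and the function name is hypothetical. Newer dbt versions changed the run_results.json schema, so treat this as a sketch of the layout the SQL assumes rather than a reference for the artifact format.

```python
import json
from typing import Any, Dict, List

# Invented, trimmed-down payload that mirrors the fields the SQL model reads.
SAMPLE_RUN_RESULTS = """
{
  "results": [
    {
      "node": {
        "resource_type": "model",
        "unique_id": "model.project.orders",
        "alias": "orders",
        "schema": "analytics",
        "config": {"materialized": "table"}
      },
      "status": "ERROR",
      "error": "Database Error: invalid identifier",
      "skip": false,
      "warn": false,
      "execution_time": 12.5,
      "timing": [
        {"name": "compile", "started_at": "2021-01-02T03:00:00Z",
         "completed_at": "2021-01-02T03:00:01Z"},
        {"name": "execute", "started_at": "2021-01-02T03:00:01Z",
         "completed_at": "2021-01-02T03:00:13Z"}
      ]
    },
    {
      "node": {"resource_type": "test",
               "unique_id": "test.project.not_null_orders_id",
               "alias": "not_null_orders_id"},
      "status": "PASS",
      "execution_time": 0.4,
      "timing": []
    }
  ]
}
"""


def flatten_run_results(raw: str) -> List[Dict[str, Any]]:
    """Return one row per model result, like LATERAL FLATTEN plus the WHERE filter."""
    rows = []
    for result in json.loads(raw).get("results", []):
        node = result.get("node", {})
        if node.get("resource_type") != "model":
            continue  # same filter as the WHERE clause in the SQL
        timing = result.get("timing", [])
        execute = timing[1] if len(timing) > 1 else {}  # timing[1] in the SQL
        was_error = result.get("status") == "ERROR"
        rows.append({
            "node_unique_id": node.get("unique_id"),
            "model_name": node.get("alias"),
            "schema_name": node.get("schema"),
            "materialization": node.get("config", {}).get("materialized"),
            "model_run_time_sec": result.get("execution_time"),
            "run_status": result.get("status"),
            "was_error": was_error,
            "error_message": result.get("error") if was_error else None,
            "execute_started_at": execute.get("started_at"),
            "execute_completed_at": execute.get("completed_at"),
        })
    return rows


rows = flatten_run_results(SAMPLE_RUN_RESULTS)
```

With the sample above, rows contains a single entry for the orders model; the test result is dropped by the resource_type filter.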

Another use case for this data is monitoring model freshness. A table can go stale either because a model did not run, or because a model was renamed or deleted but its table was never dropped from Snowflake. We created a stale_models dbt model for this use case, which joins the dbt_modeling model above with some Snowflake metadata from the information schema.

stale_models.sql


{{

config(

materialized='view'

)

}}

WITH model_build AS (

SELECT DISTINCT

  info.table_catalog AS database_name,

  info.table_schema AS schema_name,

  info.table_name AS table_name,

  models.execute_completed_at AS audit_timestamp,

  info.last_altered

FROM prod.information_schema.tables info --must reference PROD for accurate results

LEFT JOIN {{ ref('dbt_modeling') }} models

  ON info.table_schema = UPPER(models.schema_name)

  AND info.table_name = UPPER(models.model_name)

)

SELECT

  database_name,

  schema_name,

  table_name,

  MAX(audit_timestamp) AS model_last_built,

  MAX(last_altered) AS table_last_altered,

  DATEDIFF(DAYS, table_last_altered, NVL(model_last_built, CURRENT_DATE)) AS days_stale

FROM model_build

-- Exclude certain schemas
WHERE schema_name NOT IN (

'AUDITS',

'INFORMATION_SCHEMA'

)

GROUP BY 1,2,3
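The days_stale expression is easier to reason about with concrete dates. The sketch below reproduces the same arithmetic in Python under the assumption that DATEDIFF with a day unit compares calendar dates and ignores the time of day; the function name and the dates are made up for illustration.

```python
from datetime import date, datetime
from typing import Optional


def days_stale(
    table_last_altered: datetime,
    model_last_built: Optional[datetime],
    today: date,
) -> int:
    """Mirror DATEDIFF(DAYS, table_last_altered, NVL(model_last_built, CURRENT_DATE))."""
    # NVL: fall back to the current date when no run record matched the table.
    end = model_last_built.date() if model_last_built is not None else today
    # Compare calendar dates only, so the time of day does not matter.
    return (end - table_last_altered.date()).days


# A table with no matching dbt run record, last altered ten days before "today".
orphaned = days_stale(datetime(2021, 1, 1, 23, 0), None, date(2021, 1, 11))  # 10

# A table whose model was last built two calendar days after the table was altered.
tracked = days_stale(
    datetime(2021, 1, 1, 23, 0), datetime(2021, 1, 3, 1, 0), date(2021, 1, 11)
)  # 2
```

Tables with no matching run record are measured against the current date, so a table left behind by a renamed or deleted model shows a value that keeps growing every day.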

Step 4: Visualize the data/build a dashboard

To visualize this data and make it accessible to the rest of our analytics team, we created a Looker dashboard with views that show our current-day run stats, run errors, model run times, model run time trends, and stale models. We also complement this with some Snowflake data on our dbt warehouse credit usage trends. This gives our team a one-stop shop for all dbt run questions, and allows us to easily track errors as well as which models may need a refactor or optimization work. We were also able to set up alerting through the Looker dashboard, which sends an alert to our #analytics_alerts Slack channel if, for example, the model error count rises above zero.
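The headline numbers on a dashboard like this are simple aggregations over the flattened run data. As a rough illustration (not our Looker logic), the sketch below computes an error count, the failed models, and the slowest models from rows that use the column names of the dbt_modeling model. The function name, the top_n parameter, and the should_alert flag are hypothetical.

```python
from typing import Any, Dict, List


def summarize_run(rows: List[Dict[str, Any]], top_n: int = 3) -> Dict[str, Any]:
    """Summarize one run from rows shaped like the dbt_modeling model."""
    failed = [r["model_name"] for r in rows if r["was_error"]]
    # Slowest models first: candidates for refactoring or optimization.
    slowest = sorted(rows, key=lambda r: r["model_run_time_sec"], reverse=True)
    return {
        "error_count": len(failed),
        "failed_models": failed,
        "slowest_models": [r["model_name"] for r in slowest[:top_n]],
        # Same rule as the alert described above: any error at all triggers it.
        "should_alert": len(failed) > 0,
    }
```

Keeping the alert rule as a plain boolean makes the threshold easy to change later, for example to alert only when the error count exceeds the previous run's.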

P.S. This functionality will be coming to dbt Cloud! Stay tuned for updates.
