Hello! I recently read "How to Build a Production Grade Workflow with SQL Modelling" on Shopify's Data Science & Engineering blog, which describes Shopify's approach to testing in dbt.
At my day job, we've been using dbt for a couple of months now. We've been planning to provide some sort of targeted "unit" testing framework that lets users validate an individual model's logic. I took a stab at it this weekend and wanted to solicit some feedback on the approach. It's packaged as a dbt package, but this isn't a package announcement; it's a barely working proof of concept that only supports Postgres.
I defined the following design constraints:
- Tests must be written as Python tests (your standard test_*.py files, using Python test classes). This lets end users choose their own Python test framework and reporting methods, and it should feel very familiar to anyone already writing Python.
- Tests should support assertions using pandas dataframes. Pandas makes it easy to create relational constructs and to transform dataframes into SQL tables and vice versa (see the round-trip sketch after this list).
- Tests should provide an easy way to mock model dependencies, i.e. ref and source. A test should focus on a single model's transformations, not on a dependency graph of models.
- The test strategy should not require model definitions to change in any way. This means no conditionals or special test-only calls inside the model; the framework should evaluate the model as close as possible to the definition that will run in production.
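To make the dataframe constraint concrete, this is the kind of round trip the framework leans on: mocked inputs go into the database as tables, and the materialized model comes back out as a dataframe for assertions. A minimal sketch (the connection string and table name are made up for illustration, not part of the POC):

import pandas as pd
from sqlalchemy import create_engine

# Illustrative connection string; the POC reads its own test config instead.
engine = create_engine('postgresql://test:test@localhost:5432/test')

# Dataframe -> table: how mocked ref()/source() data can be staged.
input_df = pd.DataFrame([(1,), (None,)], columns=['id'])
input_df.to_sql('some_mocked_ref', engine, if_exists='replace', index=False)

# Table -> dataframe: how a materialized model comes back for assertions.
output_df = pd.read_sql('select * from some_mocked_ref where id = 1', engine)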
The POC framework is available here:
The following walks through an MVP of the usage.
For the POC I initialized a new dbt project and created two tests, one for each of the "example" models. Each test lives alongside the model it covers:
models/example/
my_first_dbt_model.sql
my_second_dbt_model.sql
test_my_first_dbt_model.py
test_my_second_dbt_model.py
The first example model looks like:
-- my_first_dbt_model.sql
{{ config(materialized='table') }}
with source_data as (
select 1 as id
union all
select null as id
)
select *
from source_data
Notice that it contains no dependencies (no refs or sources).
This is a pretty simple case. The test framework uses dbt's own adapters to get a connection to the database specified in the config file.
Currently, it assumes a static config file used for testing, located in the conf/ directory.
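This is an ordinary dbt profiles.yml. Something along these lines matches the target='ci' and the test database/schema visible in the run output below (values here are illustrative, not the POC's actual file):

# conf/profiles.yml (illustrative)
dbt_model_test:
  target: ci
  outputs:
    ci:
      type: postgres
      host: localhost
      port: 5432
      user: test
      password: test
      dbname: test
      schema: test
      threads: 8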
The lack of dependencies makes this test really simple:
import pandas as pd

from dbtmodeltest.testcase import DBTModelTestCase


class MyFirstDBTModelTestCase(DBTModelTestCase):

    def test_output_rows(self):
        df = pd.DataFrame([
            (1,),
            (None,)
        ], columns=['id'])

        out_df = self.execute_model(
            model='models/example/my_first_dbt_model.sql',
        )

        self.assertDFEqual(df, out_df)
$ python -m unittest models/example/test_my_first_dbt_model.py
Running with dbt=0.19.0
Found 2 models, 4 tests, 0 snapshots, 0 analyses, 139 macros, 0 operations, 0 seed files, 0 sources, 0 exposures
15:27:58 | Concurrency: 8 threads (target='ci')
15:27:58 |
15:27:58 | 1 of 1 START table model test.my_first_dbt_model..................... [RUN]
15:27:58 | 1 of 1 OK created table model test.my_first_dbt_model................ [SELECT 2 in 0.10s]
15:27:58 |
15:27:58 | Finished running 1 table model in 0.20s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
"test"."test"."my_first_dbt_model"
id
0 1.0
1 NaN
.
----------------------------------------------------------------------
Ran 1 test in 1.183s
OK
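One detail worth noting in that output: the id column comes back from Postgres as float64 (1.0 / NaN) even though the expected dataframe was built from an int and a None. assertDFEqual has to normalize that before comparing. Here is a minimal sketch of what such a helper needs to do (the names and normalization rules here are mine, not necessarily what the package does):

import unittest

import pandas as pd


class DFEqualMixin(unittest.TestCase):

    def assertDFEqual(self, expected, actual):
        # Column names (and their order) must match exactly.
        self.assertEqual(list(expected.columns), list(actual.columns))

        def rows(df):
            # Normalize cells: NaN/None both become None and ints become
            # floats, because a nullable integer column round-trips through
            # Postgres/pandas as float64 (1 -> 1.0, null -> NaN).
            normalized = (
                tuple(
                    None if pd.isnull(value)
                    else float(value) if isinstance(value, (int, float)) and not isinstance(value, bool)
                    else value
                    for value in row
                )
                for row in df.itertuples(index=False, name=None)
            )
            # Sort by repr so row order does not matter and None sorts safely.
            return sorted(normalized, key=repr)

        self.assertEqual(rows(expected), rows(actual))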
The second test gets more complicated. It exercises my_second_dbt_model.sql, which looks like:
select *
from {{ ref('my_first_dbt_model') }}
where id = 1
The test framework needs to hook into the ref() call and replace it with a pandas dataframe. The test for this looks like:
import pandas as pd

from dbtmodeltest.testcase import DBTModelTestCase


class MySecondDBTModelTestCase(DBTModelTestCase):

    def test_only_returns_id_1(self):
        my_first_dbt_model = pd.DataFrame([
            (1,),
            (2,),
            (None,)
        ], columns=['id'])

        out_df = self.execute_model_with_refs(
            model='models/example/my_second_dbt_model.sql',
            my_first_dbt_model=my_first_dbt_model,
        )

        expected_df = pd.DataFrame([
            (1,),
        ], columns=['id'])

        self.assertDFEqual(expected_df, out_df)
This shows the output from the test execution:
$ DBT_MODEL_TEST_ENABLED=1 DBT_MODEL_TEST_IDENTIFIER_PREFIX="test1_" python -m unittest models/example/test_my_second_dbt_model.py
creating table: test1_my_first_dbt_model
Running with dbt=0.19.0
Found 2 models, 4 tests, 0 snapshots, 0 analyses, 139 macros, 0 operations, 0 seed files, 0 sources, 0 exposures
15:32:36 | Concurrency: 8 threads (target='ci')
15:32:36 |
15:32:36 | 1 of 1 START view model test.my_second_dbt_model..................... [RUN]
15:32:36 | 1 of 1 OK created view model test.my_second_dbt_model................ [CREATE VIEW in 0.09s]
15:32:36 |
15:32:36 | Finished running 1 view model in 0.18s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
"test"."test"."my_second_dbt_model"
id
0 1
.
----------------------------------------------------------------------
Ran 1 test in 1.137s
OK
The strategy for replacing ref with custom data leverages a custom macro that shadows dbt's built-in ref. When testing is enabled, the macro prefixes the referenced relation's identifier with a prefix defined in an environment variable:
{% macro ref(model_name) %}
    {% set dbt_model_test_enabled = env_var('DBT_MODEL_TEST_ENABLED', False) %}
    {# set dbt_model_test_database = env_var('DBT_MODEL_TEST_DATABASE', '') #}
    {# set dbt_model_test_schema = env_var('DBT_MODEL_TEST_SCHEMA', '') #}
    {% set dbt_model_test_identifier_prefix = env_var('DBT_MODEL_TEST_IDENTIFIER_PREFIX', '') %}

    {{ log("Running custom:ref " ~ model_name) }}

    {% if dbt_model_test_enabled == '1' %}
        {{ log("DBT_MODEL_TEST: model test enabled") }}
        {% set rel = builtins.ref(model_name) %}
        {% set newrel = rel.replace_path(
            identifier=dbt_model_test_identifier_prefix + model_name
        ) %}
        {% do return(newrel) %}
    {% else %}
        {{ log("DBT_MODEL_TEST: model test disabled") }}
        {% do return(builtins.ref(model_name)) %}
    {% endif %}
{% endmacro %}
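With DBT_MODEL_TEST_ENABLED=1 and DBT_MODEL_TEST_IDENTIFIER_PREFIX="test1_" (as in the run above), the ref in my_second_dbt_model.sql resolves to the prefixed relation, so the compiled SQL ends up roughly like this (database and schema are both named test in my setup):

select *
from "test"."test"."test1_my_first_dbt_model"
where id = 1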
This points our dbt model at the prefixed test table. The final step is to create that prefixed table from the dataframe provided to the test, which happens before the model is executed:
from sqlalchemy import create_engine

# (method body excerpt: ref_dfs maps ref names to the dataframes passed in)
engine = create_engine(self._adapter_sqlalchemy_conn_string())
with engine.connect() as conn:
    for ref_name, ref_df in ref_dfs.items():
        table_name = self.identifier_prefix + ref_name
        # drop the table (and anything depending on it) if it already exists
        conn.execute('DROP TABLE IF EXISTS {} CASCADE'.format(table_name))
        print('creating table: {}'.format(table_name))
        ref_df.to_sql(
            name=table_name,
            con=engine,
            if_exists='replace',
            index=False,
        )
return self.execute_model(model)
This is accomplished using SQLAlchemy to convert the dataframe to a table via to_sql(...).
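The _adapter_sqlalchemy_conn_string() helper isn't shown here; it builds a SQLAlchemy URL from the same credentials the dbt adapter uses. A rough sketch, assuming Postgres-style credential fields (host, port, user, password, database) and that the test case exposes the parsed profile; this is my reconstruction, not necessarily the package's code:

def _adapter_sqlalchemy_conn_string(self):
    # Reuse the credentials dbt resolved from the profile; attribute names
    # below assume the Postgres adapter's credentials object.
    creds = self.config.credentials  # however the test case exposes the profile
    return 'postgresql://{user}:{password}@{host}:{port}/{dbname}'.format(
        user=creds.user,
        password=creds.password,
        host=creds.host,
        port=creds.port,
        dbname=creds.database,
    )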
I’m wondering:
- Does replacing ref/source with a custom macro make sense?
- Has anyone else done this?
- If so, which strategy did you use?
I appreciate any feedback on this approach. Thank you.