Thank you @deydson_br , you pointed me in the right direction!
I built this macro (for Databricks/Spark) to store the test query (and some outcomes). For us this works flawlessly, but how it performs for single tests still needs to be tested more thoroughly. For unit tests some metadata was missing, so I added the `is_generic_node`
condition.
{#
NOTE: Because this is a macro, the `ref` is not parsed correctly.
For this to work, we have to keep the ref statement commented in the `on-run-end` hook like so:
on-run-end:
- "/* {{ ref('tests_outcome_info') }} */ {% if var('env') == 'prod' %}{{ write_tests_outcome_info_to_table(results) }}{% else %}select 1{% endif %}"
#}
{% macro write_tests_outcome_info_to_table(results, model=ref('tests_outcome_info'), failed_records_limit=20) -%}
{#-
    Renders the SQL statement that the on-run-end hook executes to persist test outcomes.

    Arguments:
      results               the run results passed to the on-run-end hook; only entries whose
                            node.resource_type equals "test" are written.
      model                 relation to insert into (defaults to the tests_outcome_info model).
      failed_records_limit  maximum number of failing rows sampled per failed generic test
                            before they are unpivoted into failed_records (default 20).

    Output:
      - a dummy SELECT when the results contain no tests;
      - otherwise one INSERT INTO ... SELECT ... UNION ALL SELECT ... statement with one
        SELECT per test.

    Escaping: values are embedded as Spark string literals, where the backslash is the
    escape character. Backslashes are therefore doubled BEFORE single quotes are escaped;
    escaping only the quotes would corrupt (or break out of) any value that already
    contains a backslash, such as a regex inside the compiled test query.
-#}
{{ log(" !! STARTING on-run-end macro to store test outcomes to a table.") }}
{%- set all_tests = results | selectattr("node.resource_type", "equalto", "test") | list %}
{%- if all_tests == [] %}
{# When no tests were found... #}
{# Somehow, spark sql crashes with a double /* */ comment
without a proper operation after it. Therefore adding a dummy operation.
#}
select 'Siete was here :)'
{%- else %}
INSERT INTO {{ model }} (
    test_execution_id,
    run_started_at,
    package_name,
    model,
    column,
    test_name,
    tags,
    first_arg,
    status,
    duration,
    failures,
    message,
    test_query,
    failed_records
)
{% for test in all_tests -%}
    {#- Cannot test for the test.node being of type 'SingularTestNode', therefore checking
        test_metadata, which seems to be missing for singular tests.
        The other type is the 'GenericTestNode'. #}
    {%- set is_generic_node = test.node.test_metadata is defined -%}
    {#- Failing rows are only stored for failed generic tests, so the extra query that
        discovers the column names of the test query is only issued in that case. #}
    {%- set collect_failed_records = test.status == "fail" and is_generic_node -%}
    {%- if collect_failed_records %}
        {#- WHERE 1=0 returns no rows: only the column metadata of the result is needed. #}
        {%- set tq = run_query("SELECT * FROM (" ~ test.node.compiled_code ~ ") WHERE 1=0") -%}
        {%- set test_query_columns = tq.columns | map(attribute='name') | list -%}
    {%- endif %}
    {%- set tags = test.node.tags | map("replace", "\\", "\\\\") | map("replace", "'", "\\'") | list -%}
    {{- log("-------------------THE TEST DATA OBJECT!!!!! " ~ test.status ~ " -------------------") -}}
    {{- log(test) -}}
    {{- log("---------------------------- END ------------------------------") -}}
    SELECT
    md5('{{ invocation_id }}-{{ test.node.unique_id }}') AS test_execution_id,
    '{{ run_started_at }}' AS run_started_at,
    '{{ test.node.package_name }}' AS package_name,
    {%- if is_generic_node %}
    '{{ test.node.file_key_name.split('.')[-1] }}'
    {%- elif test.node.refs %}
    '{{ test.node.refs[0].name }}'
    {%- else %}
    NULL {#- Singular test without any ref(): there is no model name to report. #}
    {%- endif %} AS model,
    {% if test.node.column_name %}'{{ test.node.column_name }}'{% else %}NULL{% endif %} AS column,
    {% if is_generic_node %}'{{ test.node.test_metadata.name }}'{% else %}'{{ tags | first }}'{% endif %} AS test_name,
    ARRAY(
        {%- for tag in tags -%}
        {% if not loop.first %}, {% endif %}'{{ tag }}'
        {%- endfor -%}
    ) AS tags,
    {%- if is_generic_node %}
        {%- set first_key = test.node.test_metadata['kwargs'] | list | first or None %}
        {%- if first_key %}
    '{{first_key}}={{ test.node.test_metadata.kwargs[first_key] | string | replace("\\", "\\\\") | replace("'", "\\'") }}'
        {%- else %}
    NULL
        {%- endif %}
    {%- else %}
    'test_path={{ test.node.unique_id }}'
    {%- endif %} AS first_arg,
    '{{ test.status }}' AS status,
    '{{ test.execution_time }}' AS duration,
    {#- A count of 0 is a real value (passing test); only a missing count becomes NULL. #}
    {% if test.failures is number %}{{ test.failures }}{% else %}NULL{% endif %} AS failures,
    '{{ (test.message or test.adapter_response._message) | replace("\\", "\\\\") | replace("'", "\\'") }}' AS message,
    '{{ test.node.compiled_code | replace("\\", "\\\\") | replace("'", "\\'") | trim }}' AS test_query,
    {%- if collect_failed_records %}
    to_json(collect_list(struct(subquery.*))) AS failed_records {#- Test failed AND it is a generic test #}
    FROM (
        SELECT
            *
        FROM ( {#- Since this is dynamic, there is nothing to order on, and row_number requires that. luckily, uuid works! #}
            SELECT
                row_number() OVER (order by uuid()) as row_nr
                {#- Every column is cast to STRING so that all of them can be unpivoted into
                    the single `value` column; names are backtick-quoted because they come
                    from an arbitrary test query. #}
                {% for col in test_query_columns -%}
                , `{{ col }}` :: STRING
                {% endfor -%}
            FROM (
                {{ test.node.compiled_code | indent(16) }}
            ) limit {{ failed_records_limit }}
        )
        UNPIVOT INCLUDE NULLS (
            `value` FOR `column_name` IN (
                {%- for col in test_query_columns -%}
                `{{ col }}`
                {%- if not loop.last %}, {% endif -%}
                {% endfor -%}
            )
        )
    ) subquery
    {%- else %}
    NULL AS failed_records {#- Test passed/errored OR it is a singular (non-generic) test #}
    {% endif %} {#- End of "collect failed records" if condition #}
    {% if not loop.last %}UNION ALL
    {% endif %}
{% endfor %}
{%- endif %}
{%- endmacro %}
It requires a separate incremental model:
{#
Target table for the test outcomes written by the on-run-end hook.

The SELECT below never returns a row (WHERE 1=0); it only declares the column
names and types of the table. The model is materialized as incremental so that
the table is created once and the rows inserted by the hook are kept on later
runs instead of the relation being rebuilt (or created as a view, which cannot
be inserted into).

We can't use a variable in the hook, so the 90-day retention window is hard-coded.
Also note that the log will be logged twice, but only ran once.
#}
{{ config(
    materialized="incremental",
    post_hook="
{{ log('!! Deleting records from 90 days ago.') }}
DELETE FROM {{ this }}
WHERE run_started_at < date_add(now(), -90)
"
) }}
SELECT
    null::STRING as test_execution_id,
    null::TIMESTAMP as run_started_at,
    null::STRING as package_name,
    null::STRING as model,
    null::STRING as column,
    null::STRING as test_name,
    null::ARRAY<STRING> as tags,
    null::STRING as first_arg,
    null::STRING as status,
    null::DOUBLE as duration,
    null::INT as failures,
    null::STRING as message,
    null::STRING as test_query,
    null::STRING as failed_records
WHERE
    1=0
And you can call it like this:
# Hook executed at the end of a dbt invocation.
# - The ref inside the SQL comment must stay: the ref used as the macro's default
#   argument is not parsed correctly, so it is repeated here in commented form.
# - The macro output is only executed when var('env') equals 'prod'; in any other
#   environment the hook runs a harmless `select 1`.
on-run-end:
- "/* {{ ref('tests_outcome_info') }} */ {% if var('env') == 'prod' %}{{ write_tests_outcome_info_to_table(results) }}{% else %}select 1{% endif %}"