Auto pivot JSON string using schema.yml in BigQuery

Let me start by giving you some background; apologies if the background is longer than the solution.

Background

The scenario:

  • We work with BigQuery
  • Working with events data split into two data sources due to the constraints of the source system
  • One source is the main set of data, let us call it TABLE_MAIN
  • The other is the ancillary or supportive data, let us call this one TABLE_SUPPORT
  • There is a possibility of a 1:n relationship between TABLE_MAIN and TABLE_SUPPORT
  • Join these two tables into one

Simple enough? Not so much. The complication is data in TABLE_SUPPORT. Why so?

  • Not all events have the same support data; one event can have a few columns and another can have hundreds.
    • Column one of event evt_123 is not necessarily the same as column one of event evt_456
    • If all columns were mapped to every event, there would be a lot of empty columns
  • Constraints of the source system
    • Only 20 columns of data per row. Multiple rows per event
    • Only 60 characters per column, so one value of support data can span multiple columns of TABLE_SUPPORT. And if this happens on column 20, one value of support data can span two rows.
  • The source system can be enhanced with additional support data at any time

To help visualize:
TABLE_MAIN

| Field        | Key |
| ------------ | --- |
| EVENTGUID_1  | X   |
| EVENTGUID_2  | X   |
| PROCESS_DATE | X   |

TABLE_SUPPORT

| Field                | Key | Notes                                                        |
| -------------------- | --- | ------------------------------------------------------------ |
| EVENTGUID_1          | X   |                                                              |
| EVENTGUID_2          | X   |                                                              |
| PROCESS_DATE         | X   |                                                              |
| RECSEQ               | X   | Row sequence number if support data is across multiple rows |
| FIELDNAME 01 to 20   |     | Name                                                         |
| FIELDVALSEQ 01 to 20 |     | Sequence if the value has been split                         |
| FIELDVALUE 01 to 20  |     | Value                                                        |

Got that? In the end, even though TABLE_SUPPORT has multiple rows for each event, it really only holds one row’s worth of data per event, which means that a 1:1 relationship between TABLE_MAIN and TABLE_SUPPORT is possible.

The dbt Model:

I won’t go too much into how the mapping between the tables was done; suffice to say that arrays played a part. Basically it is this:

  • Get TABLE_MAIN data
  • Get TABLE_SUPPORT data
    • Create an ARRAY
    • STRING_AGG the split values, ordering by FIELDVALSEQ
    • ARRAY_AGG(STRUCT(FIELDNAME, FIELDVALUE))
  • Join
  • Tadah! You have TABLE_COMBINED with an array, which is an awesome feature of BQ (I’ve got a SAPBW background, no arrays there).

However, because of the data volume, TABLE_COMBINED ends up being of a similar size to TABLE_SUPPORT, and any queries on TABLE_COMBINED will be pricey. An array is not suitable, so as a final step we turn the array into a JSON string using TO_JSON_STRING(). The table size drops dramatically. A rough sketch of this shape follows below.
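
For illustration only, here is a minimal sketch of that shape (not the production model). The staging relation name table_support_unpivoted, and the assumption that the 20 FIELDNAME/FIELDVALSEQ/FIELDVALUE column sets have already been unpivoted into one row per field-value part, are mine:

with support_values as (
    -- Re-assemble values that were split across FIELDVALSEQ parts
    select
        EVENTGUID_1,
        EVENTGUID_2,
        FIELDNAME,
        string_agg(FIELDVALUE, '' order by FIELDVALSEQ) as FIELDVALUE
    from {{ ref('table_support_unpivoted') }}  -- hypothetical unpivoted staging model
    group by EVENTGUID_1, EVENTGUID_2, FIELDNAME
),

support_agg as (
    -- One array of (name, value) pairs per event, serialised to a JSON string
    select
        EVENTGUID_1,
        EVENTGUID_2,
        to_json_string(array_agg(struct(FIELDNAME, FIELDVALUE))) as JSON_STRING
    from support_values
    group by EVENTGUID_1, EVENTGUID_2
)

select
    m.*,
    s.JSON_STRING
from {{ ref('table_main') }} as m
left join support_agg as s
    using (EVENTGUID_1, EVENTGUID_2)
-- Note: the pivot macro further down extracts values with '$.COLUMN' paths, i.e. it
-- expects a JSON object keyed by field name rather than the array-of-structs shape
-- sketched here; that reshaping detail is left out of this sketch.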

Great, now we have a usable table with a JSON string containing all my event support data.

How do I get the data out of this JSON string when querying the table? Easy, just use JSON_EXTRACT_SCALAR(json_string,'$.fieldname') as fieldname. Well, easy if you are versed or comfortable with SQL and if you know what you are looking for. But what if you are a general user, the “select * and I’ll see what’s there” kind of user? Our solution is to create a view with the JSON string automatically pivoted.
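
To make the contrast concrete, something like the following (dbt_demo.table_combined is a placeholder for the combined table; dbt_demo.test_Pivot is the view produced by the sample model at the end of this post; CALL_AHEAD_DATE is the field from the schema sample):

-- What a user has to write directly against the combined table:
select
    JSON_EXTRACT_SCALAR(JSON_STRING, '$.CALL_AHEAD_DATE') as CALL_AHEAD_DATE
from dbt_demo.table_combined;

-- What the same user can write against the auto-pivoted view
-- (where CALL_AHEAD_DATE already comes back as a proper DATE):
select CALL_AHEAD_DATE
from dbt_demo.test_Pivot;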

Pivot Solution

You may say easy, we all know that dbt allows us to auto-generate a repeated set of code using macros. And you are right, just:

  • Create a macro
  • Put the JSON extract command in a FOR loop with a field list
  • Call the macro and provide the field list (a minimal sketch of this naive version follows below)
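
A minimal sketch of that naive version, with a hand-maintained field list (the macro name and field names here are placeholders):

{%- macro json_extract_fields(json_string) -%}
    {# Hypothetical first-pass macro: the field list has to be maintained by hand #}
    {%- set field_list = ['CALL_AHEAD_DATE', 'SOME_OTHER_FIELD'] -%}
    {%- for field in field_list %}
        JSON_EXTRACT_SCALAR({{ json_string }}, '$.{{ field }}') as {{ field }},
    {%- endfor -%}
{%- endmacro -%}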

But consider these questions:

  • What if I do not want to manually provide the field list?
  • What if I do not want a developer changing my model every time the source system is enhanced, especially if the model itself does not really change?
  • What if I need to make sure the correct data type is used, given that the result of JSON_EXTRACT_SCALAR() is always a string?
  • When should CAST() or DATE() be used instead of PARSE_DATE(), etc.?

Here is how I did it:

schema.yml

My first thought was the schema.yml for each model. This document needs to be updated every time the model gets updated, and I would rather change it than a model; there is less likelihood of breaking things. Is it possible to use the schema to drive the pivoting and the associated rules?

One of the options in the schema is meta. I asked a question in Slack about its use and Taylor Murphy’s response helped me greatly. I decided to use the meta option and add three values. Note that this is the schema of the target table/view.

  • JSON: True or False – mandatory; marks the columns that live in the JSON string
  • data_type: the target data type, e.g. DATE
  • rule: the rule applied to the data_type, e.g. Parse. Blank = DATE(), Parse = PARSE_DATE(), etc.; more rules can be added if required (see the example just below).
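
To make the rule concrete: with the macro further down, a DATE column such as CALL_AHEAD_DATE generates one of the following, depending on the rule (the sample raw values in the comments are assumptions about the source format):

-- rule: Parse  (raw value is an unformatted string such as '20210430')
parse_date('%Y%m%d', JSON_EXTRACT_SCALAR(JSON_STRING, '$.CALL_AHEAD_DATE')) as CALL_AHEAD_DATE,

-- rule left blank  (raw value is already castable, such as '2021-04-30')
date(JSON_EXTRACT_SCALAR(JSON_STRING, '$.CALL_AHEAD_DATE')) as CALL_AHEAD_DATE,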

Macro

Now that I had my driver, my macro just needed to do the following, with a lot of if statements:

  • Get a list of columns from the schema
  • Loop through columns.
    • Get JSON string columns only
    • Extract based on Data Type
    • Get conversion rule if column is a Time Dimension

I tried to keep the macro dynamic, not limited to my model or the name of the JSON string.

Model

The hard part is done. Now I just need to call the macro with the right arguments.

Recap

The schema.yml file is critical. All a developer now needs to do is make sure that any additional support data from the source system is added to the proper schema.yml; the macro will do the rest.

I am thinking that the meta option in schema.yml will be used more in future.
Hope this was helpful.

For reference:

Sample from schema.yml

      - name: CALL_AHEAD_DATE 
        description: Call Ahead Date
        meta:
          JSON: True
          data_type: DATE
          rule: Parse

My macro:

{%- macro json_extract_by_datetype(json_string, model_name, project, node_type='model') -%}
    {%- if execute -%}
        {# Step 1 - Get a list of columns from the schema #}
        {# Step 2 - Loop through columns, get JSON string columns only #}
        {# Step 3 - Extract based on Data Type #}
        {# Step 4 - Get conversion rule if column is a Time Dimension #}
        {%- set anc_meta_key = 'JSON' -%}
        {%- set rule_meta_key = 'rule' -%}
        {%- set datatype_meta_key = 'data_type' -%}
        {%- set fqname = node_type ~ '.' ~ project ~ '.' ~ model_name -%}
        {%- set columns = graph.nodes[fqname]['columns'] -%}

        {# Step 2 - Get JSON string columns only #}
        {# Step 3 - Extract based on Data Type #}
        {# Step 4 - Get conversion rule if column is a Time Dimension #}
        {%- for column in columns -%}
            {%- set anc_value = (graph.nodes[fqname]['columns'][column]['meta'][anc_meta_key]|upper) -%}
            {%- set rule_value = (graph.nodes[fqname]['columns'][column]['meta'][rule_meta_key]|upper) -%}
            {%- set dt_value = (graph.nodes[fqname]['columns'][column]['meta'][datatype_meta_key]|upper) -%}
            {%- if anc_value == 'TRUE' -%}
                {%- if dt_value == 'STRING' -%}
                    JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}') as {{column}},
                {%- elif dt_value == 'NUMERIC' -%}  
                    safe_cast(JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}') as NUMERIC) as {{column}},
                {%- elif dt_value == 'INTEGER' -%}  
                    safe_cast(JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}') as INT64) as {{column}},
                {%- elif dt_value == 'FLOAT' -%}  
                    safe_cast(JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}') as FLOAT64) as {{column}},
                {%- elif dt_value == 'DATE' -%}  
                    {%- if rule_value == 'PARSE' -%}
                        parse_date('%Y%m%d',JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- else -%}
                        date(JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- endif -%}
                {%- elif dt_value == 'TIME' -%}  
                    {%- if rule_value == 'PARSE' -%}
                        parse_time('%H%M%S',JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- else -%}
                        time(JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- endif -%}
                {%- elif dt_value == 'DATETIME' -%}  
                    {%- if rule_value == 'PARSE' -%}
                        parse_datetime('%Y%m%d%H%M%S',JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- else -%}
                        datetime(JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- endif -%}
                {%- elif dt_value == 'TIMESTAMP' -%}  
                    {%- if rule_value == 'PARSE' -%}
                        parse_timestamp('%c',JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- else -%}
                        timestamp(JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}')) as {{column}},
                    {%- endif -%}
                {%- else -%}
                    JSON_EXTRACT_SCALAR({{json_string}},'$.{{column}}') as {{column}},
                {%- endif -%}
            {%- endif -%}
        {%- endfor -%} 
    {%- endif -%}
{%- endmacro -%}

Sample model:

{{ config(
    tags         = ['testing'],
    labels       = {'created_by': 'dbt-generated'},
    materialized = 'view',
    schema       = 'dbt_demo',
    alias        = 'test_Pivot'
) }}

select
    * except(JSON_STRING),
    {{ json_extract_by_datetype('JSON_STRING', 'my_target_model_name', 'dbt_project_name') }}
from {{ ref('my_model_name') }}
where process_date is not null