Single-pass multi-field JSON extraction with BigQuery and dbt (huge performance increase)

tl;dr: this pattern significantly boosts the performance of extracting multiple fields from a single JSON field in BigQuery by using JavaScript UDFs generated by dbt macros. During benchmarking we observed a 150% performance increase when extracting 40 fields.

BigQuery has two built-in functions for extracting JSON from a field: JSON_EXTRACT and JSON_EXTRACT_SCALAR. These functions are inefficient at extracting multiple values from a single JSON field, because the field is parsed once for every value extracted. Performance degrades rapidly as more columns are extracted, since the cost grows with both the length of the JSON field and the number of JSON_EXTRACT_SCALAR calls.

Common approach:

{{ config(materialized="ephemeral") }}

WITH
  bq_table_with_json_field AS (
    SELECT
      '{"foo": "abc", "bar": 42, "baz": true, "qux": [3.1, 3.14, 3.142]}' AS json_field
  )

SELECT
  JSON_EXTRACT_SCALAR(json_field, '$.foo') AS foo,
  JSON_EXTRACT_SCALAR(json_field, '$.bar') AS bar,
  JSON_EXTRACT_SCALAR(json_field, '$.baz') AS baz,
  JSON_EXTRACT(json_field, '$.qux') AS qux -- qux is an array, so JSON_EXTRACT_SCALAR would return NULL here
FROM bq_table_with_json_field

Using dbt, we can programmatically generate a BigQuery JavaScript UDF that parses the JSON field once and extracts all the required fields. BigQuery JavaScript UDFs are unable to return STRUCT objects, but they can return ARRAY<STRUCT<key STRING, value STRING>>, which has a deterministic order. We can also use each column's data_type from schema.yml to automatically cast the values. Here is an example implementation.

macros/bq_json_extractor.sql

{% macro extract_payload_udf(fields) %}
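    {# Generates a temp JS UDF that parses the JSON payload once and returns one (key, value) struct per requested field, in the order of the fields argument #}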
    {% raw %}
        CREATE TEMP FUNCTION jsonToMap(json_string STRING) 
        RETURNS ARRAY<STRUCT<key STRING, value STRING>>
        LANGUAGE js 
        AS """
        var parsed_json = JSON.parse(json_string);
        var fields = {% endraw %}{{ fields | tojson }}{% raw %};
        var mapResult = [];
        for (let field of fields) {
            if (parsed_json.hasOwnProperty(field)) {
                mapResult.push({'key': field, 'value': parsed_json[field]});
            } else {
                mapResult.push({'key': field, 'value': null});
            }
        }
        return mapResult;
        """;
    {% endraw %}
{% endmacro %}

{% macro json_columns(field_type_map) %}
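  {# Generates one SELECT expression per field, casting the string value back to its declared data_type; ORDINAL(loop.index) relies on the UDF returning values in the same order as the fields list #}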
  {% for type, field in field_type_map %}
    {% if type == 'TIMESTAMP' %}
        SAFE_CAST(mapped_json_field[ORDINAL({{ loop.index }})].value AS TIMESTAMP) AS {{ field }},
    {% elif type == 'STRING' %}
        mapped_json_field[ORDINAL({{ loop.index }})].value AS {{ field }},
    {% elif type == 'INT64' %}
        SAFE_CAST(mapped_json_field[ORDINAL({{ loop.index }})].value AS INT64) AS {{ field }},
    {% elif type == 'FLOAT64' %}
        SAFE_CAST(mapped_json_field[ORDINAL({{ loop.index }})].value AS FLOAT64) AS {{ field }},
    {% elif type == 'BOOLEAN' %}
        SAFE_CAST(mapped_json_field[ORDINAL({{ loop.index }})].value AS BOOLEAN) AS {{ field }},
    {% elif type == 'ARRAY(STRING)' %}
        (SELECT ARRAY_AGG(item IGNORE NULLS) FROM UNNEST(SPLIT(mapped_json_field[ORDINAL({{ loop.index }})].value, ",")) AS item) AS {{ field }},
    {% elif type == 'ARRAY(INT64)' %}
        (SELECT ARRAY_AGG(SAFE_CAST(item AS INT64) IGNORE NULLS) FROM UNNEST(SPLIT(mapped_json_field[ORDINAL({{ loop.index }})].value, ",")) AS item) AS {{ field }},
    {% elif type == 'ARRAY(FLOAT64)' %}
        (SELECT ARRAY_AGG(SAFE_CAST(item AS FLOAT64) IGNORE NULLS) FROM UNNEST(SPLIT(mapped_json_field[ORDINAL({{ loop.index }})].value, ",")) AS item) AS {{ field }},
    {% elif type == 'ARRAY(BOOLEAN)' %}
        (SELECT ARRAY_AGG(SAFE_CAST(item AS BOOLEAN) IGNORE NULLS) FROM UNNEST(SPLIT(mapped_json_field[ORDINAL({{ loop.index }})].value, ",")) AS item) AS {{ field }},
    {% else %}
        {{ exceptions.raise_compiler_error("Unsupported field type: " ~ type) }}
    {% endif %}
  {% endfor %}
{% endmacro %}

{% macro map_field_types(schema) %}
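  {# Builds [data_type, field_name] pairs from the schema.yml columns; event_timestamp is assumed to live outside the JSON payload and is skipped #}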
  {%- set result = [] -%}
  {%- for field_name, field_attributes in schema.items() -%}
    {%- if field_name != 'event_timestamp' -%}
      {%- do result.append([field_attributes['data_type'], field_name]) -%}
    {%- endif -%}
  {%- endfor -%}
  {{- return(result) -}}
{% endmacro %}

{% macro map_fields(schema) %}
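  {# Builds the list of field names only, in the same order as map_field_types #}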
  {%- set result = [] -%}
  {%- for field_name in schema.keys() -%}
    {%- if field_name != 'event_timestamp' -%}
      {%- do result.append(field_name) -%}
    {%- endif -%}
  {%- endfor -%}
  {{- return(result) -}}
{% endmacro %}
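
For the example schema.yml below, these two macros resolve to the values sketched here (the order follows the column order in the YAML, and json_columns relies on the two lists lining up):

field_type_map = [['STRING', 'foo'], ['INT64', 'bar'], ['BOOLEAN', 'baz'], ['ARRAY(FLOAT64)', 'qux']]
fields         = ['foo', 'bar', 'baz', 'qux']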

models/sandbox/schema.yml

version: 2
models:
- name: bq_table_with_json_field
  description: ''
  columns:
    - name: foo
      data_type: STRING
    - name: bar
      data_type: INT64
    - name: baz
      data_type: BOOLEAN
    - name: qux
      data_type: ARRAY(FLOAT64)

models/sandbox/bq_table_with_json_field.sql

{%- set field_type_map = map_field_types(model['columns']) -%}
{%- set fields = map_fields(model['columns']) -%}

{{ extract_payload_udf(fields) }}
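{# extract_payload_udf emits the CREATE TEMP FUNCTION statement at the top of the compiled SQL #}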

{{ config(materialized="ephemeral") }}

WITH
  bq_table_with_json_field AS (
    SELECT
      '{"foo": "abc", "bar": 42, "baz": true, "qux": [3.1, 3.14, 3.142]}' AS json_field
  ),
  table_with_mapped_json_field AS (
  SELECT
    jsonToMap(json_field) AS mapped_json_field
  FROM bq_table_with_json_field
  )

SELECT
  {{ json_columns(field_type_map) }}
FROM table_with_mapped_json_field

Which compiles to:

CREATE TEMP FUNCTION
  jsonToMap(json_string STRING)
  RETURNS ARRAY<STRUCT<KEY STRING,
  value STRING>>
  LANGUAGE js AS """
        var parsed_json = JSON.parse(json_string);
        var fields = ["foo", "bar", "baz", "qux"];
        var mapResult = [];
        for (let field of fields) {
            if (parsed_json.hasOwnProperty(field)) {
                mapResult.push({'key': field, 'value': parsed_json[field]});
            } else {
                mapResult.push({'key': field, 'value': null});
            }
        }
        return mapResult;
        """;
WITH
  bq_table_with_json_field AS (
  SELECT
    '{"foo": "abc", "bar": 42, "baz": true, "qux": [3.1, 3.14, 3.142]}' AS json_field ),
  table_with_mapped_json_field AS (
  SELECT
    jsonToMap(json_field) AS mapped_json_field
  FROM
    bq_table_with_json_field )
SELECT
  mapped_json_field[ORDINAL(1)].value AS foo,
  SAFE_CAST(mapped_json_field[ORDINAL(2)].value AS INT64) AS bar,
  SAFE_CAST(mapped_json_field[ORDINAL(3)].value AS BOOLEAN) AS baz,
  (
  SELECT
    ARRAY_AGG(SAFE_CAST(item AS FLOAT64) IGNORE NULLS)
  FROM
    UNNEST(SPLIT(mapped_json_field[ORDINAL(4)].value, ",")) AS item) AS qux,
FROM
  table_with_mapped_json_field
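
For the sample row, the intermediate mapped_json_field is a four-element array whose positions match the order of the fields list baked into the UDF, which is what makes the ORDINAL(n) lookups in the final SELECT safe. The hand-built value below has the same shape (a sketch only; the exact string representations are an assumption about how BigQuery coerces the JavaScript values to STRING):

-- Hand-built value with the same shape and ordering as mapped_json_field for the sample row
SELECT
  [ STRUCT('foo' AS key, 'abc' AS value),            -- ORDINAL(1)
    STRUCT('bar' AS key, '42' AS value),              -- ORDINAL(2)
    STRUCT('baz' AS key, 'true' AS value),            -- ORDINAL(3)
    STRUCT('qux' AS key, '3.1,3.14,3.142' AS value)   -- ORDINAL(4); json_columns splits this on ","
  ] AS mapped_json_field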

Enjoy, and please let me know if you notice any efficiency improvements I have overlooked in this approach.
