How can I do loop inside loop and commit for each iteration?

The problem I’m having:

I’m working with multi-tenant Snowflake tables and have a dbt project that requires executing nested loops with commits at each iteration.

The outer loop processes each customer, while the inner loop runs logic over a range of dates that are unique for each customer based on their data.

After committing the results for the first day, I need to use the data I created in the next iteration for the same customer.

However, from what I’ve seen, dbt doesn’t support this kind of control flow or commit behavior natively.

Do you know of a better solution or recommended approach to achieve this within dbt?

What I’ve already tried

As a workaround, I wrote a stored procedure to handle it, but I understand that this isn’t considered best practice in dbt.

Some example code or error messages

CREATE OR REPLACE PROCEDURE DMS_PROD_DB.STG.SP_CHARGER_STATUS()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS '
    DECLARE
        current_db STRING := CURRENT_DATABASE();
        current_sch STRING := CURRENT_SCHEMA();
        analytics_dim_schema STRING;
        stg_schema STRING;
        result_msg STRING := '''';
        -- Client processing variables
        current_client_id STRING;
        current_client_name STRING;
        start_date DATE;
        end_date DATE;
        current_date DATE;
        date_count INTEGER := 0;
        client_count INTEGER := 0;
        total_clients INTEGER := 0;
        records_processed INTEGER := 0;
        total_records INTEGER := 0;
    BEGIN
            LET client_query STRING := ''
                WITH ChargerStatus AS (
                    SELECT 
                        UNIQUE_ID,
                        MAX(STATUS_DATE) AS max_status_date
                    FROM '' || stg_schema || ''.CHARGER_STATUS_STG
                    GROUP BY UNIQUE_ID
                ),
                MinChangedOn AS (
                    SELECT 
                        UNIQUE_ID,
                        MIN(COALESCE(changed_on_base_line,changed_on))::DATE AS min_changed_on
                    FROM '' || analytics_dim_schema || ''.FACT_STATION_STATUS_CHANGE_LOG
                    WHERE (CHANGED_ON::DATE >= ''''2024-07-01'''' OR CHANGED_ON_BASE_LINE::DATE >= ''''2024-07-01'''')
                    GROUP BY UNIQUE_ID
                ),
                LastRun AS (
                    SELECT DISTINCT
                        TENANT_ID AS UNIQUE_ID,
                        dateadd(day,-1,LAST_RUN_DATE) AS LAST_RUN_DATE,
                        CLIENT_NAME
                    FROM '' || analytics_dim_schema || ''.DIM_SOURCE
                    WHERE TABLE_NAME = ''''STATION_STATUS_CHANGE_LOG'''' 
                      AND NOT is_deleted
                      AND NOT is_suspended 
                      AND last_run_date = current_date
                )
                SELECT 
                    f.UNIQUE_ID,
                    COALESCE(lr.CLIENT_NAME, ''''Unknown'''') AS CLIENT_NAME,
                    COALESCE(cs.max_status_date,f.min_changed_on) AS START_DATE,
                    lr.LAST_RUN_DATE AS END_DATE
                FROM MinChangedOn f
                INNER JOIN LastRun lr 
                    ON lr.UNIQUE_ID = f.UNIQUE_ID
                LEFT JOIN ChargerStatus cs 
                    ON cs.UNIQUE_ID = f.UNIQUE_ID
                ORDER BY f.UNIQUE_ID'';
            
            LET client_result RESULTSET := (EXECUTE IMMEDIATE client_query);
            LET found_clients BOOLEAN := FALSE;

            FOR client_record IN client_result DO
                found_clients := TRUE;
                current_client_id := client_record.UNIQUE_ID;
                current_client_name := client_record.CLIENT_NAME;
                start_date := client_record.START_DATE;
                end_date := client_record.END_DATE;
                client_count := client_count + 1;
                result_msg := result_msg || ''Client '' || client_count || '': '' || current_client_name || '' ('' || current_client_id || '')
'';
                result_msg := result_msg || ''Date range: '' || start_date || '' to '' || end_date || ''
'';
                date_count := 0;
                current_date := start_date;
                WHILE (current_date <= end_date) DO
                    date_count := date_count + 1;
                    
                    BEGIN
                        LET process_query STRING := ''
                            INSERT INTO '' || stg_schema || ''.CHARGER_STATUS_STG (
                                charger_gk, unique_id, status_date, socket_id, station_id,
                                available, charging, faulted, unavailable, new, unknown,
                                reserved, discharging, paused, preparing, finishing,
                                in_maintenance, max_status_value, managed, client_name, client_region,
                                filename, row_number_in_file, row_insert_date, row_update_date,
                                row_from_bucket_name, row_update_by_user
                            )
                            WITH cte_station_socket_source as (
                                SELECT * 
                                FROM '' || analytics_dim_schema || ''.DIM_CONNECTOR    
                                WHERE unique_id = '''''' || current_client_id || '''''' 
                            ),
                            cte_station_source as (
                                SELECT * 
                                FROM '' || analytics_dim_schema || ''.DIM_CHARGER 
                                WHERE unique_id = '''''' || current_client_id || ''''''
                            ),
                            cte_station_owner_data_source as (
                                SELECT * 
                                FROM '' || analytics_dim_schema || ''.DIM_CHARGER_OWNER 
                                WHERE unique_id = '''''' || current_client_id || ''''''
                            ),
                            cte_configuration_source as (
                                SELECT * 
                                FROM '' || analytics_dim_schema || ''.DIM_CONFIGURATION 
                                WHERE param_name = ''''stationServerSocketBlocksChargingSocketStatus'''' AND unique_id = '''''' || current_client_id || ''''''
                            ),
                            cte_dic_socket_status_source as (
                                SELECT * 
                                FROM '' || analytics_dim_schema || ''.DIM_CONNECTOR_STATUS
                                WHERE unique_id = '''''' || current_client_id || ''''''
                            ),
                            cte_station_status_change_log_source as (
                                SELECT 
                                    unique_id, id, station_id, station_status_id, 
                                    COALESCE(changed_on_base_line,changed_on) AS CHANGED_ON, 
                                    offline, old_station_status_id, station_socket_id, 
 changed_on as original_changed_on
                                FROM '' || analytics_dim_schema || ''.FACT_STATION_STATUS_CHANGE_LOG a
                                WHERE ( changed_on::DATE              = '''''' || current_date || ''''''  OR
                                        changed_on_base_line::DATE = '''''' || current_date || ''''''  
                                      )
                                    AND STATION_SOCKET_ID IS NOT NULL
                                    AND unique_id = '''''' || current_client_id || ''''''
                            ),
                            cte_station_charge_transaction_source as (
                                SELECT 
                                    unique_id, id, station_socket_id, start_card_id, stop_card_id, 
                                    COALESCE(start_on_base_line, start_on) AS START_ON, stopped_on, 
                                    meter_read_on_start, meter_read_on_stop, 
                                FROM '' || analytics_dim_schema || ''.FACT_STATION_CHARGE_TRANSACTION
                                WHERE ( start_on::DATE           = '''''' || current_date || '''''' OR
                                        start_on_base_line::DATE = '''''' || current_date || ''''''
                                      )
                                    AND unique_id = '''''' || current_client_id || ''''''
                            ),
                            cte_dim_station_history_provision as (
                                SELECT * 
                                FROM '' || analytics_dim_schema || ''.DIM_STATION_HISTORY_PROVISION
                                WHERE unique_id = '''''' || current_client_id || ''''''
                            ),
           cte_station_status_change_log_source_prev_changed_on as (
                                SELECT a.*,
                                    COALESCE(
                                        LAG(a.changed_on) OVER (PARTITION BY a.unique_id, a.station_socket_id ORDER BY a.changed_on), 
                                        a.changed_on::date
                                    ) AS previous_changed_on
                                FROM cte_station_status_change_log_source a
                                INNER JOIN cte_dim_station_history_provision b
                                    ON  a.unique_id  = b.unique_id AND
                                        a.station_id = b.charger_id AND
                                        (a.changed_on_base_line between b.from_date_time and coalesce(b.to_date_time, CURRENT_TIMESTAMP)
                                        OR
                                        a.original_changed_on between b.from_date_time and coalesce(b.to_date_time, CURRENT_TIMESTAMP))
                                --- start change
                            ),
                            cte_filtered_sockets AS (
                                SELECT ss.unique_id, ss.connector_id, ss.charger_id, 
                                    s.usage_start_date, ss.evse_identity_key,
                                    COALESCE(s.deleted,0) AS DELETED, s.provisioning_date, s.managed
                                FROM cte_station_socket_source ss
                                INNER JOIN cte_station_source s 
                                    ON ss.charger_id = s.charger_id AND ss.unique_id = s.unique_id
                                INNER JOIN cte_station_owner_data_source so 
                                    ON so.charger_owner_id = s.station_owner_id AND so.unique_id = s.unique_id
                                INNER JOIN cte_dim_station_history_provision a
                                    ON  a.unique_id  = ss.unique_id AND
                                        a.charger_id = ss.charger_id AND
                                        '''''' || current_date || '''''' between a.from_date::date and coalesce(a.TO_DATE, CURRENT_TIMESTAMP::DATE)
                                WHERE
                                '''''' || current_date || ''''''  <= coalesce(ss.is_deleted_date,CURRENT_TIMESTAMP)::date  
                                AND
                                (   coalesce(s.deleted,0) = 0 
                                    or
                                    (coalesce(s.deleted,0) = 1 and '''''' || current_date || '''''' <= s.modified_on::date)
                                )
                            ),
                            cte_station_sockets as (
                                SELECT
                                    stg_y.unique_id AS UNIQUE_ID,
                                    '''''' || current_date || '''''' AS STATUS_DATE,
                                    stg_y.socket_id AS SOCKET_ID,
                                    stg_y.station_id AS STATION_ID,
                                    CASE WHEN stg_y.max_status_value = ''''AVAILABLE'''' THEN 86400 ELSE 0 END AS AVAILABLE,
                                    CASE WHEN stg_y.max_status_value = ''''CHARGING'''' THEN 86400 ELSE 0 END AS CHARGING,
                                    CASE WHEN stg_y.max_status_value = ''''FAULTED'''' THEN 86400 ELSE 0 END AS FAULTED,
                                    CASE WHEN stg_y.max_status_value = ''''UNAVAILABLE'''' THEN 86400 ELSE 0 END AS UNAVAILABLE,
                                    CASE WHEN stg_y.max_status_value = ''''NEW'''' THEN 86400 ELSE 0 END AS NEW,
                                    CASE WHEN stg_y.max_status_value = ''''UNKNOWN'''' THEN 86400 ELSE 0 END AS UNKNOWN,
                                    CASE WHEN stg_y.max_status_value = ''''RESERVED'''' THEN 86400 ELSE 0 END AS RESERVED,
                                    CASE WHEN stg_y.max_status_value = ''''DISCHARGING'''' THEN 86400 ELSE 0 END AS DISCHARGING,
                                    CASE WHEN stg_y.max_status_value = ''''PAUSED'''' THEN 86400 ELSE 0 END AS PAUSED,
                                    CASE WHEN stg_y.max_status_value = ''''PREPARING'''' THEN 86400 ELSE 0 END AS PREPARING,
                                    CASE WHEN stg_y.max_status_value = ''''FINISHING'''' THEN 86400 ELSE 0 END AS FINISHING,
                                    stg_y.in_maintenance AS IN_MAINTENANCE,
                                    stg_y.max_status_value AS MAX_STATUS_VALUE, 
                                    stg_y.managed AS MANAGED,
                                    stg_y.client_name, stg_y.client_region, stg_y.filename,
                                    stg_y.row_number_in_file, stg_y.row_insert_date, stg_y.row_update_date,
                                    stg_y.row_from_bucket_name, stg_y.row_update_by_user
                                FROM '' || stg_schema || ''.CHARGER_STATUS_STG stg_y
                                INNER JOIN cte_filtered_sockets s
                                    ON s.connector_id = stg_y.socket_id AND s.unique_id = stg_y.unique_id
                                WHERE stg_y.status_date = DATEADD(day,-1, '''''' || current_date || '''''')
                                  /* AND s.usage_start_date::DATE <= stg_y.status_date */
                                  AND stg_y.unique_id = '''''' || current_client_id || ''''''
                            ),
                            cte_socket_block_status as (
                                SELECT c.unique_id AS UNIQUE_ID,
                                    COALESCE(dss.connector_status_id, 3) AS BLOCK_STATUS_ID
                                FROM cte_configuration_source c
                                INNER JOIN cte_dic_socket_status_source dss 
                                    ON dss.connector_status_value = c.param_value AND dss.unique_id = c.unique_id
                            ),
                            cte_max_change as (
                                SELECT cl.unique_id AS UNIQUE_ID, 
                                    cl.station_socket_id AS STATION_SOCKET_ID, 
                                    MAX(changed_on) AS MAX_CHANGED_ON
                                FROM cte_station_status_change_log_source cl
                                WHERE CAST(cl.changed_on AS DATE) = '''''' || current_date || ''''''
                                GROUP BY cl.unique_id, cl.station_socket_id
                            ),
                            cte_status_changes as (
                                SELECT 
                                    cl.unique_id AS UNIQUE_ID,
                                    '''''' || current_date || '''''' AS STATUS_DATE,
                                    cl.station_socket_id AS SOCKET_ID,
                                    cl.station_id AS STATION_ID,
                                    CASE WHEN cl.minor_status_changed = 1 
                                        AND cl.old_station_socket_status_id = sbs.block_status_id
                                        AND cl.old_station_offline = 0
                                        AND ct.station_socket_id > 0 THEN ''''AVAILABLE''''
                                        WHEN cl.old_station_offline = 1 OR cl.old_station_socket_status_id = 5 THEN ''''UNKNOWN''''
                                        ELSE dss.connector_status_value 
                                    END AS STATUS_VALUE,
                                    CASE WHEN cl.old_disabled = 1 THEN 1 ELSE 0 END AS IN_MAINTENANCE,
                                    DATEDIFF(MICROSECOND,
                                        CASE WHEN cl.previous_changed_on < '''''' || current_date || '''''' 
                                        THEN '''''' || current_date || '''''' ELSE cl.previous_changed_on END,
                                        cl.changed_on
                                    ) / 1000 / 1000 AS STATUS_TIME,
                                    DATEADD(MINUTE,-1,cl.previous_changed_on) AS OLD_STATION_STATUS_CHANGE_ON_MIN_BACK,
                                    DATEADD(MINUTE, 1,cl.previous_changed_on) AS OLD_STATION_STATUS_CHANGE_ON_MIN_FORWARD,
                                    cl.changed_on   AS CHANGED_ON,
                                    s.managed       AS MANAGED,
                                    cl.client_name, cl.client_region, cl.filename, cl.row_number_in_file,
                                    cl.row_insert_date, cl.row_update_date, cl.row_from_bucket_name, cl.row_update_by_user,
                                    ''''1'''' AS UNION_ORDER
                                FROM cte_station_status_change_log_source_prev_changed_on cl
                                INNER JOIN cte_filtered_sockets s
                                    ON s.connector_id = cl.station_socket_id AND s.unique_id = cl.unique_id
                                LEFT JOIN cte_socket_block_status sbs ON sbs.unique_id = cl.unique_id
                                INNER JOIN cte_dic_socket_status_source dss 
                                    ON dss.connector_status_id = cl.old_station_socket_status_id AND dss.unique_id = cl.unique_id
                                LEFT JOIN (
                                    SELECT ss.unique_id AS UNIQUE_ID, ss1.connector_id AS STATION_SOCKET_ID,
                                        sct.start_on::DATE AS START_ON_DATE, ss1.charger_id AS CHARGER_ID,
                                        ss.connector_id AS STATION_SOCKET_ID_ORG, sct.start_on AS START_ON
                                    FROM cte_filtered_sockets ss
                                    FULL OUTER JOIN cte_filtered_sockets ss1
                                        ON ss.unique_id = ss1.unique_id AND ss.connector_id <> ss1.connector_id
                                        AND ss.charger_id = ss1.charger_id AND ss.evse_identity_key = ss1.evse_identity_key                     INNER JOIN cte_station_charge_transaction_source sct 
                                        ON sct.station_socket_id = ss1.connector_id AND sct.unique_id = ss1.unique_id
                                    WHERE sct.start_on::DATE = '''''' || current_date || ''''''
                                ) ct
                                    ON ct.station_socket_id_org = cl.station_socket_id AND ct.station_socket_id <> cl.station_socket_id
                                    AND ct.unique_id = cl.unique_id
                                    AND cl.station_socket_status_id <> cl.old_station_socket_status_id
                                    AND ct.start_on BETWEEN dateadd(MINUTE,-1,cl.previous_changed_on) 
                                    AND dateadd(MINUTE, 1,cl.previous_changed_on)
                               
                            ),
                            cte_status_value as (
                                SELECT sc.unique_id, sc.status_date, sc.socket_id, 
                                    MAX(sc.status_value) as status_value
                                FROM cte_status_changes sc
                                WHERE sc.union_order = ''''2''''
                                GROUP BY sc.unique_id, sc.status_date, sc.socket_id
                            ),
                            cte_group_measures AS (
                                SELECT up.unique_id AS UNIQUE_ID, up.status_date AS STATUS_DATE,
                                    up.socket_id AS SOCKET_ID, up.station_id AS STATION_ID,
                                    up.status_value AS STATUS_VALUE, SUM(up.status_time) AS STATUS_TIME,
                                    MAX(up.in_maintenance) AS IN_MAINTENANCE,
                                    MAX(cte_status_value.status_value) AS MAX_STATUS_VALUE,
                                    MAX(up.managed) AS MANAGED,
                                    MAX(up.client_name) AS CLIENT_NAME, MAX(up.client_region) AS CLIENT_REGION,
                                    MAX(up.filename) AS FILENAME, MAX(up.row_number_in_file) AS ROW_NUMBER_IN_FILE,
                                    MAX(up.row_insert_date) AS ROW_INSERT_DATE, MAX(up.row_update_date) AS ROW_UPDATE_DATE,
                                    MAX(up.row_from_bucket_name) AS ROW_FROM_BUCKET_NAME, MAX(up.row_update_by_user) AS ROW_UPDATE_BY_USER
                                FROM cte_status_changes up
                                LEFT JOIN cte_status_value
                                    ON up.unique_id = cte_status_value.unique_id AND up.status_date = cte_status_value.status_date
                                    AND up.socket_id = cte_status_value.socket_id
                                GROUP BY up.unique_id, up.status_date, up.socket_id, up.station_id, up.status_value
                            ),
                            cte_daily_status_times as (
                                SELECT 
                                    gm.unique_id AS UNIQUE_ID, gm.status_date AS STATUS_DATE,
                                    gm.socket_id AS SOCKET_ID, MAX(gm.station_id) AS STATION_ID,
                                    SUM(CASE WHEN gm.status_value = ''''AVAILABLE'''' THEN gm.status_time ELSE 0 END) AS AVAILABLE,
                                    SUM(CASE WHEN gm.status_value = ''''CHARGING'''' THEN gm.status_time ELSE 0 END) AS CHARGING,
                                    SUM(CASE WHEN gm.status_value = ''''FAULTED'''' THEN gm.status_time ELSE 0 END) AS FAULTED,
                                    SUM(CASE WHEN gm.status_value = ''''UNAVAILABLE'''' THEN gm.status_time ELSE 0 END) AS UNAVAILABLE,
                                    SUM(CASE WHEN gm.status_value = ''''NEW'''' THEN gm.status_time ELSE 0 END) AS NEW,
                                    SUM(CASE WHEN gm.status_value = ''''UNKNOWN'''' THEN gm.status_time ELSE 0 END) AS UNKNOWN,
                                    SUM(CASE WHEN gm.status_value = ''''RESERVED'''' THEN gm.status_time ELSE 0 END) AS RESERVED,
                                    SUM(CASE WHEN gm.status_value = ''''DISCHARGING'''' THEN gm.status_time ELSE 0 END) AS DISCHARGING,
                                    SUM(CASE WHEN gm.status_value = ''''PAUSED'''' THEN gm.status_time ELSE 0 END) AS PAUSED,
                                    SUM(CASE WHEN gm.status_value = ''''PREPARING'''' THEN gm.status_time ELSE 0 END) AS PREPARING,
                                    SUM(CASE WHEN gm.status_value = ''''FINISHING'''' THEN gm.status_time ELSE 0 END) AS FINISHING,
                                    MAX(gm.in_maintenance)::BOOLEAN AS IN_MAINTENANCE,
                                    MAX(gm.max_status_value) AS MAX_STATUS_VALUE,
                                    MAX(gm.managed) AS MANAGED,
                                    MAX(gm.client_name) AS CLIENT_NAME, MAX(gm.client_region) AS CLIENT_REGION,
                                    MAX(gm.filename) AS FILENAME, MAX(gm.row_number_in_file) AS ROW_NUMBER_IN_FILE,
                                    MAX(gm.row_insert_date) AS ROW_INSERT_DATE, MAX(gm.row_update_date) AS ROW_UPDATE_DATE,
                                    MAX(gm.row_from_bucket_name) AS ROW_FROM_BUCKET_NAME, MAX(gm.row_update_by_user) AS ROW_UPDATE_BY_USER
                                FROM cte_group_measures gm
                                GROUP BY gm.unique_id, gm.status_date, gm.socket_id
                                
                                UNION 
                                SELECT 
                                    ss.unique_id AS UNIQUE_ID, ss.status_date AS STATUS_DATE,
                                    ss.socket_id AS SOCKET_ID, ss.station_id AS STATION_ID,
                                    ss.available AS AVAILABLE, ss.charging AS CHARGING, ss.faulted AS FAULTED,
                                    ss.unavailable AS UNAVAILABLE, ss.new AS NEW, ss.unknown AS UNKNOWN,
                                    ss.reserved AS RESERVED, ss.discharging AS DISCHARGING, ss.paused AS PAUSED,
                                    ss.preparing AS PREPARING, ss.finishing AS FINISHING,
                                    ss.in_maintenance AS IN_MAINTENANCE, ss.max_status_value AS MAX_STATUS_VALUE,
                                    ss.managed AS MANAGED,
                                    ss.client_name AS CLIENT_NAME, ss.client_region AS CLIENT_REGION,
                                    ss.filename AS FILENAME, ss.row_number_in_file AS ROW_NUMBER_IN_FILE,
                                    ss.row_insert_date AS ROW_INSERT_DATE, ss.row_update_date AS ROW_UPDATE_DATE,
                                    ss.row_from_bucket_name AS ROW_FROM_BUCKET_NAME, ss.row_update_by_user AS ROW_UPDATE_BY_USER
                                FROM cte_station_sockets ss
                                WHERE NOT EXISTS (
                                    SELECT 1
                                    FROM cte_group_measures gm
                                    WHERE gm.unique_id = ss.unique_id AND gm.status_date = ss.status_date AND gm.socket_id = ss.socket_id
                                )
                            )
                            SELECT     
                                MD5(unique_id || ''''_'''' || status_date || ''''_'''' || socket_id) as charger_gk,
                                unique_id AS UNIQUE_ID,
                                status_date::DATE AS STATUS_DATE,
                                socket_id::NUMBER AS SOCKET_ID,
                                station_id::NUMBER AS STATION_ID,
                                available::NUMBER(38,8) AS AVAILABLE,
                                charging::NUMBER(38,8) AS CHARGING,
                                faulted::NUMBER(38,8) AS FAULTED,
                                unavailable::NUMBER(38,8) AS UNAVAILABLE,
                                new::NUMBER(38,8) AS NEW,
                                unknown::NUMBER(38,8) AS UNKNOWN,
                                reserved::NUMBER(38,8) AS RESERVED,
                                discharging::NUMBER(38,8) AS DISCHARGING,
                                paused::NUMBER(38,8) AS PAUSED,
                                preparing::NUMBER(38,8) AS PREPARING,
                                finishing::NUMBER(38,8) AS FINISHING,
                                in_maintenance::BOOLEAN AS IN_MAINTENANCE,
                                max_status_value::VARCHAR AS MAX_STATUS_VALUE,
                                managed AS MANAGED,
                                client_name AS CLIENT_NAME,
                                client_region AS CLIENT_REGION,
                                filename AS FILENAME,
                                row_number_in_file AS ROW_NUMBER_IN_FILE,
                                row_insert_date AS ROW_INSERT_DATE,
                                CURRENT_TIMESTAMP AS ROW_UPDATE_DATE,
                                row_from_bucket_name AS ROW_FROM_BUCKET_NAME,
                                ''''SP_CHARGER_STATUS'''' AS ROW_UPDATE_BY_USER
                            FROM cte_daily_status_times final_results
                            WHERE NOT EXISTS (
                                /* Prevent duplicate records */
                                SELECT 1 FROM '' || stg_schema || ''.CHARGER_STATUS_STG existing
                                WHERE existing.unique_id = final_results.unique_id
                                  AND existing.status_date = final_results.status_date
                                  AND existing.socket_id = final_results.socket_id
                            )'';