Post Hook with incremental tables

I’m creating an incremental model and I want to run a post hook in order to save the new records to a file in my cloud provider. The key concept is that I only want to save those records which were appended to the table in the current execution.

Is there a way to refer to those records which have been queried in that increment, or any other workaround for this?

Usage example:

{{-
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        unique_key = 'id',
        post_hook = """
EXPORT DATA
    OPTIONS (
        uri = 'gs://bucket/path/{{ run_started_at.strftime('%Y-%m-%d_%H-%M-%S') }}/*.csv',
        format = 'CSV',
        overwrite = true,
        header = true,
        field_delimiter = ';')
AS (
    SELECT * FROM {{ this }}
);
"""
    )
-}}

SELECT *
FROM {{ source('my_source', 'my_table') }}
WHERE
    {%- if is_incremental() %}
    date_hit > (SELECT MAX(date_hit) from {{ this }})
    {%- else %}
    date_hit >= "1970-01-01"
    {% endif %}

The SELECT * FROM {{ this }} clause in my post hook will save the whole table, so that would be what I need to change

Hi @luiscristobal.lopez

You could add the current date/timestamp inserting new records, and filter them out.

Can you elaborate on that @wefo ? I have tried using timestamps, but post hooks can’t access context at compilation/execution time, so even if I save the max timestamp used for the increment in a set, it’s not accessible when the post hook runs.

I think this means that you would do something like this: (the two changed lines are indicated)

{{-
    config(
        materialized = 'incremental',
        incremental_strategy = 'merge',
        unique_key = 'id',
        post_hook = """
EXPORT DATA
    OPTIONS (
        uri = 'gs://bucket/path/{{ run_started_at.strftime('%Y-%m-%d_%H-%M-%S') }}/*.csv',
        format = 'CSV',
        overwrite = true,
        header = true,
        field_delimiter = ';')
AS (
    /* this line is changed */
    SELECT * FROM {{ this }} where loaded_at = {{ run_started_at }}

);
"""
    )
-}}

SELECT *,
  {{ run_started_at }} as loaded_at /* this line is changed*/
FROM {{ source('my_source', 'my_table') }}
WHERE
    {%- if is_incremental() %}
    date_hit > (SELECT MAX(date_hit) from {{ this }})
    {%- else %}
    date_hit >= "1970-01-01"
    {% endif %}
2 Likes

thanks @joellabes. that was my point.

1 Like

I partitioned my table by this run_started_at field and the suggestion fit perfectly to my use case.

Thank you very much both for the help

1 Like

The only thing to keep in mind with this approach is that it assumes your post hook will run successfully every time. If the model builds successfully but the export step fails for some reason, then those records will never be exported without manual intervention because subsequent runs will obviously have a different run_started_at.

It’s unlikely to be a major issue but something worth keeping in mind

Sure! It’s not a problem since the export select statement should be simple enough to ensure it doesn’t fail. If eventually it happens, I will receive an error message and export the rows manually. Thanks for the disclaimer

1 Like

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.