Managing Snowpipe Integrations, Stages and Pipes

We (and when I say we I mostly mean @edgarrmondragon :dragon: ) made a yaml-driven macro to help with the creation of integrations, stages, and pipes with Snowpipe and S3. It’s an easy way to standardize how these objects are created, and it allows us to take advantage of storing their profiles in version control. Thought it might be useful for other folks, and I’d also love to hear feedback on how this could be improved or alternate methods of doing the same work.

Note: this macro will not work within dbt Cloud. Understandably, they do not allow you to run shell commands when executing dbt jobs. This post also assumes you are somewhat familiar with Snowpipe and AWS and that you already have the roles and access established. There are a lot of resources about getting started with Snowpipe; I found this one to be the most helpful: Auto-ingest Snowpipe on S3. A create, debug and architect example… | by Cho Walton | Medium


Here’s how it works:

Place these macros in your /macros directory
Create the Integration

 {%- macro create_s3_integration(integration_name, role_arn, locations) -%}
 {%- set sql -%}
 begin;

 -- Create storage integration
 create or replace storage integration {{ integration_name }}
   type = external_stage
   storage_provider = s3
   enabled = true
   storage_aws_role_arn = '{{ role_arn }}'
   storage_allowed_locations = (
     {%- for loc in locations %}
     '{{ loc }}'{% if not loop.last %},{% endif %}
     {%- endfor %}
   );

 commit;
 {%- endset -%}

 {%- do log(sql, info=True) -%}
 {%- do run_query(sql) -%}
 {%- do log("S3 integration created", info=True) -%}
 {%- set results = run_query("desc integration " + integration_name) -%}
 {%- do results.print_table(max_column_width=200) -%}
 {%- endmacro -%}
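
If you just want to smoke-test the integration macro before wiring up a yaml file, you can also pass the arguments inline to dbt run-operation. This is a sketch with placeholder values; swap in your own integration name, role ARN, and bucket path.

dbt run-operation create_s3_integration --args '{integration_name: my_s3_integration, role_arn: "arn:aws:iam::123456789012:role/my-snowflake-role", locations: ["s3://my-bucket/my-prefix/"]}'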

Create the stage, table, and pipe

 {%- macro create_pipe(
   pipe_name,
   schema_name,
   table_name,
   stage_name,
   integration_name,
   s3_url,
   columns,
   file_type
 ) -%}
 {%- set sql -%}
 begin;

 -- Create schema
 create schema if not exists {{ schema_name }};

 -- Create stage
 create or replace stage {{ schema_name }}.{{ stage_name }}
   storage_integration = {{ integration_name }}
   url = '{{ s3_url }}';

 -- Create table
 create or replace table {{ schema_name }}.{{ table_name }} (
   {%- for col, type in columns.items() %}
   {{ col }} {{ type }}{% if not loop.last %},{% endif %}
   {%- endfor %}
 );

 -- Load historic data
 copy into {{ schema_name }}.{{ table_name }}
 from @{{ schema_name }}.{{ stage_name }}
 on_error = continue
 file_format = (
   type = {{ file_type }}
   {%- if file_type == 'CSV' %}
     skip_header = 1
     field_optionally_enclosed_by = '"'
   {%- endif %}
   null_if = ('')
 );


 -- Create pipe
 create or replace pipe {{ schema_name }}.{{ pipe_name }} auto_ingest = true as
   copy into {{ schema_name }}.{{ table_name }}
   from @{{ schema_name }}.{{ stage_name }}
   on_error = continue
   file_format = (
     type = {{ file_type }}
     {%- if file_type == 'CSV' %}
       skip_header = 1
       field_optionally_enclosed_by = '"'
     {%- endif %}
     null_if = ('')
   );

 commit;
 {%- endset -%}

 {%- do log(sql, info=True) -%}
 {%- do run_query(sql) -%}
 {%- do log("Auto-ingest pipe created", info=True) -%}
 {%- endmacro -%}

Create a yaml file to store the values for your integration. Ours live in a directory simply called /pipes.

integration_name: 'put your integration name here'
role_arn: 'put your aws arn here'
locations:
  - 'put your locations here'

From /pipes you can run the following command to create the integration. It prints the generated STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID, which you then paste into the JSON block of your AWS trust relationship.
dbt run-operation create_s3_integration --args "$(cat [integration yaml file])"
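
For reference, the trust relationship on the AWS role ends up looking roughly like the block below. This is a sketch; the two placeholder values come from the desc integration output that the macro prints.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN from the macro output>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID from the macro output>"
        }
      }
    }
  ]
}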

Here comes the super duper fun part :partying_face: where you create a yaml file that defines the stage and the pipe. Again, ours live in /pipes. Below is a sample.

integration_name: revenue
schema_name: raw_sendgrid
table_name: event_webhook
pipe_name: sendgrid_pipe
stage_name: sendgrid_stage
s3_url: 'put your file location here'
columns:
  sendgrid_json: variant
file_type: 'JSON'
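
For comparison, a CSV source could look something like this (a sketch with made-up names and columns); it also shows how the columns mapping and the CSV branch of the file_format block come into play:

integration_name: revenue
schema_name: raw_billing
table_name: invoices
pipe_name: invoices_pipe
stage_name: invoices_stage
s3_url: 'put your file location here'
columns:
  invoice_id: varchar
  amount: number(38,2)
  invoiced_at: timestamp_ntz
file_type: 'CSV'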

Now all you have to do is run this command while in /pipes. This command also creates the table and loads historic data to Snowflake.
dbt run-operation create_pipe --args "$(cat [pipe yaml file])"
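
For the JSON sample above, the SQL that ends up running looks roughly like this (a sketch; the s3 url is a placeholder for whatever you put in the yaml):

 begin;

 -- Create schema
 create schema if not exists raw_sendgrid;

 -- Create stage
 create or replace stage raw_sendgrid.sendgrid_stage
   storage_integration = revenue
   url = 's3://your-bucket/your-path/';

 -- Create table
 create or replace table raw_sendgrid.event_webhook (
   sendgrid_json variant
 );

 -- Load historic data
 copy into raw_sendgrid.event_webhook
 from @raw_sendgrid.sendgrid_stage
 on_error = continue
 file_format = (
   type = JSON
   null_if = ('')
 );

 -- Create pipe
 create or replace pipe raw_sendgrid.sendgrid_pipe auto_ingest = true as
   copy into raw_sendgrid.event_webhook
   from @raw_sendgrid.sendgrid_stage
   on_error = continue
   file_format = (
     type = JSON
     null_if = ('')
   );

 commit;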

The last step in automating the load of new data is to hook up the SQS notification channel in AWS so that new files landing in the bucket trigger the pipe. You can get the channel from Snowflake by running desc pipe [pipe name].
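
For example, with the sample names above (a sketch; adjust to your own schema and pipe):

 desc pipe raw_sendgrid.sendgrid_pipe;
 select system$pipe_status('raw_sendgrid.sendgrid_pipe');

The SQS queue ARN appears in the notification_channel column of the desc pipe output; add it as an event notification on your S3 bucket so new files trigger the pipe. system$pipe_status is a handy sanity check that the pipe is in a running state.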


Note that the YAML files used to define integrations and pipes should not be placed within the models directory. Otherwise, dbt will try to parse them as schema files and the command will fail.

Can you add an example of multi-environment dbt code for Snowpipe sourcing from Azure Blob Storage?