We (and when I say we I mostly mean @edgarrmondragon) made a yaml-driven macro to help with the creation of integrations, stages, and pipes with Snowpipe and S3. It’s an easy way to standardize how these objects are created, and it allows us to take advantage of storing their configurations in version control. I thought it might be useful for other folks, and I would also love to hear feedback on how this could be improved or alternate methods of doing the same work.
Note: this macro will not work within dbt Cloud, since (understandably) they do not allow you to run shell commands when executing dbt jobs. This post also assumes you are somewhat familiar with Snowpipe and AWS and that you already have the roles and access established. There are a lot of resources on getting started with Snowpipe; I found this one the most helpful: Auto-ingest Snowpipe on S3. A create, debug and architect example… | by Cho Walton | Medium
Here’s how it works:
Place these macros in your /macros directory.
Create the Integration
{%- macro create_s3_integration(integration_name, role_arn, locations) -%}
  {%- set sql -%}
    begin;
    -- Create the storage integration
    create or replace storage integration {{ integration_name }}
      type = external_stage
      storage_provider = s3
      enabled = true
      storage_aws_role_arn = '{{ role_arn }}'
      storage_allowed_locations = (
        {%- for loc in locations %}
        '{{ loc }}'{% if not loop.last %},{% endif %}
        {%- endfor %}
      );
    commit;
  {%- endset -%}
  {%- do log(sql, info=True) -%}
  {%- do run_query(sql) -%}
  {%- do log("S3 integration created", info=True) -%}
  {# Print the integration properties, including the values needed for the AWS trust relationship #}
  {%- set results = run_query("desc integration " + integration_name) -%}
  {%- do results.print_table(max_column_width=200) -%}
{%- endmacro -%}
Create the stage, table, and pipe
{%- macro create_pipe(
    pipe_name,
    schema_name,
    table_name,
    stage_name,
    integration_name,
    s3_url,
    columns,
    file_type
) -%}
  {%- set sql -%}
    begin;
    -- Create schema
    create schema if not exists {{ schema_name }};
    -- Create stage
    create or replace stage {{ schema_name }}.{{ stage_name }}
      storage_integration = {{ integration_name }}
      url = '{{ s3_url }}';
    -- Create table
    create or replace table {{ schema_name }}.{{ table_name }} (
      {%- for col, type in columns.items() %}
      {{ col }} {{ type }}{% if not loop.last %},{% endif %}
      {%- endfor %}
    );
    -- Load historic data
    copy into {{ schema_name }}.{{ table_name }}
    from @{{ schema_name }}.{{ stage_name }}
    on_error = continue
    file_format = (
      type = {{ file_type }}
      {%- if file_type == 'CSV' %}
      skip_header = 1
      field_optionally_enclosed_by = '"'
      {%- endif %}
      null_if = ('')
    );
    -- Create pipe
    create or replace pipe {{ schema_name }}.{{ pipe_name }} auto_ingest = true as
      copy into {{ schema_name }}.{{ table_name }}
      from @{{ schema_name }}.{{ stage_name }}
      on_error = continue
      file_format = (
        type = {{ file_type }}
        {%- if file_type == 'CSV' %}
        skip_header = 1
        field_optionally_enclosed_by = '"'
        {%- endif %}
        null_if = ('')
      );
    commit;
  {%- endset -%}
  {%- do log(sql, info=True) -%}
  {%- do run_query(sql) -%}
  {%- do log("Auto-ingest pipe created", info=True) -%}
{%- endmacro -%}
Create a yaml file to store the values for your integration. Ours live in a directory simply called /pipes.
integration_name: 'put your integration name here'
role_arn: 'put your aws arn here'
locations:
  - 'put your locations here'
From /pipes you can run the following command to create the integration. It also describes the integration so you can copy the generated STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID into the JSON block of your AWS trust relationship.
dbt run-operation create_s3_integration --args "$(cat [integration yaml file])"
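If you need those two values again later, you can pull them directly from Snowflake. A minimal sketch, assuming the integration is named revenue as in the sample further down:
desc integration revenue;
-- In the output, copy these two properties into your IAM role's trust relationship:
--   STORAGE_AWS_IAM_USER_ARN  -> the trusted AWS principal
--   STORAGE_AWS_EXTERNAL_ID   -> the sts:ExternalId condition value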
Here comes the super duper fun part: creating the yaml file that drives the stage and pipe creation. Again, ours live in /pipes. Below is a sample.
integration_name: revenue
schema_name: raw_sendgrid
table_name: event_webhook
pipe_name: sendgrid_pipe
stage_name: sendgrid_stage
s3_url: 'put your file location here'
columns:
  sendgrid_json: variant
file_type: 'JSON'
Now all you have to do is run this command while in /pipes. This command also creates the table and loads the historic data into Snowflake.
dbt run-operation create_pipe --args "$(cat [pipe yaml file])"
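For reference, with the sample yaml above the macro logs and runs roughly the following Snowflake SQL (a trimmed sketch; the stage url comes from your s3_url value):
create schema if not exists raw_sendgrid;

create or replace stage raw_sendgrid.sendgrid_stage
  storage_integration = revenue
  url = 'put your file location here';

create or replace table raw_sendgrid.event_webhook (
  sendgrid_json variant
);

-- One-time historic load
copy into raw_sendgrid.event_webhook
from @raw_sendgrid.sendgrid_stage
on_error = continue
file_format = (
  type = JSON
  null_if = ('')
);

-- Ongoing auto-ingest of new files
create or replace pipe raw_sendgrid.sendgrid_pipe auto_ingest = true as
  copy into raw_sendgrid.event_webhook
  from @raw_sendgrid.sendgrid_stage
  on_error = continue
  file_format = (
    type = JSON
    null_if = ('')
  );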
The last step in automating the load of new data is to point your S3 bucket's event notification at the pipe's SQS notification channel in AWS. You can get that channel from Snowflake by running desc pipe [pipe name].
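A minimal sketch, using the sample pipe from above:
-- The notification_channel column in the output is the SQS queue ARN to use
-- as the destination for the S3 bucket's object-created event notification.
desc pipe raw_sendgrid.sendgrid_pipe;

-- Optional sanity check once the notification is wired up:
select system$pipe_status('raw_sendgrid.sendgrid_pipe');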