Automating Snowpipe for Amazon S3

This topic provides instructions for triggering Snowpipe data loads automatically using Amazon SQS (Simple Queue Service) notifications for an S3 bucket.

Note

This feature is limited to Snowflake accounts that are hosted on Amazon Web Services.

Network Traffic

Note to Virtual Private Snowflake (VPS) and AWS PrivateLink customers:

Automating Snowpipe using Amazon SQS notifications works well. However, although AWS cloud storage within a VPC (including VPS) can communicate with its own messaging services (Amazon SQS, Amazon Simple Notification Service), this traffic flows between servers on Amazon’s secure network outside of the VPC; therefore, this traffic is not protected by the VPC.

Configuring Secure Access to Cloud Storage

Note

If you have already configured secure access to the S3 bucket that stores your data files, you can skip this section.

This section describes how to configure a Snowflake storage integration object to delegate authentication responsibility for cloud storage to a Snowflake identity and access management (IAM) entity.

Note

We highly recommend this option, which avoids the need to supply IAM credentials when accessing cloud storage. See Configuring Secure Access to Amazon S3 for additional storage access options.

This section describes how to use storage integrations to allow Snowflake to read data from and write data to an Amazon S3 bucket referenced in an external (i.e. S3) stage. Integrations are named, first-class Snowflake objects that avoid the need for passing explicit cloud provider credentials such as secret keys or access tokens. Integration objects store an AWS identity and access management (IAM) user ID. An administrator in your organization grants the integration IAM user permissions in the AWS account.

An integration can also list buckets (and optional paths) that limit the locations users can specify when creating external stages that use the integration.

Note

Completing the instructions in this section requires permissions in AWS to create and manage IAM policies and roles. If you are not an AWS administrator, ask your AWS administrator to perform these tasks.

The following diagram shows the integration flow for an S3 stage:

Amazon S3 Stage Integration Flow
  1. An external (i.e. S3) stage references a storage integration object in its definition.

  2. Snowflake automatically associates the storage integration with an S3 IAM user created for your account. Snowflake creates a single IAM user that is referenced by all S3 storage integrations in your Snowflake account.

  3. An AWS administrator in your organization grants permissions to the IAM user to access the bucket referenced in the stage definition. Note that many external stage objects can reference different buckets and paths and use the same storage integration for authentication.

When a user loads or unloads data from or to a stage, Snowflake verifies the permissions granted to the IAM user on the bucket before allowing or denying access.

Step 1: Configure Access Permissions for the S3 Bucket

AWS Access Control Requirements

Snowflake requires the following permissions on an S3 bucket and folder to be able to access files in the folder (and sub-folders):

  • s3:GetObject

  • s3:GetObjectVersion

  • s3:ListBucket

As a best practice, Snowflake recommends creating an IAM policy for Snowflake access to the S3 bucket. You can then attach the policy to the role and use the security credentials generated by AWS for the role to access files in the bucket.

Creating an IAM Policy

The following step-by-step instructions describe how to configure access permissions for Snowflake in your AWS Management Console so that you can use an S3 bucket to load and unload data:

  1. Log into the AWS Management Console.

  2. From the home dashboard, choose Identity & Access Management (IAM):

    Identity & Access Management in AWS Management Console
  3. Choose Account settings from the left-hand navigation pane.

  4. Expand the Security Token Service Regions list, find the AWS region corresponding to the region where your account is located, and choose Activate if the status is Inactive.

  5. Choose Policies from the left-hand navigation pane.

  6. Click Create Policy:

    Create Policy button on Policies page
  7. Click the JSON tab.

  8. Add a policy document that will allow Snowflake to access the S3 bucket and folder.

    The following policy (in JSON format) provides Snowflake with the required permissions to load or unload data using a single bucket and folder path.

    Copy and paste the text into the policy editor:

    Note

    Make sure to replace bucket and prefix with your actual bucket name and folder path prefix.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                  "s3:GetObject",
                  "s3:GetObjectVersion"
                ],
                "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
            },
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": "arn:aws:s3:::<bucket>",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [
                            "<prefix>/*"
                        ]
                    }
                }
            }
        ]
    }
    

    Important

    Setting the "s3:prefix" condition to ["*"] grants access to all prefixes in the specified bucket. If more than 1000 objects exist in the bucket, you could encounter the following error: Access Denied (Status Code: 403; Error Code: AccessDenied).

    To avoid the error, remove the condition from the IAM policy, e.g.:

    "Condition": {
          "StringLike": {
              "s3:prefix": [
                  "*"
              ]
          }
      }
    

    The policy still grants access to the files in the bucket, but S3 does not return an error if more than 1000 objects exist in the bucket.

    Note that AWS policies support a variety of different security use cases.

    The following policy provides Snowflake with the required permissions to load data from a single read-only bucket and folder path. The policy includes the s3:GetObject, s3:GetObjectVersion, and s3:ListBucket permissions:

    Alternative policy: Load from a read-only S3 bucket

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                  "s3:GetObject",
                  "s3:GetObjectVersion"
                ],
                "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
            },
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": "arn:aws:s3:::<bucket>",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [
                            "<prefix>/*"
                        ]
                    }
                }
            }
        ]
    }
    
  9. Click Review policy.

  10. Enter the policy name (e.g. snowflake_access) and an optional description. Click Create policy.

    Create Policy button in Review Policy page
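
As a minimal sketch, the policy document from step 8 could also be generated with a short script instead of being edited by hand. The function name build_snowflake_s3_policy and the sample bucket and prefix values are illustrative assumptions, not part of any Snowflake or AWS tooling; the script only fills in the <bucket> and <prefix> placeholders of the read-only policy shown above.

```python
import json


def build_snowflake_s3_policy(bucket: str, prefix: str) -> dict:
    """Return the read-only policy for one bucket and folder path.

    Hypothetical helper for illustration only. It substitutes the
    <bucket> and <prefix> placeholders of the documented policy.
    """
    # Normalize the prefix so "load/files", "/load/files/" etc. behave the same.
    prefix = prefix.strip("/")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:GetObjectVersion"],
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}/*",
            },
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
                "Condition": {"StringLike": {"s3:prefix": [f"{prefix}/*"]}},
            },
        ],
    }


if __name__ == "__main__":
    # Sample values only; replace with your own bucket and folder path prefix.
    print(json.dumps(build_snowflake_s3_policy("mybucket", "load/files"), indent=4))
```

The printed JSON can be pasted into the policy editor in step 8. Generating the document this way mainly helps avoid syntax slips such as a stray trailing comma.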

Step 2: Create the IAM Role in AWS

In the AWS Management Console, create an AWS IAM role to grant privileges on the S3 bucket containing your data files.

  1. Log into the AWS Management Console.

  2. From the home dashboard, choose Identity & Access Management (IAM):

    Identity & Access Management in AWS Management Console
  3. Choose Roles from the left-hand navigation pane.

  4. Click the Create role button.

    Select Trusted Entity Page in AWS Management Console
  5. Select Another AWS account as the trusted entity type.

  6. In the Account ID field, enter your own AWS account ID temporarily. Later, you will modify the trusted relationship and grant access to Snowflake.

  7. Select the Require external ID option. Enter a dummy ID such as 0000. Later, you will modify the trusted relationship and specify the external ID for your Snowflake stage. An external ID is required to grant access to your AWS resources (i.e. S3) to a third party (i.e. Snowflake).

  8. Click the Next button.

  9. Locate the policy you created in Step 1: Configure Access Permissions for the S3 Bucket (in this topic), and select this policy.

  10. Click the Next button.

    Review Page in AWS Management Console
  11. Enter a name and description for the role, and click the Create role button.

    You have now created an IAM policy for a bucket, created an IAM role, and attached the policy to the role.

  12. Record the Role ARN value located on the role summary page. In the next step, you will create a Snowflake integration that references this role.

    IAM Role

Step 3: Create a Cloud Storage Integration in Snowflake

Create a storage integration using the CREATE STORAGE INTEGRATION command. A storage integration is a Snowflake object that stores a generated identity and access management (IAM) user for your S3 cloud storage, along with an optional set of allowed or blocked storage locations (i.e. buckets). Cloud provider administrators in your organization grant permissions on the storage locations to the generated user. This option allows users to avoid supplying credentials when creating stages or loading data.

A single storage integration can support multiple external (i.e. S3) stages. The URL in the stage definition must align with the S3 buckets (and optional paths) specified for the STORAGE_ALLOWED_LOCATIONS parameter.

Note

Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

Where:

  • integration_name is the name of the new integration.

  • iam_role is the Amazon Resource Name (ARN) of the role you created in Step 2: Create the IAM Role in AWS (in this topic).

  • bucket is the name of an S3 bucket that stores your data files (e.g. mybucket). The required STORAGE_ALLOWED_LOCATIONS parameter and optional STORAGE_BLOCKED_LOCATIONS parameter restrict or block access to these buckets, respectively, when stages that reference this integration are created or modified.

  • path is an optional path that can be used to provide granular control over objects in the bucket.

The following example creates an integration that explicitly limits external stages that use the integration to reference either of two buckets and paths. In a later step, we will create an external stage that references one of these buckets and paths.

Additional external stages that also use this integration can reference the allowed buckets and paths:

CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole'
  STORAGE_ALLOWED_LOCATIONS = ('s3://mybucket1/mypath1/', 's3://mybucket2/mypath2/')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');
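
The way allowed and blocked locations constrain a stage URL can be approximated with a simple prefix check. This is a rough sketch for reasoning about the example above, not a description of how Snowflake validates stage definitions; the function name stage_url_permitted is hypothetical, and the sketch assumes every location ends with a forward slash, as in the example.

```python
def stage_url_permitted(url, allowed, blocked=()):
    """Approximate check: the URL must fall under an allowed location
    and must not fall under any blocked location.

    Assumes each location string ends with "/", as in the example above.
    """
    # Compare with a trailing slash so "…/mypath1" does not match "…/mypath10/".
    candidate = url if url.endswith("/") else url + "/"

    def falls_under(locations):
        return any(candidate.startswith(location) for location in locations)

    return falls_under(allowed) and not falls_under(blocked)


ALLOWED = ["s3://mybucket1/mypath1/", "s3://mybucket2/mypath2/"]
BLOCKED = [
    "s3://mybucket1/mypath1/sensitivedata/",
    "s3://mybucket2/mypath2/sensitivedata/",
]

for stage_url in (
    "s3://mybucket1/mypath1/load",
    "s3://mybucket1/mypath1/sensitivedata/2020",
    "s3://mybucket3/mypath1/",
):
    print(stage_url, stage_url_permitted(stage_url, ALLOWED, BLOCKED))
```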

Step 4: Retrieve the AWS IAM User for your Snowflake Account

  1. Execute the DESCRIBE INTEGRATION command to retrieve the ARN for the AWS IAM user that was created automatically for your Snowflake account:

    DESC INTEGRATION <integration_name>;
    

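    Where integration_name is the name of the integration you created in Step 3: Create a Cloud Storage Integration in Snowflake (in this topic).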

    For example:

    DESC INTEGRATION s3_int;
    
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    | property                  | property_type | property_value                                                                 | property_default |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------|
    | ENABLED                   | Boolean       | true                                                                           | false            |
    | STORAGE_ALLOWED_LOCATIONS | List          | s3://mybucket1/mypath1/,s3://mybucket2/mypath2/                                | []               |
    | STORAGE_BLOCKED_LOCATIONS | List          | s3://mybucket1/mypath1/sensitivedata/,s3://mybucket2/mypath2/sensitivedata/    | []               |
    | STORAGE_AWS_IAM_USER_ARN  | String        | arn:aws:iam::123456789001:user/abc1-b-self1234                                 |                  |
    | STORAGE_AWS_ROLE_ARN      | String        | arn:aws:iam::001234567890:role/myrole                                          |                  |
    | STORAGE_AWS_EXTERNAL_ID   | String        | MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=                               |                  |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    
  2. Record the following values:

    • STORAGE_AWS_IAM_USER_ARN: The AWS IAM user created for your Snowflake account, arn:aws:iam::123456789001:user/abc1-b-self1234 in this example. We provision a single IAM user for your entire Snowflake account. All S3 storage integrations use that IAM user.

    • STORAGE_AWS_EXTERNAL_ID: The external ID that is needed to establish a trust relationship.

    You will provide these values in the next section.

Step 5: Grant the IAM User Permissions to Access Bucket Objects

The following step-by-step instructions describe how to configure IAM access permissions for Snowflake in your AWS Management Console so that you can use an S3 bucket to load and unload data:

  1. Log into the AWS Management Console.

  2. From the home dashboard, choose Identity & Access Management (IAM):

    Identity & Access Management in AWS Management Console
  3. Choose Roles from the left-hand navigation pane.

  4. Click on the role you created in Step 2: Create the IAM Role in AWS (in this topic).

  5. Click on the Trust relationships tab.

  6. Click the Edit trust relationship button.

  7. Modify the policy document with the DESC STORAGE INTEGRATION output values you recorded in Step 4: Retrieve the AWS IAM User for your Snowflake Account (in this topic):

    Policy document for IAM role

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "AWS": "<snowflake_user_arn>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<snowflake_external_id>"
            }
          }
        }
      ]
    }
    

    Where:

    • snowflake_external_id is the STORAGE_AWS_EXTERNAL_ID value you recorded.

    • snowflake_user_arn is the STORAGE_AWS_IAM_USER_ARN value you recorded.

  8. Click the Update Trust Policy button. The changes are saved.
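
The trust policy from step 7 can be sketched as a small script that fills in the two recorded values. The function name build_trust_policy is a hypothetical helper, and the sample values are the ones from the DESC INTEGRATION example output earlier in this topic; substitute the values recorded for your own integration.

```python
import json


def build_trust_policy(snowflake_user_arn: str, snowflake_external_id: str) -> dict:
    """Return the role trust policy with the two recorded values filled in.

    Hypothetical helper for illustration; it mirrors the policy document
    shown in step 7 and adds nothing to it.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                # STORAGE_AWS_IAM_USER_ARN from the DESC INTEGRATION output
                "Principal": {"AWS": snowflake_user_arn},
                "Action": "sts:AssumeRole",
                "Condition": {
                    # STORAGE_AWS_EXTERNAL_ID from the DESC INTEGRATION output
                    "StringEquals": {"sts:ExternalId": snowflake_external_id}
                },
            }
        ],
    }


if __name__ == "__main__":
    # Sample values copied from the example output in Step 4.
    trust_policy = build_trust_policy(
        "arn:aws:iam::123456789001:user/abc1-b-self1234",
        "MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=",
    )
    print(json.dumps(trust_policy, indent=2))
```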

Determining the Correct Option

Before proceeding, determine whether an S3 event notification exists for the target path (or “prefix,” in AWS terminology) in your S3 bucket where your data files are located. AWS rules prohibit creating conflicting notifications for the same path.

The following options for automating Snowpipe using Amazon SQS are supported:

  • Option 1. New S3 event notification: Create an event notification for the target path in your S3 bucket. The event notification informs Snowpipe via an SQS queue when files are ready to load.

    This is the most common option.

    Important

    If a conflicting event notification exists for your S3 bucket, use Option 2 instead.

  • Option 2. Existing event notification: Configure Amazon Simple Notification Service (SNS) as a broadcaster to share notifications for a given path with multiple endpoints (or “subscribers,” e.g. SQS queues or AWS Lambda workloads), including the Snowflake SQS queue for Snowpipe automation. An S3 event notification published by SNS informs Snowpipe via an SQS queue when files are ready to load.

Option 1: Creating a New S3 Event Notification to Automate Snowpipe

This section describes the most common option for triggering Snowpipe data loads automatically using Amazon SQS (Simple Queue Service) notifications for an S3 bucket. The steps explain how to create an event notification for the target path (or “prefix,” in AWS terminology) in your S3 bucket where your data files are stored.

Important

If a conflicting event notification exists for your S3 bucket, use Option 2: Configuring Amazon SNS to Automate Snowpipe Using SQS Notifications (in this topic) instead. AWS rules prohibit creating conflicting notifications for the same target path.

The following diagram shows the Snowpipe auto-ingest process flow:

Snowpipe Auto-ingest Process Flow
  1. Data files are loaded in a stage.

  2. An S3 event notification informs Snowpipe via an SQS queue that files are ready to load. Snowpipe copies the files into a queue.

  3. A Snowflake-provided virtual warehouse loads data from the queued files into the target table based on parameters defined in the specified pipe.

Note

The instructions in this topic assume a target table already exists in the Snowflake database where your data will be loaded.

Step 1: Create a Stage (If Needed)

Create an external stage that references your S3 bucket using the CREATE STAGE command. Snowpipe fetches your data files from the stage and temporarily queues them before loading them into your target table. Alternatively, you can use an existing external stage.

Note

To configure secure access to the cloud storage location, see Configuring Secure Access to Cloud Storage (in this topic).

The following example creates a stage named mystage in the active schema for the user session. The cloud storage URL includes the path files. The stage references a storage integration named myint:

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = myint;

Step 2: Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table.

The following example creates a pipe named mypipe in the active schema for the user session. The pipe loads the data from files staged in the mystage stage into the mytable table:

create pipe snowpipe_db.public.mypipe auto_ingest=true as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

The AUTO_INGEST = TRUE parameter specifies that the pipe reads event notifications sent from an S3 bucket to an SQS queue when new data is ready to load.

Important

Compare the stage reference in the pipe definition with existing pipes. Verify that the directory paths for the same S3 bucket do not overlap; otherwise, multiple pipes could load the same set of data files multiple times, into one or more target tables. This can happen, for example, when multiple stages reference the same S3 bucket with different levels of granularity, such as s3://mybucket/path1 and s3://mybucket/path1/path2. In this use case, if files are staged in s3://mybucket/path1/path2, the pipes for both stages would load a copy of the files.

This is different from the manual Snowpipe setup (with auto-ingest disabled), which requires users to submit a named set of files to a REST API to queue the files for loading. With auto-ingest enabled, each pipe receives a generated file list from the S3 event notifications. Additional care is required to avoid data duplication.
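
The overlap check described above can be sketched as follows. The function names are hypothetical, and the comparison is deliberately conservative: it treats each stage URL as a plain key prefix, so s3://mybucket/path1 is also reported as overlapping with s3://mybucket/path10. Treat a positive result as a prompt to review the pipe definitions rather than as proof of duplication.

```python
from urllib.parse import urlparse


def split_s3_url(url):
    """Split 's3://bucket/some/path' into (bucket, key_prefix)."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URL: {url!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def stage_paths_overlap(url_a, url_b):
    """Return True if one stage URL is a key prefix of the other
    within the same bucket (conservative string-prefix comparison)."""
    bucket_a, prefix_a = split_s3_url(url_a)
    bucket_b, prefix_b = split_s3_url(url_b)
    if bucket_a != bucket_b:
        return False
    return prefix_a.startswith(prefix_b) or prefix_b.startswith(prefix_a)


if __name__ == "__main__":
    # The two stage locations from the example above overlap.
    print(stage_paths_overlap("s3://mybucket/path1", "s3://mybucket/path1/path2"))
    # Sibling paths in the same bucket do not.
    print(stage_paths_overlap("s3://mybucket/path1/", "s3://mybucket/other/"))
```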

Note

Cloning a database or schema clones all objects, including pipes that reference external stages, in the source database or schema. When a data file is created in a stage location (e.g. S3 bucket), a copy of the notification is sent to every pipe that matches the stage location. This results in the following behavior:

  • If a table is fully qualified in the COPY statement in the pipe definition (in the form of db_name.schema_name.table_name or schema_name.table_name), then Snowpipe loads duplicate data into the source table (i.e. the database.schema.table in the COPY statement) for each pipe.

  • If a table is not fully qualified in the pipe definition, then Snowpipe loads the data into the same table (e.g. mytable) in the source and cloned databases/schemas.

Step 3: Configure Security

For each user who will execute continuous data loads using Snowpipe, grant sufficient access control privileges on the objects for the data load (i.e. the target database, schema, and table; the stage object; and the pipe).

Note

To follow the general principle of “least privilege”, we recommend creating a separate user and role to use for ingesting files using a pipe. The user should be created with this role as its default role.

Using Snowpipe requires a role with the following privileges:

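+-------------------+----------------+------------------------------------------------------------+
| Object            | Privilege      | Notes                                                      |
+-------------------+----------------+------------------------------------------------------------+
| Named pipe        | OWNERSHIP      |                                                            |
| Named stage       | USAGE, READ    |                                                            |
| Named file format | USAGE          | Optional; only needed if the stage you created in Step 1:  |
|                   |                | Create a Stage (If Needed) references a named file format. |
| Target database   | USAGE          |                                                            |
| Target schema     | USAGE          |                                                            |
| Target table      | INSERT, SELECT |                                                            |
+-------------------+----------------+------------------------------------------------------------+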

Use the GRANT <privileges> … TO ROLE command to grant privileges to the role.

Note

Only security administrators (i.e. users with the SECURITYADMIN role) or higher, or another role with both the CREATE ROLE privilege on the account and the global MANAGE GRANTS privilege, can create roles and grant privileges.

For example, create a role named snowpipe1 that can access a set of snowpipe_db.public database objects as well as a pipe named mypipe; then, grant the role to a user:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe1;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe1;

grant usage on schema snowpipe_db.public to role snowpipe1;

grant insert, select on snowpipe_db.public.mytable to role snowpipe1;

grant usage on stage snowpipe_db.public.mystage to role snowpipe1;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe1;

-- Grant the role to a user
grant role snowpipe1 to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe1;

Step 4: Configure Event Notifications

Configure event notifications for your S3 bucket to notify Snowpipe when new data is available to load. The auto-ingest feature relies on SQS queues to deliver event notifications from S3 to Snowpipe.

For ease of use, Snowpipe SQS queues are created and managed by Snowflake. The SHOW PIPES command output displays the Amazon Resource Name (ARN) of your SQS queue.

  1. Execute the SHOW PIPES command:

    SHOW PIPES;
    

    Note the ARN of the SQS queue for the stage in the notification_channel column. Copy the ARN to a convenient location.

    Note

    Following AWS guidelines, Snowflake designates no more than one SQS queue per S3 bucket. This SQS queue may be shared among multiple buckets in the same AWS account. The SQS queue coordinates notifications for all pipes connecting the external stages for the S3 bucket to the target tables. When a data file is uploaded into the bucket, all pipes that match the stage directory path perform a one-time load of the file into their corresponding target tables.

  2. Log into the Amazon S3 console.

  3. Configure an event notification for your S3 bucket using the instructions provided in the Amazon S3 documentation. Complete the fields as follows:

    • Name: Name of the event notification (e.g. Auto-ingest Snowflake).

    • Events: Select the ObjectCreate (All) option.

    • Send to: Select SQS Queue from the dropdown list.

    • SQS: Select Add SQS queue ARN from the dropdown list.

    • SQS queue ARN: Paste the SQS queue ARN from the SHOW PIPES output.

Note

These instructions create a single event notification that monitors activity for the entire S3 bucket. This is the simplest approach. This notification handles all pipes configured at a more granular level in the S3 bucket directory. Snowpipe only loads data files as specified in pipe definitions. Note, however, that a high volume of notifications for activity outside a pipe definition could negatively impact the rate at which Snowpipe filters notifications and takes action.

Alternatively, in the above steps, configure one or more paths and/or file extensions (or prefixes and suffixes, in AWS terminology) to filter event activity. For instructions, see the object key name filtering information in the relevant AWS documentation topic. Repeat these steps for each additional path or file extension you want the notification to monitor.

Note that AWS limits the number of these notification queue configurations to a maximum of 100 per S3 bucket.

Also note that AWS does not allow overlapping queue configurations (across event notifications) for the same S3 bucket. For example, if an existing notification is configured for s3://mybucket/load/path1, then you cannot create another notification at a higher level, such as s3://mybucket/load, or vice-versa.
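
The path and file extension filters described above correspond to prefix and suffix filter rules in the bucket's notification configuration. The following is a minimal sketch that builds such a configuration as a dictionary, in the shape accepted by the S3 PutBucketNotificationConfiguration API. The helper name, the configuration ID, the queue ARN, and the filter values are all placeholders chosen for illustration; use the notification_channel value from your own SHOW PIPES output.

```python
import json


def build_queue_configuration(queue_arn, prefix=None, suffix=None,
                              config_id="auto-ingest-snowflake"):
    """Build one S3 queue configuration for object-created events.

    Hypothetical helper. The prefix and suffix arguments become S3
    object key name filter rules; omit both to monitor the whole bucket.
    """
    config = {
        "Id": config_id,
        "QueueArn": queue_arn,
        "Events": ["s3:ObjectCreated:*"],
    }
    rules = []
    if prefix:
        rules.append({"Name": "prefix", "Value": prefix})
    if suffix:
        rules.append({"Name": "suffix", "Value": suffix})
    if rules:
        config["Filter"] = {"Key": {"FilterRules": rules}}
    return config


# Placeholder ARN; replace with the notification_channel value from SHOW PIPES.
EXAMPLE_QUEUE_ARN = "arn:aws:sqs:us-west-2:001234567890:example-snowpipe-queue"

notification_configuration = {
    "QueueConfigurations": [
        build_queue_configuration(EXAMPLE_QUEUE_ARN, prefix="load/path1/", suffix=".json")
    ]
}

print(json.dumps(notification_configuration, indent=2))
```

Note that applying a notification configuration through the API replaces the bucket's existing configuration, so any existing notifications would need to be included in the same document.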

Snowpipe with auto-ingest is now configured!

When new data files are added to the S3 bucket, the event notification informs Snowpipe to load them into the target table defined in the pipe.

Step 5: Load Historical Files

To load any backlog of data files that existed in the external stage before SQS notifications were configured, see Loading Historic Data.

Option 2: Configuring Amazon SNS to Automate Snowpipe Using SQS Notifications

This section describes how to trigger Snowpipe data loads automatically using Amazon SQS (Simple Queue Service) notifications for an S3 bucket. The steps explain how to configure Amazon Simple Notification Service (SNS) as a broadcaster to publish event notifications for your S3 bucket to multiple subscribers (e.g. SQS queues or AWS Lambda workloads), including the Snowflake SQS queue for Snowpipe automation.

Note

These instructions assume an event notification exists for the target path in your S3 bucket where your data files are located. If no event notification exists, follow Option 1: Creating a New S3 Event Notification to Automate Snowpipe (in this topic) instead.

The following diagram shows the process flow for Snowpipe auto-ingest with Amazon SNS:

Snowpipe Auto-ingest Process Flow with Amazon SNS
  1. Data files are loaded in a stage.

  2. An S3 event notification published by SNS informs Snowpipe via an SQS queue that files are ready to load. Snowpipe copies the files into a queue.

  3. A Snowflake-provided virtual warehouse loads data from the queued files into the target table based on parameters defined in the specified pipe.

Note

The instructions assume a target table already exists in the Snowflake database where your data will be loaded.

Prerequisite: Create an Amazon SNS Topic and Subscription

  1. Create an SNS topic in your AWS account to handle all messages for the Snowflake stage location on your S3 bucket.

  2. Subscribe your target destinations for the S3 event notifications (e.g. other SQS queues or AWS Lambda workloads) to this topic. SNS publishes event notifications for your bucket to all subscribers to the topic.

For instructions, see the SNS documentation.

Step 1: Subscribe the Snowflake SQS Queue to the SNS Topic

  1. Log into the AWS Management Console.

  2. From the home dashboard, choose Simple Notification Service (SNS).

  3. Choose Topics from the left-hand navigation pane.

  4. Locate the topic for your S3 bucket. Note the topic ARN.

  5. Using a Snowflake client, query the SYSTEM$GET_AWS_SNS_IAM_POLICY system function with your SNS topic ARN:

    select system$get_aws_sns_iam_policy('<sns_topic_arn>');
    

    The function returns an IAM policy that grants a Snowflake SQS queue permission to subscribe to the SNS topic.

    For example:

    select system$get_aws_sns_iam_policy('arn:aws:sns:us-west-2:001234567890:s3_mybucket');
    
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | SYSTEM$GET_AWS_SNS_IAM_POLICY('ARN:AWS:SNS:US-WEST-2:001234567890:S3_MYBUCKET')                                                                                                                                                                   |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | {"Version":"2012-10-17","Statement":[{"Sid":"1","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},"Action":["sns:Subscribe"],"Resource":["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]}]}                 |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
  6. Return to the AWS Management Console. Choose Topics from the left-hand navigation pane.

  7. Select the checkbox beside the topic for your S3 bucket, and from the Actions menu, click Edit topic policy. Click the Advanced view tab to edit the JSON format of the policy.

  8. Merge the IAM policy addition from the SYSTEM$GET_AWS_SNS_IAM_POLICY function results into the JSON document.

    For example:

    Original IAM policy (abbreviated):

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         }
       ]
     }
    

    Merged IAM policy:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         }
       ]
     }
    
  9. Add an additional policy grant to allow S3 to publish event notifications for the bucket to the SNS topic.

    For example (using the SNS topic ARN and S3 bucket used throughout these instructions):

    {
        "Sid":"s3-event-notifier",
        "Effect":"Allow",
        "Principal":{
           "Service":"s3.amazonaws.com"
        },
        "Action":"SNS:Publish",
        "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
        "Condition":{
           "ArnLike":{
              "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
           }
        }
     }
    

    Merged IAM policy:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
               "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
            },
            "Action":[
               "sns:Subscribe"
            ],
            "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
            ]
         },
         {
            "Sid":"s3-event-notifier",
            "Effect":"Allow",
            "Principal":{
               "Service":"s3.amazonaws.com"
            },
            "Action":"SNS:Publish",
            "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
            "Condition":{
               "ArnLike":{
                  "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
               }
            }
         }
      ]
    }
    
  10. Click the Update policy button.
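
The merge performed in steps 8 and 9 can also be sketched in code. The following Python example is only an illustration and is not part of any Snowflake or AWS tooling. It assumes the topic policy and the new statements are already available as dictionaries, and the helper name merge_statements is hypothetical. Each new statement is appended to the Statement array unless a statement with the same Sid is already present:

```python
import copy
import json


def merge_statements(policy, new_statements):
    """Return a copy of policy with new_statements appended to "Statement".

    A statement is skipped when its "Sid" already exists in the policy, so
    repeating the merge does not create duplicates.
    """
    merged = copy.deepcopy(policy)
    statements = merged.setdefault("Statement", [])
    existing_sids = {s["Sid"] for s in statements if "Sid" in s}
    for statement in new_statements:
        sid = statement.get("Sid")
        if sid is not None and sid in existing_sids:
            continue
        statements.append(copy.deepcopy(statement))
        if sid is not None:
            existing_sids.add(sid)
    return merged


# Abbreviated original policy; the elided keys of the default statement are omitted.
original_policy = {
    "Version": "2008-10-17",
    "Id": "__default_policy_ID",
    "Statement": [
        {"Sid": "__default_statement_ID", "Effect": "Allow", "Principal": {"AWS": "*"}},
    ],
}

# Statement of the kind returned by SYSTEM$GET_AWS_SNS_IAM_POLICY (example values).
snowflake_statement = {
    "Sid": "1",
    "Effect": "Allow",
    "Principal": {"AWS": "arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},
    "Action": ["sns:Subscribe"],
    "Resource": ["arn:aws:sns:us-west-2:001234567890:s3_mybucket"],
}

# Statement that lets S3 publish event notifications to the topic (example values).
s3_statement = {
    "Sid": "s3-event-notifier",
    "Effect": "Allow",
    "Principal": {"Service": "s3.amazonaws.com"},
    "Action": "SNS:Publish",
    "Resource": "arn:aws:sns:us-west-2:001234567890:s3_mybucket",
    "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:*:*:s3_mybucket"}},
}

merged_policy = merge_statements(original_policy, [snowflake_statement, s3_statement])
print(json.dumps(merged_policy, indent=2))
```

The printed JSON can be compared against the merged policy shown in step 9 before it is pasted into the policy editor.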

Step 2: Create a Stage (If Needed)

Create an external stage that references your S3 bucket using the CREATE STAGE command. Snowpipe fetches your data files from the stage and temporarily queues them before loading them into your target table.

Alternatively, you can use an existing external stage.

Note

To configure secure access to the cloud storage location, see Configuring Secure Access to Cloud Storage (in this topic).

The following example creates a stage named mystage in the active schema for the user session. The cloud storage URL includes the path load/files. The stage references a storage integration named myint:

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = myint;

Step 3: Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table. In the pipe definition, set the AWS_SNS_TOPIC parameter to the SNS topic ARN from Prerequisite: Create an Amazon SNS Topic and Subscription.

The following example creates a pipe named mypipe in the snowpipe_db.public schema. The pipe loads the data from files staged in the mystage stage into the mytable table:

create pipe snowpipe_db.public.mypipe
  auto_ingest=true
  aws_sns_topic='<sns_topic_arn>'
  as
    copy into snowpipe_db.public.mytable
    from @snowpipe_db.public.mystage
    file_format = (type = 'JSON');

Where:

AUTO_INGEST = true

Specifies that the pipe reads event notifications sent from an S3 bucket to an SQS queue when new data is ready to load.

AWS_SNS_TOPIC = '<sns_topic_arn>'

Specifies the ARN for the SNS topic for your S3 bucket, e.g. arn:aws:sns:us-west-2:001234567890:s3_mybucket in the current example. The CREATE PIPE statement subscribes the Snowflake SQS queue to the specified SNS topic. Note that the pipe copies files to the ingest queue only when triggered by event notifications delivered through the SNS topic.

To remove either parameter from a pipe, it is currently necessary to recreate the pipe using the CREATE OR REPLACE PIPE syntax.
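
To show how the two parameters fit into the statement, the following Python sketch assembles the CREATE PIPE text from its parts. It is an illustration only: the function name create_pipe_sql and the ARN pattern are assumptions made for this example, the pattern checks only the general shape of an SNS topic ARN, and the object names are assumed to be trusted identifiers rather than user input:

```python
import re

# Assumed general shape: arn:<partition>:sns:<region>:<12-digit account>:<topic name>
SNS_TOPIC_ARN = re.compile(
    r"^arn:aws[a-z-]*:sns:[a-z0-9-]+:\d{12}:[A-Za-z0-9_-]{1,256}$"
)


def create_pipe_sql(pipe, table, stage, sns_topic_arn, file_format_type="JSON"):
    """Build a CREATE PIPE statement with auto-ingest enabled.

    Raises ValueError if sns_topic_arn does not look like an SNS topic ARN,
    for example when the <sns_topic_arn> placeholder was left in place.
    """
    if not SNS_TOPIC_ARN.match(sns_topic_arn):
        raise ValueError(f"not an SNS topic ARN: {sns_topic_arn!r}")
    return (
        f"create pipe {pipe}\n"
        f"  auto_ingest=true\n"
        f"  aws_sns_topic='{sns_topic_arn}'\n"
        f"  as\n"
        f"    copy into {table}\n"
        f"    from @{stage}\n"
        f"    file_format = (type = '{file_format_type}');"
    )


statement = create_pipe_sql(
    pipe="snowpipe_db.public.mypipe",
    table="snowpipe_db.public.mytable",
    stage="snowpipe_db.public.mystage",
    sns_topic_arn="arn:aws:sns:us-west-2:001234567890:s3_mybucket",
)
print(statement)
```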

Important

Compare the stage reference in the pipe definition with existing pipes. Verify that the directory paths for the same S3 bucket do not overlap; otherwise, multiple pipes could load the same set of data files multiple times, into one or more target tables. This can happen, for example, when multiple stages reference the same S3 bucket with different levels of granularity, such as s3://mybucket/path1 and s3://mybucket/path1/path2. In this use case, if files are staged in s3://mybucket/path1/path2, the pipes for both stages would load a copy of the files.

This is different from the manual Snowpipe setup (with auto-ingest disabled), which requires users to submit a named set of files to a REST API to queue the files for loading. With auto-ingest enabled, each pipe receives a generated file list from the S3 event notifications. Additional care is required to avoid data duplication.
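
A quick way to review existing stages for this problem is to compare their URLs pairwise. The following Python sketch is one possible check, not a Snowflake feature. The function names are hypothetical, and the sketch assumes that the path in a stage URL acts as a plain string prefix on object keys, so it deliberately flags more cases rather than fewer:

```python
from urllib.parse import urlparse


def split_stage_url(url):
    """Split an s3:// stage URL into (bucket, key prefix)."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URL: {url!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def paths_overlap(url_a, url_b):
    """Return True if files staged under one URL could also match the other."""
    bucket_a, prefix_a = split_stage_url(url_a)
    bucket_b, prefix_b = split_stage_url(url_b)
    if bucket_a != bucket_b:
        return False
    # One prefix containing the other means both pipes would see the same files.
    return prefix_a.startswith(prefix_b) or prefix_b.startswith(prefix_a)


stage_urls = [
    "s3://mybucket/path1",
    "s3://mybucket/path1/path2",
    "s3://mybucket/path3/",
]
for i, first in enumerate(stage_urls):
    for second in stage_urls[i + 1:]:
        if paths_overlap(first, second):
            print(f"overlap: {first} and {second}")
```

With the URLs from the example above, only the s3://mybucket/path1 and s3://mybucket/path1/path2 pair is reported.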

Step 4: Configure Security

For each user who will execute continuous data loads using Snowpipe, grant sufficient access control privileges on the objects for the data load (i.e. the target database, schema, and table; the stage object; and the pipe).

Note

To follow the general principle of “least privilege”, we recommend creating a separate user and role to use for ingesting files using a pipe. The user should be created with this role as its default role.

Using Snowpipe requires a role with the following privileges:

| Object | Privilege | Notes |
| --- | --- | --- |
| Named pipe | OWNERSHIP | |
| Named stage | USAGE, READ | |
| Named file format | USAGE | Optional; only needed if the stage you created in Step 2: Create a Stage (If Needed) references a named file format. |
| Target database | USAGE | |
| Target schema | USAGE | |
| Target table | INSERT, SELECT | |

Use the GRANT <privileges> … TO ROLE command to grant privileges to the role.

Note

Only security administrators (i.e. users with the SECURITYADMIN role) or higher can create roles.

For example, create a role named snowpipe1 that can access a set of snowpipe_db.public database objects as well as a pipe named mypipe; then, grant the role to a user:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe1;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe1;

grant usage on schema snowpipe_db.public to role snowpipe1;

grant insert, select on snowpipe_db.public.mytable to role snowpipe1;

grant usage, read on stage snowpipe_db.public.mystage to role snowpipe1;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe1;

-- Grant the role to a user
grant role snowpipe1 to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe1;

Snowpipe with auto-ingest is now configured!

When new data files are added to the S3 bucket, the event notification informs Snowpipe to load them into the target table defined in the pipe.

Step 5: Load Historical Files

To load any backlog of data files that existed in the external stage before SQS notifications were configured, see Loading Historic Data.

SYSTEM$PIPE_STATUS Output

The SYSTEM$PIPE_STATUS function retrieves a JSON representation of the current status of a pipe.

For pipes with AUTO_INGEST set to TRUE, the function returns a JSON object containing the following name/value pairs (if applicable to the current pipe status):

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}

Where:

executionState

Current execution state of the pipe; could be any one of the following:

  • RUNNING (i.e. everything is normal; Snowflake may or may not be actively processing files for this pipe)

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

Earliest timestamp among data files currently queued (if applicable), where the timestamp is set when the file is added to the queue.

pendingFileCount

Number of files currently being processed by the pipe. If the pipe is paused, this value will decrease as any files queued before the pipe was paused are processed. When this value is 0, either there are no files queued for this pipe or the pipe is effectively paused.

notificationChannelName

Amazon SQS queue associated with the pipe.

numOutstandingMessagesOnChannel

Number of messages in the SQS queue that have been queued but not received yet.

lastReceivedMessageTimestamp

Timestamp of the last message received from the SQS queue. Note that this message might not apply to the specific pipe, e.g., if the path/prefix associated with the message does not match the path/prefix in the pipe definition. In addition, only messages triggered by created data objects are consumed by auto-ingest pipes.

lastForwardedMessageTimestamp

Timestamp of the last “create object” event message with a matching path/prefix that was forwarded to the pipe.

error

Error message produced when the pipe was last compiled for execution (if applicable); often caused by problems accessing the necessary objects (i.e. table, stage, file format) due to permission problems or dropped objects.

fault

Most recent internal Snowflake process error (if applicable). Used primarily by Snowflake for debugging purposes.
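
As an illustration of how this output might be consumed, the following Python sketch parses the JSON string and groups the execution state by its prefix. The function name summarize_pipe_status and the sample values are hypothetical. The sketch also assumes that the string returned by SYSTEM$PIPE_STATUS has already been fetched by a client of your choice:

```python
import json


def summarize_pipe_status(status_json):
    """Reduce SYSTEM$PIPE_STATUS output to a few fields for monitoring."""
    status = json.loads(status_json)
    state = status.get("executionState", "UNKNOWN")
    if state == "RUNNING":
        category = "running"
    elif state.startswith("PAUSED"):
        category = "paused"
    elif state.startswith("STOPPED_"):
        category = "stopped"
    elif state.startswith("STALLED_"):
        category = "stalled"
    else:
        category = "unknown"
    return {
        "state": state,
        "category": category,
        # Name/value pairs are only present when applicable, so use defaults.
        "pending_files": status.get("pendingFileCount", 0),
        "error": status.get("error"),
    }


# Hypothetical sample output; real values depend on the pipe.
sample_output = (
    '{"executionState":"STALLED_COMPILATION_ERROR",'
    '"pendingFileCount":3,'
    '"error":"hypothetical error text"}'
)
summary = summarize_pipe_status(sample_output)
print(summary["category"], summary["pending_files"], summary["error"])
```

Grouping by prefix keeps the check short, because the states listed above fall into RUNNING, PAUSED, STOPPED_, and STALLED_ families.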