Automating Snowpipe for Google Cloud Storage

This topic provides instructions for triggering Snowpipe data loads automatically using Google Cloud Pub/Sub messages for Google Cloud Storage (GCS) events.

Note

This feature is limited to Snowflake accounts that are hosted on Amazon Web Services (AWS) or Google Cloud Platform. The instructions for automating Snowpipe data loads using GCS Pub/Sub messages are identical for accounts on either cloud hosting platform.

Support for automating Snowpipe data loads from GCS into Snowflake accounts hosted on AWS is provided as a preview feature.


Configuring Secure Access to Cloud Storage

Note

If you have already configured secure access to the GCS bucket that stores your data files, you can skip this section.

This section describes how to use storage integrations to allow Snowflake to read data from and write to a Google Cloud Storage bucket referenced in an external (i.e. Cloud Storage) stage. Integrations are named, first-class Snowflake objects that delegate authentication responsibility for cloud storage to a Snowflake-managed entity (a Cloud Storage service account), avoiding the need to pass explicit cloud provider credentials such as secret keys or access tokens. An administrator in your organization grants the service account permissions in the Cloud Storage account.

Administrators can also restrict users to a specific set of Cloud Storage buckets (and optional paths) accessed by external stages that use the integration.

Note

Completing the instructions in this section requires access to your Cloud Storage project as a project editor. If you are not a project editor, ask your Cloud Storage administrator to perform these tasks.

The following diagram shows the integration flow for a Cloud Storage stage:

Google Cloud Storage Stage Integration Flow
  1. An external (i.e. Cloud Storage) stage references a storage integration object in its definition.

  2. Snowflake automatically associates the storage integration with a Cloud Storage service account created for your account. Snowflake creates a single service account that is referenced by all GCS storage integrations in your Snowflake account.

  3. A project editor for your Cloud Storage project grants permissions to the service account to access the bucket referenced in the stage definition. Note that many external stage objects can reference different buckets and paths and use the same integration for authentication.

When a user loads or unloads data from or to a stage, Snowflake verifies the permissions granted to the service account on the bucket before allowing or denying access.


Step 1: Create a Cloud Storage Integration in Snowflake

Create an integration using the CREATE STORAGE INTEGRATION command. An integration is a Snowflake object that delegates authentication responsibility for external cloud storage to a Snowflake-generated entity (i.e. a Cloud Storage service account). For accessing Cloud Storage buckets, Snowflake creates a service account that can be granted permissions to access the bucket(s) that store your data files.

A single storage integration can support multiple external (i.e. GCS) stages. The URL in the stage definition must align with the GCS buckets (and optional paths) specified for the STORAGE_ALLOWED_LOCATIONS parameter.

Note

Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.
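
For example, an account administrator could grant this privilege to a custom role (the role name below is hypothetical):

-- The role name my_integration_admin is a placeholder.
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE my_integration_admin;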

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]

Where:

  • integration_name is the name of the new integration.

  • bucket is the name of a Cloud Storage bucket that stores your data files (e.g. mybucket). The required STORAGE_ALLOWED_LOCATIONS parameter and optional STORAGE_BLOCKED_LOCATIONS parameter restrict or block access to these buckets, respectively, when stages that reference this integration are created or modified.

  • path is an optional path that can be used to provide granular control over objects in the bucket.

The following example creates an integration that explicitly limits external stages that use the integration to reference either of two buckets and paths. In a later step, we will create an external stage that references one of these buckets and paths.

Additional external stages that also use this integration can reference the allowed buckets and paths:

CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/')
  STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
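
As a sketch of how the allowed locations constrain later stage definitions, an external stage that uses this integration must reference a URL within one of the allowed buckets and paths (the stage name below is hypothetical; stage creation is covered in a later step):

-- Minimal sketch; my_gcs_stage is a placeholder name.
-- The URL falls within gcs://mybucket1/path1/, which gcs_int allows.
CREATE STAGE my_gcs_stage
  URL = 'gcs://mybucket1/path1/'
  STORAGE_INTEGRATION = gcs_int;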

Step 2: Retrieve the Cloud Storage Service Account for your Snowflake Account

Execute the DESCRIBE INTEGRATION command to retrieve the ID for the Cloud Storage service account that was created automatically for your Snowflake account:

DESC STORAGE INTEGRATION <integration_name>;

Where:

  • <integration_name> is the name of the integration you created in Step 1: Create a Cloud Storage Integration in Snowflake.

For example:

DESC STORAGE INTEGRATION gcs_int;

+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
| property                    | property_type | property_value                                                              | property_default |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------|
| ENABLED                     | Boolean       | true                                                                        | false            |
| STORAGE_ALLOWED_LOCATIONS   | List          | gcs://mybucket1/path1/,gcs://mybucket2/path2/                               | []               |
| STORAGE_BLOCKED_LOCATIONS   | List          | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/   | []               |
| STORAGE_GCP_SERVICE_ACCOUNT | String        | service-account-id@project1-123456.iam.gserviceaccount.com                  |                  |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+

The STORAGE_GCP_SERVICE_ACCOUNT property in the output shows the Cloud Storage service account created for your Snowflake account (e.g. service-account-id@project1-123456.iam.gserviceaccount.com). Snowflake provisions a single Cloud Storage service account for your entire Snowflake account. All Cloud Storage integrations use that service account.

Step 3: Grant the Service Account Permissions to Access Bucket Objects

The following step-by-step instructions describe how to configure IAM access permissions for Snowflake in your Google Cloud Platform Console so that you can use a Cloud Storage bucket to load and unload data:

Creating a Custom IAM Role

Create a custom role that has the permissions required to access the bucket and get objects.

  1. Log into the Google Cloud Platform Console as a project editor.

  2. From the home dashboard, choose IAM & admin » Roles.

  3. Click Create Role.

  4. Enter a name and description for the custom role.

  5. Click Add Permissions.

  6. Filter the list of permissions, and add the following from the list:

    Data loading only
    • storage.buckets.get

    • storage.objects.get

    • storage.objects.list

    Data loading with purge option
    • storage.buckets.get

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    Data loading and unloading
    • storage.buckets.get

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

  7. Click Create.
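
If you prefer the command line, a role with the minimal data loading permissions could also be created with gcloud; the project ID and role ID below are placeholders:

# my-project and snowflakeLoader are placeholders for your project ID and role ID.
$ gcloud iam roles create snowflakeLoader \
    --project=my-project \
    --title="Snowflake data loader" \
    --permissions=storage.buckets.get,storage.objects.get,storage.objects.list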

Assigning the Custom Role to the Cloud Storage Service Account

  1. Log into the Google Cloud Platform Console as a project editor.

  2. From the home dashboard, choose Storage » Browser:

    Bucket List in Google Cloud Platform Console
  3. Select a bucket to configure for access.

  4. Click SHOW INFO PANEL in the upper-right corner. The information panel for the bucket slides out.

  5. In the Add members field, search for the service account name from the DESCRIBE INTEGRATION output in Step 2: Retrieve the Cloud Storage Service Account for your Snowflake Account (in this topic).

    Bucket Information Panel in Google Cloud Platform Console
  6. From the Select a role dropdown, select Storage » Custom » <role>, where <role> is the custom Cloud Storage role you created in Creating a Custom IAM Role (in this topic).

  7. Click the Add button. The service account name is added to the Storage Object Viewer role dropdown in the information panel.

    Storage Object Viewer role list in Google Cloud Platform Console
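
Alternatively, recent gcloud releases can attach the custom role to the bucket from the command line. The example below reuses mybucket1 from the integration example and the service account from the DESCRIBE INTEGRATION output; the project ID and role ID are placeholders:

# my-project and snowflakeLoader are placeholders; substitute your project ID and custom role ID.
$ gcloud storage buckets add-iam-policy-binding gs://mybucket1 \
    --member="serviceAccount:service-account-id@project1-123456.iam.gserviceaccount.com" \
    --role="projects/my-project/roles/snowflakeLoader"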

Granting the Cloud Storage Service Account Permissions on the Cloud Key Management Service Cryptographic Keys

Note

This step is required only if your GCS bucket is encrypted using a key stored in the Google Cloud Key Management Service (Cloud KMS).

  1. Log into the Google Cloud Platform Console as a project editor.

  2. From the home dashboard, choose Security » Cryptographic keys.

  3. Select the key ring that is assigned to your GCS bucket.

  4. Click SHOW INFO PANEL in the upper-right corner. The information panel for the key ring slides out.

  5. In the Add members field, search for the service account name from the DESCRIBE INTEGRATION output in Step 2: Retrieve the Cloud Storage Service Account for your Snowflake Account (in this topic).

  6. From the Select a role dropdown, select the Cloud KMS CryptoKey Encrypter/Decrypter role.

  7. Click the Add button. The service account name is added to the Cloud KMS CryptoKey Encrypter/Decrypter role dropdown in the information panel.
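
The equivalent grant can also be made from the command line; the key ring name and location below are placeholders, and the service account is the one returned in Step 2:

# my_keyring and us-central1 are placeholders for your key ring and its location.
$ gcloud kms keyrings add-iam-policy-binding my_keyring \
    --location=us-central1 \
    --member="serviceAccount:service-account-id@project1-123456.iam.gserviceaccount.com" \
    --role="roles/cloudkms.cryptoKeyEncrypterDecrypter"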

Configuring Automated Snowpipe Using GCS Pub/Sub

Prerequisites

The instructions in this topic assume the following items have been created and configured:

GCP account
  • Pub/Sub topic that receives event messages for the GCS bucket. See Creating the Pub/Sub Topic (in this topic).

  • Subscription that receives event messages from the Pub/Sub topic. See Creating the Pub/Sub Subscription (in this topic).

For instructions, see the Pub/Sub documentation.

Snowflake
  • Target table in the Snowflake database where your data will be loaded.

Creating the Pub/Sub Topic

Create a Pub/Sub topic using Cloud Shell or Cloud SDK.

Execute the following command to create the topic and enable it to listen for activity in the specified GCS bucket:

$ gsutil notification create -t <topic> -f json gs://<bucket-name>

Where:

  • <topic> is the name for the topic.

  • <bucket-name> is the name of your GCS bucket.

If the topic already exists, the command uses it; otherwise, a new topic is created.
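
For example, with a hypothetical topic named snowpipe_gcs_topic and the mybucket1 bucket used earlier in this topic:

$ gsutil notification create -t snowpipe_gcs_topic -f json gs://mybucket1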

For more information, see Using Pub/Sub notifications for Cloud Storage in the Pub/Sub documentation.

Creating the Pub/Sub Subscription

Create a subscription to the Pub/Sub topic using the Cloud Console, gcloud command-line tool, or the Cloud Pub/Sub API. For instructions, see Managing topics and subscriptions in the Pub/Sub documentation.
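
For example, the following gcloud command creates a subscription to the hypothetical topic from the previous section (the subscription name is also a placeholder):

$ gcloud pubsub subscriptions create snowpipe_gcs_sub --topic=snowpipe_gcs_topic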

Retrieving the Pub/Sub Subscription ID

The Pub/Sub topic subscription ID is used in these instructions to allow Snowflake access to event messages.

  1. Log into the Google Cloud Platform Console as a project editor.

  2. From the home dashboard, choose Big Data » Pub/Sub » Subscriptions.

  3. Copy the ID in the Subscription ID column for the topic subscription.
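
Alternatively, you can list the fully qualified subscription names in the project from the command line; gcloud returns each name in the form projects/<project_id>/subscriptions/<subscription_name>, which matches the format used in the notification integration example below:

$ gcloud pubsub subscriptions list --format="value(name)"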

Step 1: Create a Notification Integration in Snowflake

Create a notification integration using the CREATE NOTIFICATION INTEGRATION command. The notification integration references your Pub/Sub subscription. Snowflake associates the notification integration with a GCS service account created for your account. Snowflake creates a single service account that is referenced by all GCS notification integrations in your Snowflake account.

Note

  • Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.

  • The GCS service account for notification integrations is different from the service account created for storage integrations.

CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';

Where:

  • <integration_name> is the name of the new integration.

  • <subscription_id> is the Pub/Sub subscription ID retrieved in Retrieving the Pub/Sub Subscription ID (in this topic).

For example:

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';

Note

Currently, the ALTER NOTIFICATION INTEGRATION command does not support modifying the GCP_PUBSUB_SUBSCRIPTION_NAME parameter value. If an incorrect parameter value is specified, you must recreate the notification integration (using CREATE OR REPLACE NOTIFICATION INTEGRATION).

Step 2: Grant Snowflake Access to the Pub/Sub Subscription

  1. Execute the DESCRIBE INTEGRATION command to retrieve the Snowflake service account ID:

    DESC NOTIFICATION INTEGRATION <integration_name>;
    

    Where:

    • <integration_name> is the name of the notification integration you created in Step 1: Create a Notification Integration in Snowflake.

    For example:

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
  2. Record the service account name in the GCP_PUBSUB_SERVICE_ACCOUNT column, which has the following format:

    <service_account>@<project_id>.iam.gserviceaccount.com
    
  3. Log into the Google Cloud Platform Console as a project editor.

  4. From the home dashboard, choose Big Data » Pub/Sub » Subscriptions.

  5. Select the subscription to configure for access.

  6. Click SHOW INFO PANEL in the upper-right corner. The information panel for the subscription slides out.

  7. In the Add members field, search for the service account name you recorded.

  8. From the Select a role dropdown, select Pub/Sub Subscriber.

  9. Click the Add button. The service account name is added to the Pub/Sub Subscriber role dropdown in the information panel.

Step 3: Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table.

CREATE PIPE <pipe_name>
  AUTO_INGEST = true
  INTEGRATION = '<integration_name>'
  AS
<copy_statement>;

Where:

  • <pipe_name> is the identifier for the pipe; must be unique for the schema in which the pipe is created.

    The identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "My object"). Identifiers enclosed in double quotes are also case-sensitive.

  • <integration_name> is the name of the notification integration you created in Step 1: Create a Notification Integration in Snowflake.

  • copy_statement is the COPY INTO <table> statement used to load data from queued files into a Snowflake table. This statement serves as the text/definition for the pipe and is displayed in the SHOW PIPES output.

For example, create a pipe in the snowpipe_db.public schema that loads data from files staged in an external (GCS) stage named mystage into a destination table named mytable:

CREATE PIPE snowpipe_db.public.mypipe
  AUTO_INGEST = true
  INTEGRATION = 'MYINT'
  AS
COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage/path2;

Important

Compare the stage reference in the pipe definition with existing pipes. Verify that the directory paths for the same GCS bucket do not overlap; otherwise, multiple pipes could load the same set of data files multiple times, into one or more target tables. This can happen, for example, when multiple stages reference the same GCS bucket with different levels of granularity, such as gcs://mybucket1/path1 and gcs://mybucket1/path1/path2. In this use case, if files are staged in gcs://mybucket1/path1/path2, the pipes for both stages would load a copy of the files.

This is different from the manual Snowpipe setup (with auto-ingest disabled), which requires users to submit a named set of files to a REST API to queue the files for loading. With auto-ingest enabled, each pipe receives a generated file list from the Pub/Sub messages. Additional care is required to avoid data duplication.

Snowpipe with auto-ingest is now configured!

When new data files are added to the GCS bucket, the event message informs Snowpipe to load them into the target table defined in the pipe.

Step 4: Load Historical Files

To load any backlog of data files that existed in the external stage before Pub/Sub messages were configured, execute an ALTER PIPE … REFRESH statement.
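
For example, using the mypipe pipe created in Step 3:

ALTER PIPE snowpipe_db.public.mypipe REFRESH;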

SYSTEM$PIPE_STATUS Output

The SYSTEM$PIPE_STATUS function retrieves a JSON representation of the current status of a pipe.
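
For example, to check the status of the pipe created earlier in this topic:

SELECT SYSTEM$PIPE_STATUS('snowpipe_db.public.mypipe');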

For pipes with AUTO_INGEST set to TRUE, the function returns a JSON object containing the following name/value pairs (if applicable to the current pipe status):

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}

Where:

executionState

Current execution state of the pipe; could be any one of the following:

  • RUNNING (i.e. everything is normal; Snowflake may or may not be actively processing files for this pipe)

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

Earliest timestamp among data files currently queued (if applicable), where the timestamp is set when the file is added to the queue.

pendingFileCount

Number of files currently being processed by the pipe. If the pipe is paused, this value will decrease as any files queued before the pipe was paused are processed. When this value is 0, either there are no files queued for this pipe or the pipe is effectively paused.

notificationChannelName

GCS storage queue associated with the pipe.

numOutstandingMessagesOnChannel

Number of messages in the storage queue that have been queued but not received yet.

lastReceivedMessageTimestamp

Timestamp of the last message received from the storage queue. Note that this message might not apply to the specific pipe (e.g., if the path/prefix associated with the message does not match the path/prefix in the pipe definition). In addition, only messages triggered by created data objects are consumed by auto-ingest pipes.

lastForwardedMessageTimestamp

Timestamp of the last “create object” event message with a matching path/prefix that was forwarded to the pipe.

error

Error message produced when the pipe was last compiled for execution (if applicable); often caused by problems accessing the necessary objects (i.e. table, stage, file format) due to permission problems or dropped objects.

fault

Most recent internal Snowflake process error (if applicable). Used primarily by Snowflake for debugging purposes.