# Automating Snowpipe for Google Cloud Storage

This topic provides instructions for triggering Snowpipe data loads automatically using Google Cloud Pub/Sub messages for Google Cloud Storage (GCS) events.

## Cloud Platform Support

Triggering automated Snowpipe data loads using GCS Pub/Sub event messages is supported by Snowflake accounts hosted on the following cloud platforms:

• Amazon Web Services (AWS)

• Google Cloud Platform

The instructions for configuring this support are identical for accounts on either cloud hosting platform. All Snowflake objects (e.g. integrations, stages, file formats) are created in the Snowflake account that stores your target tables.

## Configuring Secure Access to Cloud Storage

This section describes how to configure a Snowflake storage integration object to delegate authentication responsibility for cloud storage to a Snowflake identity and access management (IAM) entity.

Storage integrations allow Snowflake to read data from and write data to a Google Cloud Storage bucket referenced in an external (i.e. Cloud Storage) stage. Integrations are named, first-class Snowflake objects that avoid the need for passing explicit cloud provider credentials such as secret keys or access tokens; instead, integration objects reference a Cloud Storage service account. An administrator in your organization grants the service account permissions in the Cloud Storage account.

Administrators can also restrict users to a specific set of Cloud Storage buckets (and optional paths) accessed by external stages that use the integration.

The integration flow for a Cloud Storage stage is as follows:

1. An external (i.e. Cloud Storage) stage references a storage integration object in its definition.

2. Snowflake automatically associates the storage integration with a Cloud Storage service account created for your account. Snowflake creates a single service account that is referenced by all GCS storage integrations in your Snowflake account.

3. A project editor for your Cloud Storage project grants permissions to the service account to access the bucket referenced in the stage definition. Note that many external stage objects can reference different buckets and paths and use the same integration for authentication.

When a user loads or unloads data from or to a stage, Snowflake verifies the permissions granted to the service account on the bucket before allowing or denying access.

### Step 1: Create a Cloud Storage Integration in Snowflake

Create an integration using the CREATE STORAGE INTEGRATION command. An integration is a Snowflake object that delegates authentication responsibility for external cloud storage to a Snowflake-generated entity (i.e. a Cloud Storage service account). For accessing Cloud Storage buckets, Snowflake creates a service account that can be granted permissions to access the bucket(s) that store your data files.

A single storage integration can support multiple external (i.e. GCS) stages. The URL in the stage definition must align with the GCS buckets (and optional paths) specified for the STORAGE_ALLOWED_LOCATIONS parameter.

Note

Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.

CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]


Where:

• integration_name is the name of the new integration.

• bucket is the name of a Cloud Storage bucket that stores your data files (e.g. mybucket). The required STORAGE_ALLOWED_LOCATIONS parameter and optional STORAGE_BLOCKED_LOCATIONS parameter restrict or block access to these buckets, respectively, when stages that reference this integration are created or modified.

• path is an optional path that can be used to provide granular control over objects in the bucket.

The following example creates an integration that explicitly limits external stages that use the integration to reference either of two buckets and paths. In a later step, we will create an external stage that references one of these buckets and paths.

Additional external stages that also use this integration can reference the allowed buckets and paths:

CREATE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/')
STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
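
To make the interaction between the allowed and blocked locations concrete, the following Python sketch checks whether a stage URL falls inside the example integration's allowed locations and outside its blocked ones. It assumes plain prefix matching on the URL string, which is an illustrative assumption and not a description of the checks Snowflake actually performs; the function name `stage_url_permitted` is hypothetical.

```python
# Illustrative only: assumes plain prefix matching, which may differ from
# the checks Snowflake performs when a stage is created or modified.
ALLOWED = ("gcs://mybucket1/path1/", "gcs://mybucket2/path2/")
BLOCKED = (
    "gcs://mybucket1/path1/sensitivedata/",
    "gcs://mybucket2/path2/sensitivedata/",
)


def stage_url_permitted(url, allowed=ALLOWED, blocked=BLOCKED):
    """Return True if url is under an allowed location and not under a blocked one."""
    # Normalize so 'gcs://b/p' and 'gcs://b/p/' compare the same way.
    normalized = url if url.endswith("/") else url + "/"
    in_allowed = any(normalized.startswith(prefix) for prefix in allowed)
    in_blocked = any(normalized.startswith(prefix) for prefix in blocked)
    return in_allowed and not in_blocked


if __name__ == "__main__":
    for candidate in (
        "gcs://mybucket1/path1/2024/",
        "gcs://mybucket1/path1/sensitivedata/",
        "gcs://mybucket3/path1/",
    ):
        print(candidate, stage_url_permitted(candidate))
```

Under these assumptions, a URL below `gcs://mybucket1/path1/` is accepted unless it also sits below the `sensitivedata/` subpath, and a URL in a bucket that is not listed is rejected.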


### Step 2: Retrieve the Cloud Storage Service Account for your Snowflake Account

Execute the DESCRIBE INTEGRATION command to retrieve the ID for the Cloud Storage service account that was created automatically for your Snowflake account:

DESC STORAGE INTEGRATION <integration_name>;


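Where:

• integration_name is the name of the integration you created in Step 1: Create a Cloud Storage Integration in Snowflake (in this topic).

For example: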

DESC STORAGE INTEGRATION gcs_int;

+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
| property                    | property_type | property_value                                                              | property_default |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------|
| ENABLED                     | Boolean       | true                                                                        | false            |
| STORAGE_ALLOWED_LOCATIONS   | List          | gcs://mybucket1/path1/,gcs://mybucket2/path2/                               | []               |
| STORAGE_BLOCKED_LOCATIONS   | List          | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/   | []               |
| STORAGE_GCP_SERVICE_ACCOUNT | String        | service-account-id@project1-123456.iam.gserviceaccount.com                  |                  |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+


The STORAGE_GCP_SERVICE_ACCOUNT property in the output shows the Cloud Storage service account created for your Snowflake account (e.g. service-account-id@project1-123456.iam.gserviceaccount.com). We provision a single Cloud Storage service account for your entire Snowflake account. All Cloud Storage integrations use that service account.
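
If you script this step, the service account can be picked out of the DESCRIBE output by property name. The Python sketch below is a minimal illustration that works on rows copied from the example output above; how the rows are fetched is left out, and the names `DESC_ROWS` and `integration_property` are hypothetical.

```python
# Rows copied from the example DESC STORAGE INTEGRATION output above. In practice
# they would come from whichever client you use to run the command.
DESC_ROWS = [
    ("ENABLED", "Boolean", "true", "false"),
    ("STORAGE_ALLOWED_LOCATIONS", "List",
     "gcs://mybucket1/path1/,gcs://mybucket2/path2/", "[]"),
    ("STORAGE_BLOCKED_LOCATIONS", "List",
     "gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/", "[]"),
    ("STORAGE_GCP_SERVICE_ACCOUNT", "String",
     "service-account-id@project1-123456.iam.gserviceaccount.com", ""),
]


def integration_property(rows, name):
    """Return the property_value for the named property, or raise KeyError."""
    for prop, _prop_type, value, _default in rows:
        if prop == name:
            return value
    raise KeyError(name)


service_account = integration_property(DESC_ROWS, "STORAGE_GCP_SERVICE_ACCOUNT")
print(service_account)
```

The value returned here is the member name you search for when assigning roles in the next step.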

### Step 3: Grant the Service Account Permissions to Access Bucket Objects

The following step-by-step instructions describe how to configure IAM access permissions for Snowflake in your Google Cloud Platform Console so that you can use a Cloud Storage bucket to load and unload data:

#### Creating a Custom IAM Role

Create a custom role that has the permissions required to access the bucket and get objects.

1. Log into the Google Cloud Platform Console as a project editor.

2. From the home dashboard, choose IAM & admin » Roles.

3. Click Create Role.

4. Enter a name and description for the custom role.

5. Click Add Permissions.

6. Filter the list of permissions, and add the following from the list, depending on how you plan to use the bucket:

Data loading only

• storage.buckets.get

• storage.objects.get

• storage.objects.list

Data loading with purge option

Note

The storage.objects.delete permission also allows executing the REMOVE command on the stage.

• storage.buckets.get

• storage.objects.delete

• storage.objects.get

• storage.objects.list

Data loading and unloading

• storage.buckets.get

• storage.objects.create

• storage.objects.delete

• storage.objects.get

• storage.objects.list

7. Click Create.
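
The permission sets listed in step 6 build on one another: each larger set adds permissions to the read-only set. The following Python sketch, offered as an illustration and not as part of the official procedure, records the three sets and reports which ones a given role satisfies. The dictionary keys (`load`, `load_and_purge`, `load_and_unload`) and both function names are hypothetical labels chosen for this example.

```python
# Permission sets as listed in step 6; the dictionary keys are illustrative labels.
REQUIRED_PERMISSIONS = {
    "load": {
        "storage.buckets.get",
        "storage.objects.get",
        "storage.objects.list",
    },
    "load_and_purge": {
        "storage.buckets.get",
        "storage.objects.delete",
        "storage.objects.get",
        "storage.objects.list",
    },
    "load_and_unload": {
        "storage.buckets.get",
        "storage.objects.create",
        "storage.objects.delete",
        "storage.objects.get",
        "storage.objects.list",
    },
}


def supported_use_cases(granted):
    """Return the use cases whose required permissions are all in `granted`."""
    granted = set(granted)
    return sorted(
        use_case
        for use_case, required in REQUIRED_PERMISSIONS.items()
        if required <= granted
    )


def missing_permissions(granted, use_case):
    """Return the permissions still needed for `use_case`, sorted by name."""
    return sorted(REQUIRED_PERMISSIONS[use_case] - set(granted))
```

For instance, a role holding the four permissions of the second set covers the first two use cases, and `missing_permissions` reports `storage.objects.create` as the one permission it lacks for unloading.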

#### Assigning the Custom Role to the Cloud Storage Service Account

1. Log into the Google Cloud Platform Console as a project editor.

2. From the home dashboard, choose Storage » Browser.

3. Select a bucket to configure for access.

4. Click SHOW INFO PANEL in the upper-right corner. The information panel for the bucket slides out.

5. In the Add members field, search for the service account name from the DESCRIBE INTEGRATION output in Step 2: Retrieve the Cloud Storage Service Account for your Snowflake Account (in this topic).

6. From the Select a role dropdown, select Storage » Custom » <role>, where <role> is the custom Cloud Storage role you created in Creating a Custom IAM Role (in this topic).

7. Click the Add button. The service account name is added under the custom role in the information panel.

#### Granting the Cloud Storage Service Account Permissions on the Cloud Key Management Service Cryptographic Keys

Note

This step is required only if your GCS bucket is encrypted using a key stored in the Google Cloud Key Management Service (Cloud KMS).

1. Log into the Google Cloud Platform Console as a project editor.

2. From the home dashboard, choose Security » Cryptographic keys.

3. Select the key ring that is assigned to your GCS bucket.

4. Click SHOW INFO PANEL in the upper-right corner. The information panel for the key ring slides out.

5. In the Add members field, search for the service account name from the DESCRIBE INTEGRATION output in Step 2: Retrieve the Cloud Storage Service Account for your Snowflake Account (in this topic).

6. From the Select a role dropdown, select the Cloud KMS CryptoKey Encrypter/Decrypter role.

7. Click the Add button. The service account name is added to the Cloud KMS CryptoKey Encrypter/Decrypter role dropdown in the information panel.

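## Configuring Automation Using GCS Pub/Sub

### Prerequisites

The instructions in this topic assume the following items have been created and configured:

GCP account

• Pub/Sub topic that receives event messages from the GCS bucket. For more information, see Creating the Pub/Sub Topic (in this topic).

• Subscription that receives event messages from the Pub/Sub topic. For more information, see Creating the Pub/Sub Subscription (in this topic).

For instructions, see the Pub/Sub documentation.

Snowflake

• Target table in the Snowflake database where your data will be loaded.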

#### Creating the Pub/Sub Topic

Create a Pub/Sub topic using Cloud Shell or Cloud SDK.

Execute the following command to create the topic and enable it to listen for activity in the specified GCS bucket:

$ gsutil notification create -t <topic> -f json gs://<bucket-name>

Where:

• <topic> is the name for the topic.

• <bucket-name> is the name of your GCS bucket.

If the topic already exists, the command uses it; otherwise, a new topic is created.

For more information, see Using Pub/Sub notifications for Cloud Storage in the Pub/Sub documentation.

#### Creating the Pub/Sub Subscription

Create a subscription with pull delivery to the Pub/Sub topic using the Cloud Console, gcloud command-line tool, or the Cloud Pub/Sub API. For instructions, see Managing topics and subscriptions in the Pub/Sub documentation.

Note

• Only Pub/Sub subscriptions that use the default pull delivery are supported with Snowflake. Push delivery is not supported.

#### Retrieving the Pub/Sub Subscription ID

The Pub/Sub topic subscription ID is used in these instructions to allow Snowflake access to event messages.

1. Log into the Google Cloud Platform Console as a project editor.

2. From the home dashboard, choose Big Data » Pub/Sub » Subscriptions.

3. Copy the ID in the Subscription ID column for the topic subscription.

### Step 1: Create a Notification Integration in Snowflake

Create a notification integration using the CREATE NOTIFICATION INTEGRATION command. The notification integration references your Pub/Sub subscription. Snowflake associates the notification integration with a GCS service account created for your account. Snowflake creates a single service account that is referenced by all GCS notification integrations in your Snowflake account.

Note

• Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.

• The GCS service account for notification integrations is different from the service account created for storage integrations.

CREATE NOTIFICATION INTEGRATION <integration_name>
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';

Where:

• integration_name is the name of the new integration.

• subscription_id is the subscription ID you recorded in Retrieving the Pub/Sub Subscription ID (in this topic).

For example:

CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';

### Step 2: Grant Snowflake Access to the Pub/Sub Subscription

1. Execute the DESCRIBE INTEGRATION command to retrieve the Snowflake service account ID:

DESC NOTIFICATION INTEGRATION <integration_name>;

Where:

• integration_name is the name of the integration you created in Step 1: Create a Notification Integration in Snowflake (in this topic).

For example:

DESC NOTIFICATION INTEGRATION my_notification_int;

2. Record the service account name in the GCP_PUBSUB_SERVICE_ACCOUNT property, which has the following format:

<service_account>@<project_id>.iam.gserviceaccount.com

3. Log into the Google Cloud Platform Console as a project editor.

4. From the home dashboard, choose Big Data » Pub/Sub » Subscriptions.

5. Select the subscription to configure for access.

6. Click SHOW INFO PANEL in the upper-right corner. The information panel for the subscription slides out.

7. In the Add members field, search for the service account name you recorded.

8. From the Select a role dropdown, select Pub/Sub Subscriber.

9. Click the Add button. The service account name is added to the Pub/Sub Subscriber role dropdown in the information panel.

10. Navigate to the Dashboard page in the Cloud Console, and select your project from the dropdown list.

11. Click the ADD PEOPLE TO THIS PROJECT button.

12. Add the service account name you recorded.

13. From the Select a role dropdown, select Monitoring Viewer.

14. Click the Add button. The service account name is added to the Monitoring Viewer role.

### Step 3: Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table.

CREATE PIPE <pipe_name>
AUTO_INGEST = true
INTEGRATION = '<notification_integration_name>'
AS
<copy_statement>;

Where:

<pipe_name>

Identifier for the pipe; must be unique for the schema in which the pipe is created. The identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "My object"). Identifiers enclosed in double quotes are also case-sensitive.

INTEGRATION = '<notification_integration_name>'

Name of the notification integration that references your Pub/Sub subscription, through which Snowpipe receives GCS event messages. A notification integration is a Snowflake object that provides an interface between Snowflake and third-party cloud message queuing services.

copy_statement

COPY INTO <table> statement used to load data from queued files into a Snowflake table. This statement serves as the text/definition for the pipe and is displayed in the SHOW PIPES output.

For example, create a pipe in the snowpipe_db.public schema that loads data from files staged in an external (GCS) stage named mystage into a destination table named mytable:

CREATE PIPE snowpipe_db.public.mypipe
AUTO_INGEST = true
INTEGRATION = 'MY_NOTIFICATION_INT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage/path2;

The INTEGRATION parameter references the my_notification_int notification integration you created in Step 1: Create a Notification Integration in Snowflake. The integration name must be provided in all uppercase.

Important

Verify that the storage location reference in the COPY INTO <table> statement does not overlap with the reference in existing pipes in the account. Otherwise, multiple pipes could load the same set of data files into the target tables.

For example, this situation can occur when multiple pipe definitions reference the same storage location with different levels of granularity, such as <storage_location>/path1/ and <storage_location>/path1/path2/. In this example, if files are staged in <storage_location>/path1/path2/, both pipes would load a copy of the files.

View the COPY INTO <table> statements in the definitions of all pipes in the account by executing SHOW PIPES or by querying either the PIPES view in Account Usage or the PIPES view in the Information Schema.

Snowpipe with auto-ingest is now configured!

When new data files are added to the GCS bucket, the event message informs Snowpipe to load them into the target table defined in the pipe.

### Step 4: Load Historical Files

To load any backlog of data files that existed in the external stage before Pub/Sub messages were configured, execute an ALTER PIPE … REFRESH statement.

### Step 5: Delete Staged Files

Delete the staged files after you successfully load the data and no longer require the files. For instructions, see Deleting Staged Files After Snowpipe Loads the Data.

## SYSTEM$PIPE_STATUS Output

The SYSTEM$PIPE_STATUS function retrieves a JSON representation of the current status of a pipe.

For pipes with AUTO_INGEST set to TRUE, the function returns a JSON object containing the following name/value pairs (if applicable to the current pipe status):

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}


For descriptions of the output values, see the reference topic for the SQL function.
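
Because the function returns a JSON string, a monitoring script can parse it and look at a few fields. The Python sketch below is a minimal illustration under stated assumptions: the key names come from the list above, but every value in `SAMPLE_STATUS` is made up, and the function name `summarize_pipe_status` is hypothetical. Fetching the string from Snowflake is left out.

```python
import json

# Hypothetical sample payload: the keys come from the list above, but every
# value here is made up for illustration.
SAMPLE_STATUS = (
    '{"executionState":"RUNNING","pendingFileCount":3,'
    '"notificationChannelName":"projects/project-1234/subscriptions/sub2",'
    '"numOutstandingMessagesOnChannel":0,'
    '"lastReceivedMessageTimestamp":"2024-01-01T00:00:00Z"}'
)


def summarize_pipe_status(raw):
    """Parse the JSON string and return a small dict of fields worth checking."""
    status = json.loads(raw)
    return {
        # Keys are only present when applicable, so use .get() with defaults.
        "state": status.get("executionState"),
        "pending_files": status.get("pendingFileCount", 0),
        "outstanding_messages": status.get("numOutstandingMessagesOnChannel", 0),
        "has_problem": "error" in status or "fault" in status,
    }


summary = summarize_pipe_status(SAMPLE_STATUS)
print(summary)
```

Using `.get()` with defaults reflects the fact that name/value pairs appear only when they apply to the current pipe status, so a missing key is treated as "nothing to report" and not as a failure.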