Enabling Snowpipe Error Notifications for Google Pub/Sub¶
This topic provides instructions for pushing Snowpipe error notifications to the Google Cloud Pub/Sub (Pub/Sub) service.
This feature can push error notifications for the following types of loads:
Auto-ingest Snowpipe.
Calls to the Snowpipe insertFiles REST API endpoint.
Loads from Apache Kafka using the Snowflake Connector for Kafka with the Snowpipe ingestion method only.
Cloud Platform Support¶
Currently, this feature is limited to Snowflake accounts hosted on Google Cloud Platform (GCP). Snowpipe can load data from files in any supported cloud storage service; however, push notifications to Pub/Sub are only supported in Snowflake accounts hosted on GCP.
Notes¶
Snowflake guarantees at-least-once message delivery of error notifications (i.e. multiple attempts are made to deliver messages to ensure at least one attempt succeeds, which can result in duplicate messages).
This feature is implemented using the notification integration object. A notification integration is a Snowflake object that provides an interface between Snowflake and third-party cloud message queuing services. A single notification integration can support multiple pipes.
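Because delivery is at-least-once, a consumer of these notifications may want to discard duplicates. The following is a minimal Python sketch, assuming that redelivered copies of a notification carry the same `messageId` value (the field appears in the sample payload later in this topic; its stability across redeliveries is an assumption). The `DuplicateFilter` class and the sample IDs are hypothetical, and the IDs are kept in memory only.

```python
class DuplicateFilter:
    """Tracks message IDs that were already processed (in memory only)."""

    def __init__(self):
        self._seen = set()

    def is_new(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False on repeats."""
        if message_id in self._seen:
            return False
        self._seen.add(message_id)
        return True


# Hypothetical message IDs; the second one is a redelivered duplicate.
dedupe = DuplicateFilter()
ids = ["a62e34bc", "a62e34bc", "b7f0c1de"]
processed = [i for i in ids if dedupe.is_new(i)]
print(processed)
```

A long-running consumer would likely need to bound or persist the set of seen IDs; that is left out of this sketch.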
Enabling Error Notifications¶
Step 1: Creating the Pub/Sub Topic¶
Create a Pub/Sub topic that can receive error notification messages from Snowflake, or reuse an existing topic. You can create the topic using Cloud Shell or Cloud SDK. For more information, see Create and use topics in the Pub/Sub documentation.
For example, execute the following command to create an empty topic:
$ gcloud pubsub topics create <topic_id>
If a topic with that ID already exists, reuse it instead of creating a new one.
Step 2: Creating the Pub/Sub Subscription¶
Optionally create a subscription to the Pub/Sub topic to retrieve error notifications. You can create a subscription with pull delivery using the Cloud Console, the gcloud command-line tool, or the Cloud Pub/Sub API. For instructions, see Managing topics and subscriptions in the Pub/Sub documentation.
Step 3: Creating a Notification Integration in Snowflake¶
Create a notification integration using the CREATE NOTIFICATION INTEGRATION command. The notification integration references your Pub/Sub topic. Snowflake associates the notification integration with a Google Cloud Platform (GCP) service account created for your account. Snowflake creates a single service account that is referenced by all GCP notification integrations in your Snowflake account.
Note
Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.
The GCP service account for notification integrations is different from the service account created for storage integrations.
CREATE NOTIFICATION INTEGRATION <integration_name>
ENABLED = TRUE
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
Where:
integration_name
is the name of the new integration.
topic_id
is the Pub/Sub topic to which Snowflake sends error notifications. For more information, see Step 1: Creating the Pub/Sub Topic (in this topic).
For example:
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
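The example above passes the topic as a fully qualified name of the form `projects/<project_id>/topics/<topic_id>`. As a small illustration, the following Python sketch builds such a name from its parts. The function name `pubsub_topic_path` is hypothetical, and the check is deliberately minimal; it does not enforce the full Pub/Sub naming rules.

```python
def pubsub_topic_path(project_id: str, topic_id: str) -> str:
    """Build a fully qualified Pub/Sub topic name from its two parts."""
    for label, value in (("project_id", project_id), ("topic_id", topic_id)):
        # Reject empty parts and parts that already contain a path separator.
        if not value or "/" in value:
            raise ValueError(f"{label} must be a non-empty ID without '/'")
    return f"projects/{project_id}/topics/{topic_id}"


# Values taken from the example above.
print(pubsub_topic_path("sdm-prod", "mytopic"))
```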
Step 4: Granting Snowflake Access to the Pub/Sub Topic¶
Execute the DESCRIBE INTEGRATION command to retrieve the Snowflake service account ID:
DESC NOTIFICATION INTEGRATION <integration_name>;
Where:
integration_name
is the name of the integration you created in Step 3: Creating a Notification Integration in Snowflake (in this topic).
For example:
DESC NOTIFICATION INTEGRATION my_notification_int;
Record the service account name in the GCP_PUBSUB_SERVICE_ACCOUNT column, which has the following format:
<service_account>@<project_id>.iam.gserviceaccount.com
Log in to the Google Cloud Platform Console as a project editor.
From the home dashboard, choose Big Data » Pub/Sub » Topics.
Select the topic to configure for access.
Click SHOW INFO PANEL in the upper-right corner. The information panel for the topic slides out.
In the Add members field, search for the service account name you recorded.
From the Select a role dropdown, select Pub/Sub Publisher.
Click the Add button. The service account name is added to the Pub/Sub Publisher role dropdown in the information panel.
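The console steps above amount to adding one IAM binding for the Snowflake service account. As an illustration only, the following Python sketch builds that binding as a plain dictionary. It assumes the standard IAM conventions that the Pub/Sub Publisher role has the ID `roles/pubsub.publisher` and that service account members are written as `serviceAccount:<email>`. The function name and the sample account are hypothetical, and nothing here calls a Google Cloud API.

```python
import re

# Format recorded from DESC NOTIFICATION INTEGRATION:
# <service_account>@<project_id>.iam.gserviceaccount.com
_SERVICE_ACCOUNT = re.compile(r"[^@\s]+@[^@\s]+\.iam\.gserviceaccount\.com")


def publisher_binding(service_account: str) -> dict:
    """Return an IAM binding that grants Pub/Sub Publisher to the account."""
    if not _SERVICE_ACCOUNT.fullmatch(service_account):
        raise ValueError(f"unexpected service account format: {service_account!r}")
    return {
        "role": "roles/pubsub.publisher",
        "members": [f"serviceAccount:{service_account}"],
    }


# Hypothetical service account name, for illustration only.
binding = publisher_binding("my-sa@my-project.iam.gserviceaccount.com")
print(binding)
```

Validating the recorded value before use can catch copy-and-paste mistakes, such as trailing whitespace or a truncated domain.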
Step 5: Enabling Error Notifications in Pipes¶
A single notification integration can be shared by multiple pipes. The body of error messages identifies the pipe, external stage and path, and file where the error originated, among other details.
To enable error notifications for a pipe, specify an ERROR_INTEGRATION parameter value.
Note
Creating or modifying a pipe that references a notification integration requires a role that has the USAGE privilege on the notification integration. In addition, the role must have either the CREATE PIPE privilege on the schema or the OWNERSHIP privilege on the pipe, respectively.
Note that operating on any object in a schema also requires the USAGE privilege on the parent database and schema.
For instructions on creating a custom role with a specified set of privileges, see Creating Custom Roles.
For general information about roles and privilege grants for performing SQL actions on securable objects, see Overview of Access Control.
New Pipe¶
Create a new pipe using CREATE PIPE:
CREATE PIPE <name>
AUTO_INGEST = TRUE
[ INTEGRATION = '<string>' ]
ERROR_INTEGRATION = <integration_name>
AS <copy_statement>
Where:
ERROR_INTEGRATION = <integration_name>
Name of the notification integration you created in Step 3: Creating a Notification Integration in Snowflake (in this topic).
For example:
CREATE PIPE mypipe
AUTO_INGEST = TRUE
INTEGRATION = 'my_storage_int'
ERROR_INTEGRATION = my_notification_int
AS
COPY INTO mydb.public.mytable
FROM @mydb.public.mystage;
Existing Pipe¶
Modify an existing pipe using ALTER PIPE.
Note
If a notification integration was specified when the pipe was created, it is necessary to first unset the ERROR_INTEGRATION parameter (using ALTER PIPE … UNSET ERROR_INTEGRATION) and then set the parameter.
ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Where <integration_name>
is the name of the notification integration you created in Step 3: Creating a Notification Integration in Snowflake (in this topic).
For example:
ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Error Notification Message Payload¶
The body of error messages identifies the pipe and the errors encountered during a load.
The following is a sample message payload describing a Snowpipe error. The payload can include one or more error messages.
{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Note that you must parse the string into a JSON object to process values in the payload.
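The parsing step can be sketched in Python as follows. This is a minimal sketch, assuming the message body arrives either as a JSON object or as a JSON-encoded string that wraps the object (as the escaped quotes in the sample suggest). The function names are hypothetical, and the sample is an abridged copy of the payload shown above.

```python
import json


def parse_error_notification(body: str) -> dict:
    """Parse a notification body into a dict, unwrapping one extra
    level of JSON string encoding if present."""
    payload = json.loads(body)
    if isinstance(payload, str):
        # The body was a JSON-encoded string; decode the inner object.
        payload = json.loads(payload)
    return payload


def summarize_errors(payload: dict) -> list:
    """Return one 'pipe: file: first error' line per failed file."""
    pipe = payload.get("pipeName", "<unknown pipe>")
    return [
        f"{pipe}: {m.get('fileName')}: {m.get('firstError')}"
        for m in payload.get("messages", [])
    ]


# Abridged version of the sample payload above.
sample = {
    "messageType": "INGEST_FAILED_FILE",
    "pipeName": "MYDB.MYSCHEMA.MYPIPE",
    "messages": [
        {
            "fileName": "/file1.csv_0_0_0.csv.gz",
            "firstError": "Numeric value 'abc' is not recognized",
        }
    ],
}
body = json.dumps(json.dumps(sample))  # simulate a string-wrapped body
for line in summarize_errors(parse_error_notification(body)):
    print(line)
```

Because the `messages` array can hold more than one entry, the summary iterates over it rather than reading only the first element.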