Enabling Snowpipe error notifications for Google Pub/Sub

This topic provides instructions for pushing Snowpipe error notifications to the Google Cloud Pub/Sub (Pub/Sub) service.

This feature can push error notifications for the following types of loads:

  • Auto-ingest Snowpipe.

  • Calls to the Snowpipe insertFiles REST API endpoint.

  • Loads from Apache Kafka using the Snowflake Connector for Kafka with the Snowpipe ingestion method only.

Cloud platform support

Currently, this feature is limited to Snowflake accounts hosted on Google Cloud Platform (GCP). Snowpipe can load data from files in any supported cloud storage service; however, push notifications to Pub/Sub are only supported in Snowflake accounts hosted on GCP.

Notes

  • Snowflake guarantees at-least-once message delivery of error notifications (i.e. multiple attempts are made to deliver messages to ensure at least one attempt succeeds, which can result in duplicate messages).

  • This feature is implemented using the notification integration object. A notification integration is a Snowflake object that provides an interface between Snowflake and third-party cloud message queuing services. A single notification integration can support multiple pipes.
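Because delivery is at-least-once, a consumer can receive the same notification more than once. Each payload carries a messageId field (see the sample payload at the end of this topic), so one common approach is to remember recently seen IDs and skip repeats. The following is a minimal Python sketch of that idea, not a Snowflake-provided tool. The class name and the in-memory limit are hypothetical, and it assumes a redelivered duplicate keeps the same messageId. A consumer that restarts, or that runs on several machines, would need a shared or persistent store instead.

```python
from collections import OrderedDict


class NotificationDeduplicator:
    """Hypothetical helper: drops notifications whose messageId was already seen.

    Assumes duplicates produced by at-least-once delivery share a messageId.
    Keeps at most `max_ids` recent IDs in memory and evicts the oldest first.
    """

    def __init__(self, max_ids: int = 10_000) -> None:
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._max_ids = max_ids

    def is_new(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False for repeats."""
        if message_id in self._seen:
            self._seen.move_to_end(message_id)  # mark as recently seen
            return False
        self._seen[message_id] = None
        if len(self._seen) > self._max_ids:
            self._seen.popitem(last=False)  # evict the oldest remembered ID
        return True
```

A consumer would call is_new(payload["messageId"]) on each decoded payload and process the notification only when it returns True.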

Enabling error notifications

Step 1: Creating the Pub/Sub Topic

Create a Pub/Sub topic that can receive error notification messages from Snowflake, or reuse an existing topic. You can create the topic using Cloud Shell or Cloud SDK. For more information, see Create and use topics in the Pub/Sub documentation.

For example, execute the following command to create an empty topic:

$ gcloud pubsub topics create <topic_id>

If a topic with that ID already exists in the project, the command returns an error instead of reusing it; in that case, use the existing topic.

Step 2: Creating the Pub/Sub Subscription

Optionally create a subscription to the Pub/Sub topic to retrieve error notifications. You can create a subscription with pull delivery using the Cloud Console, gcloud command-line tool, or the Cloud Pub/Sub API. For instructions, see Managing topics and subscriptions in the Pub/Sub documentation.

Step 3: Creating a Notification Integration in Snowflake

Create a notification integration using the CREATE NOTIFICATION INTEGRATION command. The notification integration references your Pub/Sub topic. Snowflake associates the notification integration with a Google Cloud Platform (GCP) service account created for your Snowflake account. Snowflake creates a single service account that is referenced by all GCP notification integrations in your Snowflake account.

Note

  • Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.

  • The GCP service account for notification integrations is different from the service account created for storage integrations.

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = TRUE
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  GCP_PUBSUB_TOPIC_NAME = '<topic_id>'

Where:

  • integration_name is the name of the new integration.

  • topic_id is the full name of the Pub/Sub topic to which Snowflake sends error notifications, in the form projects/<project_id>/topics/<topic> (see the example below). For more information, see Step 1: Creating the Pub/Sub Topic (in this topic).

For example:

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
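The GCP_PUBSUB_TOPIC_NAME value in this example is the full Pub/Sub resource name, projects/<project_id>/topics/<topic_id>. If you script the setup, a helper along the following lines can assemble that string. This is a minimal Python sketch, not part of any Snowflake tool: the function name is hypothetical, and the checks encode Google Cloud's general naming rules for topic IDs (3 to 255 characters, starting with a letter, limited to letters, digits, and - . _ ~ % +, and not starting with "goog"), which you should confirm against the Pub/Sub documentation.

```python
import re

# Assumed Pub/Sub topic ID rules: 3-255 characters, first character a letter,
# remaining characters letters, digits, or - . _ ~ % +.
_TOPIC_ID_RE = re.compile(r"[A-Za-z][A-Za-z0-9._~%+-]{2,254}")


def gcp_pubsub_topic_name(project_id: str, topic_id: str) -> str:
    """Hypothetical helper: build the full topic name for GCP_PUBSUB_TOPIC_NAME."""
    if not project_id:
        raise ValueError("project_id must not be empty")
    # IDs starting with "goog" are reserved by Google Cloud.
    if not _TOPIC_ID_RE.fullmatch(topic_id) or topic_id.startswith("goog"):
        raise ValueError(f"invalid Pub/Sub topic ID: {topic_id!r}")
    return f"projects/{project_id}/topics/{topic_id}"
```

For the example above, gcp_pubsub_topic_name("sdm-prod", "mytopic") returns 'projects/sdm-prod/topics/mytopic'.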

Step 4: Granting Snowflake Access to the Pub/Sub Topic

  1. Execute the DESCRIBE INTEGRATION command to retrieve the Snowflake service account ID:

    DESC NOTIFICATION INTEGRATION <integration_name>;

    Where:

    • integration_name is the name of the integration you created in “Step 3: Creating a Notification Integration in Snowflake”.

    For example:

    DESC NOTIFICATION INTEGRATION my_notification_int;

  2. Record the value of the GCP_PUBSUB_SERVICE_ACCOUNT property, which has the following format:

    <service_account>@<project_id>.iam.gserviceaccount.com

  3. Log in to the Google Cloud Platform Console as a project editor.

  4. From the home dashboard, choose Big Data » Pub/Sub » Topics.

  5. Select the topic to configure for access.

  6. Click SHOW INFO PANEL in the upper-right corner. The information panel for the topic slides out.

  7. In the Add members field, search for the service account name you recorded.

  8. From the Select a role dropdown, select Pub/Sub Publisher.

  9. Click the Add button. The service account name is listed under the Pub/Sub Publisher role in the information panel.
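The Console steps above can also be scripted. The following Python sketch is an illustration under stated assumptions, not a Snowflake-provided utility. It assumes the DESC NOTIFICATION INTEGRATION output arrives as rows of the form (property, property_type, property_value, ...), picks out the GCP_PUBSUB_SERVICE_ACCOUNT value, and builds the argument list for the gcloud pubsub topics add-iam-policy-binding command, which grants the Pub/Sub Publisher role (roles/pubsub.publisher) on the topic from Step 1. Publish permission is checked on the topic, which is why the binding targets the topic. Both function names are hypothetical.

```python
import re

# Expected format: <service_account>@<project_id>.iam.gserviceaccount.com
_SA_RE = re.compile(r"[^@\s]+@[^@\s]+\.iam\.gserviceaccount\.com")


def find_pubsub_service_account(desc_rows):
    """Hypothetical helper: return GCP_PUBSUB_SERVICE_ACCOUNT from DESC rows.

    Assumes each row looks like (property, property_type, property_value, ...).
    """
    for row in desc_rows:
        if row[0] == "GCP_PUBSUB_SERVICE_ACCOUNT":
            value = row[2]
            if not _SA_RE.fullmatch(value):
                raise ValueError(f"unexpected service account format: {value!r}")
            return value
    raise KeyError("GCP_PUBSUB_SERVICE_ACCOUNT not found in DESC output")


def publisher_binding_command(topic_name, service_account):
    """Hypothetical helper: gcloud argv granting Pub/Sub Publisher on the topic."""
    return [
        "gcloud", "pubsub", "topics", "add-iam-policy-binding", topic_name,
        f"--member=serviceAccount:{service_account}",
        "--role=roles/pubsub.publisher",
    ]
```

For example, subprocess.run(publisher_binding_command("projects/sdm-prod/topics/mytopic", account), check=True) would run the grant, provided gcloud is installed and authenticated as a user who is allowed to change the topic's IAM policy.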

Step 5: Enabling error notifications in pipes

A single notification integration can be shared by multiple pipes. The body of error messages identifies the pipe, external stage and path, and file where the error originated, among other details.

To enable error notifications for a pipe, specify an ERROR_INTEGRATION parameter value.

Note

Creating or modifying a pipe that references a notification integration requires a role that has the USAGE privilege on the notification integration. In addition, the role must have either the CREATE PIPE privilege on the schema or the OWNERSHIP privilege on the pipe, respectively.

Note that operating on any object in a schema also requires the USAGE privilege on the parent database and schema.

For instructions on creating a custom role with a specified set of privileges, see Creating custom roles.

For general information about roles and privilege grants for performing SQL actions on securable objects, see Overview of Access Control.

New pipe

Create a new pipe using CREATE PIPE:

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>

Where:

ERROR_INTEGRATION = <integration_name>

Name of the notification integration you created in Step 3: Creating a Notification Integration in Snowflake (in this topic).

For example:

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;

Existing pipe

Modify an existing pipe using ALTER PIPE.

Note

If a notification integration was specified when the pipe was created, it is necessary to first unset the ERROR_INTEGRATION parameter (using ALTER PIPE … UNSET ERROR_INTEGRATION) and then set the parameter.

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;

Where <integration_name> is the name of the notification integration you created in Step 3: Creating a Notification Integration in Snowflake (in this topic).

For example:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;

Error notification message payload

The body of error messages identifies the pipe and the errors encountered during a load.

The following is a sample message payload describing a Snowpipe error. The payload can include one or more error messages.

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}

Note that you must parse the string into a JSON object to process values in the payload.
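As a minimal Python sketch of that parsing step (the function names are hypothetical, and it assumes the subscriber receives the raw message data as bytes or str): the body is decoded with json.loads, and if the result is itself a string, as happens when the object has been JSON-encoded twice like the escaped sample above, it is decoded once more. The second function then lists the pipe, the failing file, and the first error for each entry in messages.

```python
import json


def parse_error_notification(data):
    """Hypothetical helper: decode a Snowpipe error notification into a dict.

    Accepts the message body as bytes or str. If the first decode yields a
    string (a double-encoded payload), that string is decoded again.
    """
    if isinstance(data, bytes):
        data = data.decode("utf-8")
    payload = json.loads(data)
    if isinstance(payload, str):  # payload was JSON-encoded twice
        payload = json.loads(payload)
    return payload


def summarize_errors(payload):
    """Yield (pipeName, fileName, firstError) for each entry in `messages`."""
    for msg in payload.get("messages", []):
        yield payload["pipeName"], msg["fileName"], msg["firstError"]
```

Applied to the sample payload, summarize_errors yields one tuple for /file1.csv_0_0_0.csv.gz with the error "Numeric value 'abc' is not recognized".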