Activation des notifications d’erreur Snowpipe pour Google Pub/Sub

Cette rubrique fournit des instructions pour envoyer des notifications d’erreur Snowpipe au service Google Cloud Pub/Sub (Pub/Sub).

Cette fonction permet d’envoyer des notifications d’erreur pour les types de chargements suivants :

  • Auto-intégration Snowpipe.

  • Appels vers le point de terminaison de l’API REST insertFiles Snowpipe.

  • Chargements à partir d’Apache Kafka en utilisant le connecteur Snowflake pour Kafka avec la méthode d’ingestion Snowpipe uniquement.

Dans ce chapitre :

Prise en charge de la plateforme Cloud

Actuellement, cette fonctionnalité est limitée aux comptes Snowflake hébergés sur Google Cloud Platform (GCP). Snowpipe peut charger des données à partir de fichiers dans n’importe quel service de stockage dans le Cloud pris en charge. Toutefois, les notifications push vers Pub/Sub ne sont prises en charge que dans les comptes Snowflake hébergés sur GCP.

Remarques

  • Snowflake garantit la livraison des messages d’erreur au moins une fois (c’est-à-dire que plusieurs tentatives sont faites pour livrer les messages afin de s’assurer qu’au moins une tentative réussisse, ce qui peut entraîner des messages en double).

  • Cette fonctionnalité est mise en œuvre à l’aide de l’objet d’intégration de notification. Une intégration de notification est un objet Snowflake qui fournit une interface entre Snowflake et des services tiers de mise en file d’attente de messages dans le Cloud. Une seule intégration de notification peut prendre en charge plusieurs canaux.

Activation des notifications d’erreur

Step 1: Creating the Pub/Sub Topic

Create a Pub/Sub topic that can receive error notification messages from Snowflake, or reuse an existing topic. You can create the topic using Cloud Shell or Cloud SDK. For more information, see Create and use topics in the Pub/Sub documentation.

For example, execute the following command to create an empty topic:

$ gsutil notification create -t <topic>
Copy

If the topic already exists, the command uses it; otherwise, a new topic is created.

Step 2: Creating the Pub/Sub Subscription

Optionally create a subscription to the Pub/Sub topic to retrieve error notifications. You can create a subscription with pull delivery using the Cloud Console, gcloud command-line tool, or the Cloud Pub/Sub API. For instructions, see Managing topics and subscriptions in the Pub/Sub documentation.

Step 3: Creating a Notification Integration in Snowflake

Create a notification integration using the CREATE NOTIFICATION INTEGRATION command. The notification integration references your Pub/Sub topic. Snowflake associates the notification integration with a Goodle Cloud Platform (GCP) service account created for your account. Snowflake creates a single service account that is referenced by all GCP notification integrations in your Snowflake account.

Note

  • Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.

  • The GCP service account for notification integrations is different from the service account created for storage integrations.

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = TRUE
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
Copy

Où :

  • integration_name is the name of the new integration.

  • topic_id is the Pub/Sub topic to which Snowflake sends error notifications. For more information, see Step 1: Creating the Pub/Sub Topic (in this topic).

Par exemple :

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
Copy

Step 4: Granting Snowflake Access to the Pub/Sub Subscription

  1. Execute the DESCRIBE INTEGRATION command to retrieve the Snowflake service account ID:

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    Où :

    • integration_name is the name of the integration you created in « Step 1: Create a Notification Integration in Snowflake ».

    Par exemple :

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
    Copy
  2. Record the service account name in the GCP_PUBSUB_SERVICE_ACCOUNT column, which has the following format:

    <service_account>@<project_id>.iam.gserviceaccount.com
    
    Copy
  3. Log into the Google Cloud Platform Console as a project editor.

  4. From the home dashboard, choose Big Data » Pub/Sub » Subscriptions.

  5. Select the subscription to configure for access.

  6. Click SHOW INFO PANEL in the upper-right corner. The information panel for the subscription slides out.

  7. In the Add members field, search for the service account name you recorded.

  8. From the Select a role dropdown, select Pub/Sub Publisher.

  9. Click the Add button. The service account name is added to the Pub/Sub Publisher role dropdown in the information panel.

Step 5: Enabling error notifications in pipes

Une seule intégration de notification peut être partagée par plusieurs canaux. Le corps des messages d’erreur identifie le canal, la zone de préparation externe et le chemin, ainsi que le fichier d’où provient l’erreur, entre autres détails.

Pour activer les notifications d’erreur pour un canal, spécifiez une valeur de paramètre ERROR_INTEGRATION.

Note

La création ou la modification d’un canal qui fait référence à une intégration de notification nécessite un rôle qui possède le privilège USAGE sur l’intégration de notification. En outre, le rôle doit avoir le privilège CREATE PIPE sur le schéma ou le privilège OWNERSHIP sur le canal, respectivement.

Notez que l’exploitation d’un objet dans un schéma requiert également le privilège USAGE sur la base de données et le schéma parents.

Pour obtenir des instructions sur la création d’un rôle personnalisé avec un ensemble spécifique de privilèges, voir Création de rôles personnalisés.

Pour des informations générales sur les rôles et les privilèges accordés pour effectuer des actions SQL sur des objets sécurisables, voir Aperçu du contrôle d’accès.

Nouveau canal

Créez un nouveau canal en utilisant CREATE PIPE :

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>
Copy

Où :

ERROR_INTEGRATION = <nom_intégration>

Name of the notification integration you created in Step 4: Creating the Notification Integration (in this topic).

Par exemple :

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;
Copy

Canal existant

Modifiez un canal existant en utilisant ALTER PIPE.

Note

Si une intégration de notification a été spécifiée lors de la création du canal, il faut d’abord supprimer le paramètre ERROR_INTEGRATION (en utilisant ALTER PIPE … UNSET ERROR_INTEGRATION) puis définir le paramètre.

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Copy

Where <integration_name> is the name of the notification integration you created in Step 4: Creating the Notification Integration (in this topic).

Par exemple :

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Copy

Charge utile du message de notification d’erreur

Le corps des messages d’erreur identifie le canal et les erreurs rencontrées lors d’un chargement.

Voici un exemple de charge utile de message décrivant une erreur Snowpipe. Notez que la charge utile peut inclure un ou plusieurs messages d’erreur.

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Copy

Notez que vous devez analyser la chaîne dans un objet JSON pour traiter les valeurs dans la charge utile.