Activation des notifications d’erreur Snowpipe pour Google Pub/Sub¶
Cette rubrique fournit des instructions pour envoyer des notifications d’erreur Snowpipe au service Google Cloud Pub/Sub (Pub/Sub).
Cette fonction permet d’envoyer des notifications d’erreur pour les types de chargements suivants :
Auto-intégration Snowpipe.
Appels vers le point de terminaison de l’API REST
insertFiles
Snowpipe.Chargements à partir d’Apache Kafka en utilisant le connecteur Snowflake pour Kafka avec la méthode d’ingestion Snowpipe uniquement.
Dans ce chapitre :
Prise en charge de la plateforme Cloud¶
Actuellement, cette fonctionnalité est limitée aux comptes Snowflake hébergés sur Google Cloud Platform (GCP). Snowpipe peut charger des données à partir de fichiers dans n’importe quel service de stockage dans le Cloud pris en charge. Toutefois, les notifications push vers Pub/Sub ne sont prises en charge que dans les comptes Snowflake hébergés sur GCP.
Remarques¶
Snowflake garantit la livraison des messages d’erreur au moins une fois (c’est-à-dire que plusieurs tentatives sont faites pour livrer les messages afin de s’assurer qu’au moins une tentative réussisse, ce qui peut entraîner des messages en double).
Cette fonctionnalité est mise en œuvre à l’aide de l’objet d’intégration de notification. Une intégration de notification est un objet Snowflake qui fournit une interface entre Snowflake et des services tiers de mise en file d’attente de messages dans le Cloud. Une seule intégration de notification peut prendre en charge plusieurs canaux.
Activation des notifications d’erreur¶
Step 1: Creating the Pub/Sub Topic¶
Create a Pub/Sub topic that can receive error notification messages from Snowflake, or reuse an existing topic. You can create the topic using Cloud Shell or Cloud SDK. For more information, see Create and use topics in the Pub/Sub documentation.
For example, execute the following command to create an empty topic:
$ gsutil notification create -t <topic>
If the topic already exists, the command uses it; otherwise, a new topic is created.
Step 2: Creating the Pub/Sub Subscription¶
Optionally create a subscription to the Pub/Sub topic to retrieve error notifications. You can create a subscription with pull delivery using the Cloud Console, gcloud
command-line tool, or the Cloud Pub/Sub API. For instructions, see Managing topics and subscriptions in the Pub/Sub documentation.
Step 3: Creating a Notification Integration in Snowflake¶
Create a notification integration using the CREATE NOTIFICATION INTEGRATION command. The notification integration references your Pub/Sub topic. Snowflake associates the notification integration with a Goodle Cloud Platform (GCP) service account created for your account. Snowflake creates a single service account that is referenced by all GCP notification integrations in your Snowflake account.
Note
Only account administrators (users with the ACCOUNTADMIN role) or a role with the global CREATE INTEGRATION privilege can execute this SQL command.
The GCP service account for notification integrations is different from the service account created for storage integrations.
CREATE NOTIFICATION INTEGRATION <integration_name>
ENABLED = TRUE
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
Où :
integration_name
is the name of the new integration.topic_id
is the Pub/Sub topic to which Snowflake sends error notifications. For more information, see Step 1: Creating the Pub/Sub Topic (in this topic).
Par exemple :
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
Step 4: Granting Snowflake Access to the Pub/Sub Subscription¶
Execute the DESCRIBE INTEGRATION command to retrieve the Snowflake service account ID:
DESC NOTIFICATION INTEGRATION <integration_name>;
Où :
integration_name
is the name of the integration you created in « Step 1: Create a Notification Integration in Snowflake ».
Par exemple :
DESC NOTIFICATION INTEGRATION my_notification_int;
Record the service account name in the GCP_PUBSUB_SERVICE_ACCOUNT column, which has the following format:
<service_account>@<project_id>.iam.gserviceaccount.com
Log into the Google Cloud Platform Console as a project editor.
From the home dashboard, choose Big Data » Pub/Sub » Subscriptions.
Select the subscription to configure for access.
Click SHOW INFO PANEL in the upper-right corner. The information panel for the subscription slides out.
In the Add members field, search for the service account name you recorded.
From the Select a role dropdown, select Pub/Sub Publisher.
Click the Add button. The service account name is added to the Pub/Sub Publisher role dropdown in the information panel.
Step 5: Enabling error notifications in pipes¶
Une seule intégration de notification peut être partagée par plusieurs canaux. Le corps des messages d’erreur identifie le canal, la zone de préparation externe et le chemin, ainsi que le fichier d’où provient l’erreur, entre autres détails.
Pour activer les notifications d’erreur pour un canal, spécifiez une valeur de paramètre ERROR_INTEGRATION.
Note
La création ou la modification d’un canal qui fait référence à une intégration de notification nécessite un rôle qui possède le privilège USAGE sur l’intégration de notification. En outre, le rôle doit avoir le privilège CREATE PIPE sur le schéma ou le privilège OWNERSHIP sur le canal, respectivement.
Notez que l’exploitation d’un objet dans un schéma requiert également le privilège USAGE sur la base de données et le schéma parents.
Pour obtenir des instructions sur la création d’un rôle personnalisé avec un ensemble spécifique de privilèges, voir Création de rôles personnalisés.
Pour des informations générales sur les rôles et les privilèges accordés pour effectuer des actions SQL sur des objets sécurisables, voir Aperçu du contrôle d’accès.
Nouveau canal¶
Créez un nouveau canal en utilisant CREATE PIPE :
CREATE PIPE <name>
AUTO_INGEST = TRUE
[ INTEGRATION = '<string>' ]
ERROR_INTEGRATION = <integration_name>
AS <copy_statement>
Où :
ERROR_INTEGRATION = <nom_intégration>
Name of the notification integration you created in Step 4: Creating the Notification Integration (in this topic).
Par exemple :
CREATE PIPE mypipe
AUTO_INGEST = TRUE
INTEGRATION = 'my_storage_int'
ERROR_INTEGRATION = my_notification_int
AS
COPY INTO mydb.public.mytable
FROM @mydb.public.mystage;
Canal existant¶
Modifiez un canal existant en utilisant ALTER PIPE.
Note
Si une intégration de notification a été spécifiée lors de la création du canal, il faut d’abord supprimer le paramètre ERROR_INTEGRATION (en utilisant ALTER PIPE … UNSET ERROR_INTEGRATION) puis définir le paramètre.
ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Where <integration_name>
is the name of the notification integration you created in Step 4: Creating the Notification
Integration (in this topic).
Par exemple :
ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Charge utile du message de notification d’erreur¶
Le corps des messages d’erreur identifie le canal et les erreurs rencontrées lors d’un chargement.
Voici un exemple de charge utile de message décrivant une erreur Snowpipe. Notez que la charge utile peut inclure un ou plusieurs messages d’erreur.
{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Notez que vous devez analyser la chaîne dans un objet JSON pour traiter les valeurs dans la charge utile.