Activation des notifications d’erreur Snowpipe pour Google Pub/Sub¶
Cette rubrique fournit des instructions pour envoyer des notifications d’erreur Snowpipe au service Google Cloud Pub/Sub (Pub/Sub).
Cette fonction permet d’envoyer des notifications d’erreur pour les types de chargements suivants :
Auto-intégration Snowpipe.
Appels vers le point de terminaison de l’API REST
insertFiles
Snowpipe.Chargements à partir d’Apache Kafka en utilisant le connecteur Snowflake pour Kafka avec la méthode d’ingestion Snowpipe uniquement.
Dans ce chapitre :
Prise en charge de la plateforme Cloud¶
Actuellement, cette fonctionnalité est limitée aux comptes Snowflake hébergés sur Google Cloud Platform (GCP). Snowpipe peut charger des données à partir de fichiers dans n’importe quel service de stockage dans le Cloud pris en charge. Toutefois, les notifications push vers Pub/Sub ne sont prises en charge que dans les comptes Snowflake hébergés sur GCP.
Remarques¶
Snowflake garantit la livraison des messages d’erreur au moins une fois (c’est-à-dire que plusieurs tentatives sont faites pour livrer les messages afin de s’assurer qu’au moins une tentative réussisse, ce qui peut entraîner des messages en double).
Cette fonctionnalité est mise en œuvre à l’aide de l’objet d’intégration de notification. Une intégration de notification est un objet Snowflake qui fournit une interface entre Snowflake et des services tiers de mise en file d’attente de messages dans le Cloud. Une seule intégration de notification peut prendre en charge plusieurs canaux.
Activation des notifications d’erreur¶
« ֤Étape 1 : Création du sujet Pub/Sub¶
Créez un sujet Pub/Sub qui peut recevoir des messages de notification d’erreur de Snowflake, ou réutilisez un sujet existant. Vous pouvez créer le sujet en utilisant Cloud Shell ou Cloud SDK. Pour plus d’informations, voir Créer et utiliser des sujets dans la documentation Pub/Sub.
Par exemple, exécutez la commande suivante pour créer un sujet vide :
$ gsutil notification create -t <topic>
Si le sujet existe déjà, la commande l’utilise ; sinon, un nouveau sujet est créé.
« ֤Étape 2 : Création d’un abonnement Pub/Sub¶
Vous pouvez également créer un abonnement au sujet Pub/Sub pour récupérer les notifications d’erreur. Vous pouvez créer un abonnement avec transmission de type pull à l’aide de la console Cloud, de l’outil de ligne de commande gcloud
ou de l’API Cloud Pub/Sub. Pour obtenir des instructions, voir Gérer les sujets et les abonnements dans la documentation Pub/Sub.
Étape 3 : Création d’une intégration de notification dans Snowflake¶
Créez une intégration de notification à l’aide de la commande CREATE NOTIFICATION INTEGRATION. L’intégration des notifications fait référence à votre sujet Pub/Sub. Snowflake associe l’intégration de la notification à un compte de service Goodle Cloud Platform (GCP) créé pour votre compte. Snowflake crée un seul compte de service référencé par toutes les notifications de stockage GCP de votre compte Snowflake.
Note
Seuls les administrateurs de compte (utilisateurs dotés du rôle ACCOUNTADMIN) ou un rôle disposant du privilège global CREATE INTEGRATION peuvent exécuter cette commande SQL.
Le compte de service GCP pour les intégrations de notification est différent du compte de service créé pour les intégrations de stockage.
CREATE NOTIFICATION INTEGRATION <integration_name>
ENABLED = TRUE
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
Où :
integration_name
est le nom de la nouvelle intégration.topic_id
est le sujet Pub/Sub auquel Snowflake envoie des notifications d’erreur. Pour plus d’informations, voir Étape 1 : Création du sujet Pub/Sub (dans cette rubrique).
Par exemple :
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
Étape 4 : Accord d’un accès à Snowflake à l’abonnement Pub/Sub¶
Exécutez la commande DESCRIBE INTEGRATION pour récupérer l’ID de compte de service Snowflake :
DESC NOTIFICATION INTEGRATION <integration_name>;
Où :
integration_name
est le nom de l’intégration créée à l’étape 1 : « Créer une intégration de notification dans Snowflake. »
Par exemple :
DESC NOTIFICATION INTEGRATION my_notification_int;
Enregistrez le nom du compte de service dans la colonne GCP_PUBSUB_SERVICE_ACCOUNT, qui a le format suivant :
<service_account>@<project_id>.iam.gserviceaccount.com
Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.
Dans le tableau de bord d’accueil, sélectionnez Big Data » Pub/Sub » Subscriptions.
Sélectionnez l’abonnement à configurer pour l’accès.
Cliquez sur SHOW INFO PANEL dans le coin supérieur droit. Le panneau d’information de l’abonnement s’affiche en coulissant.
Dans le champ Add members, recherchez le nom du compte de service que vous avez enregistré.
Dans la liste déroulante Select a role, sélectionnez Pub/Sub Publisher.
Cliquez sur le bouton Add. Le nom du compte de service est ajouté à la liste déroulante des rôles Pub/Sub Publisher dans le panneau d’informations.
Étape 5 : Activation des notifications d’erreur dans les canaux¶
Une seule intégration de notification peut être partagée par plusieurs canaux. Le corps des messages d’erreur identifie le canal, la zone de préparation externe et le chemin, ainsi que le fichier d’où provient l’erreur, entre autres détails.
Pour activer les notifications d’erreur pour un canal, spécifiez une valeur de paramètre ERROR_INTEGRATION.
Note
La création ou la modification d’un canal qui fait référence à une intégration de notification nécessite un rôle qui possède le privilège USAGE sur l’intégration de notification. En outre, le rôle doit avoir le privilège CREATE PIPE sur le schéma ou le privilège OWNERSHIP sur le canal, respectivement.
Notez que l’exploitation d’un objet dans un schéma requiert également le privilège USAGE sur la base de données et le schéma parents.
Pour obtenir des instructions sur la création d’un rôle personnalisé avec un ensemble spécifique de privilèges, voir Création de rôles personnalisés.
Pour des informations générales sur les rôles et les privilèges accordés pour effectuer des actions SQL sur des objets sécurisables, voir Aperçu du contrôle d’accès.
Nouveau canal¶
Créez un nouveau canal en utilisant CREATE PIPE :
CREATE PIPE <name>
AUTO_INGEST = TRUE
[ INTEGRATION = '<string>' ]
ERROR_INTEGRATION = <integration_name>
AS <copy_statement>
Où :
ERROR_INTEGRATION = <nom_intégration>
Nom de l’intégration de notification créée à l”étape 4 : Création de l’intégration des notifications (dans cette rubrique).
Par exemple :
CREATE PIPE mypipe
AUTO_INGEST = TRUE
INTEGRATION = 'my_storage_int'
ERROR_INTEGRATION = my_notification_int
AS
COPY INTO mydb.public.mytable
FROM @mydb.public.mystage;
Canal existant¶
Modifiez un canal existant en utilisant ALTER PIPE.
Note
Si une intégration de notification a été spécifiée lors de la création du canal, il faut d’abord supprimer le paramètre ERROR_INTEGRATION (en utilisant ALTER PIPE … UNSET ERROR_INTEGRATION) puis définir le paramètre.
ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Où <nom_intégration>
est le nom de l’intégration de notification créée à l”étape 4 : Création de l’intégration des notifications (dans cette rubrique).
Par exemple :
ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Charge utile du message de notification d’erreur¶
Le corps des messages d’erreur identifie le canal et les erreurs rencontrées lors d’un chargement.
Voici un exemple de charge utile de message décrivant une erreur Snowpipe. Notez que la charge utile peut inclure un ou plusieurs messages d’erreur.
{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Notez que vous devez analyser la chaîne dans un objet JSON pour traiter les valeurs dans la charge utile.