Snowpipe automatisé pour Google Cloud Storage

Cette rubrique fournit des instructions pour déclencher automatiquement les chargements de données Snowpipe à l’aide de messages Google Cloud Pub/Sub pour les événements Google Cloud Storage (GCS).

Notez que seuls les événements OBJECT_FINALIZE déclenchent Snowpipe pour charger des fichiers. Snowflake recommande de n’envoyer que des événements pris en charge par Snowpipe afin de réduire les coûts, le bruit des événements et la latence.

Dans ce chapitre :

Prise en charge de la plate-forme Cloud

Le déclenchement de chargements de données Snowpipe automatisés à l’aide de messages d’événements GCS Pub/Sub est pris en charge par les comptes Snowflake hébergés sur toutes les plates-formes Cloud prises en charge.

Configuration de l’accès sécurisé au stockage Cloud

Note

Si vous avez déjà configuré un accès sécurisé au compartiment GCS qui stocke vos fichiers de données, vous pouvez ignorer cette section.

Cette section décrit comment configurer un objet d’intégration de stockage Snowflake pour déléguer la responsabilité de l’authentification pour le stockage dans le Cloud à une entité Gestion des identités et des accès Snowflake (IAM).

Cette section explique comment utiliser les intégrations de stockage pour permettre à Snowflake de lire des données et de les écrire dans un compartiment de Google Cloud Storage référencé dans une zone de préparation externe (c’est-à-dire Cloud Storage). Les intégrations sont des objets Snowflake de première classe nommés, qui évitent de transmettre des informations d’identification explicites de fournisseur de Cloud, telles que des clés secrètes ou des jetons d’accès ; au lieu de cela, les objets d’intégration font référence à un compte de service Cloud Storage. Un administrateur de votre organisation accorde des autorisations au compte de service dans le compte Cloud Storage.

Les administrateurs peuvent également limiter les utilisateurs à un ensemble spécifique de compartiments Cloud Storage (et de chemins d’accès facultatifs) auxquels accèdent des zones de préparation externes utilisant l’intégration.

Note

Pour suivre les instructions de cette section, vous devez avoir accès à votre projet Cloud Storage en tant qu’éditeur de projet. Si vous n’êtes pas un éditeur de projet, demandez à votre administrateur Cloud Storage d’effectuer ces tâches.

Le diagramme suivant illustre le flux d’intégration d’une zone de préparation Cloud Storage :

Google Cloud Storage Stage Integration Flow
  1. Une zone de préparation externe (c.-à-d Cloud Storage) fait référence à un objet d’intégration de stockage dans sa définition.

  2. Snowflake associe automatiquement l’intégration de stockage à un compte de service Cloud Storage créé pour votre compte. Snowflake crée un seul compte de service référencé par toutes les intégrations de stockage GCS de votre compte Snowflake.

  3. Un éditeur de projet pour votre projet Cloud Storage accorde des autorisations au compte de service pour accéder au compartiment référencé dans la définition de la zone de stockage. Notez que de nombreux objets de zone de stockage externes peuvent référencer différents compartiments et chemins et utiliser la même intégration pour l’authentification.

Lorsqu’un utilisateur charge ou décharge des données depuis ou vers une zone de préparation, Snowflake vérifie les autorisations accordées au compte de service sur le compartiment avant d’autoriser ou de refuser l’accès.

Dans cette section :

Étape 1 : Création d’une intégration Cloud Storage dans Snowflake

Créez une intégration à l’aide de la commande CREATE STORAGE INTEGRATION. Une intégration est un objet Snowflake qui délègue la responsabilité de l’authentification pour un stockage externe dans le Cloud à une entité générée par Snowflake (c’est-à-dire un compte de service Cloud Storage). Pour accéder aux compartiments Cloud Storage, Snowflake crée un compte de service auquel des autorisations peuvent être accordées pour accéder aux compartiments dans lesquels vos fichiers de données sont stockés.

Une seule intégration de stockage peut prendre en charge plusieurs zones de préparation externes (c.-à-d. GCS). L’URL dans la définition de zone de préparation doit correspondre aux compartiments GCS (et aux chemins facultatifs) spécifiés pour le paramètre STORAGE_ALLOWED_LOCATIONS.

Note

Seuls les administrateurs de compte (utilisateurs dotés du rôle ACCOUNTADMIN) ou un rôle disposant du privilège global CREATE INTEGRATION peuvent exécuter cette commande SQL.

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]
Copy

Où :

  • integration_name est le nom de la nouvelle intégration.

  • bucket est le nom d’un compartiment Cloud Storage qui stocke vos fichiers de données (par exemple, mybucket). Les paramètres STORAGE_ALLOWED_LOCATIONS requis et STORAGE_BLOCKED_LOCATIONS facultatif limitent ou bloquent l’accès à ces compartiments, respectivement, lors de la création ou de la modification de zones de préparation faisant référence à cette intégration.

  • path est un chemin facultatif qui peut être utilisé pour fournir un contrôle granulaire sur les objets du compartiment.

L’exemple suivant crée une intégration qui limite explicitement les zones de préparation externes utilisant l’intégration pour faire référence à l’un des deux compartiments et des chemins. Dans une étape ultérieure, nous allons créer une zone de préparation externe qui fait référence à l’un de ces compartiments et chemins.

Les zones de préparation externes supplémentaires qui utilisent également cette intégration peuvent faire référence aux compartiments et aux chemins autorisés :

CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/')
  STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
Copy

Étape 2 : Récupération du compte de service Cloud Storage pour votre compte Snowflake

Exécutez la commande DESCRIBE INTEGRATION pour extraire l’ID du compte de service Cloud Storage créé automatiquement pour votre compte Snowflake :

DESC STORAGE INTEGRATION <integration_name>;
Copy

Où :

Par exemple :

DESC STORAGE INTEGRATION gcs_int;

+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
| property                    | property_type | property_value                                                              | property_default |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------|
| ENABLED                     | Boolean       | true                                                                        | false            |
| STORAGE_ALLOWED_LOCATIONS   | List          | gcs://mybucket1/path1/,gcs://mybucket2/path2/                               | []               |
| STORAGE_BLOCKED_LOCATIONS   | List          | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/   | []               |
| STORAGE_GCP_SERVICE_ACCOUNT | String        | service-account-id@project1-123456.iam.gserviceaccount.com                  |                  |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
Copy

La propriété STORAGE_GCP_SERVICE_ACCOUNT de la sortie affiche le compte de service Cloud Storage créé pour votre compte Snowflake (par exemple, service-account-id@project1-123456.iam.gserviceaccount.com). Nous fournissons un seul compte de service Cloud Storage pour l’ensemble de votre compte Snowflake. Toutes les intégrations Cloud Storage utilisent ce compte de service.

Étape 3 : Octroi d’autorisations au compte de service pour accéder à des objets de compartiment

Les instructions étape par étape suivantes décrivent comment configurer les permissions d’accès IAM pour Snowflake dans votre console Google Cloud Platform de sorte que vous puissiez utiliser un compartiment Cloud Storage pour charger et décharger les données :

Création d’un rôle IAM personnalisé

Créez un rôle personnalisé disposant des autorisations requises pour accéder au compartiment et obtenir des objets.

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez IAM & admin » Roles.

  3. Cliquez sur Create Role.

  4. Entrez un nom et une description pour le rôle personnalisé.

  5. Cliquez sur Add Permissions.

  6. Filtrez la liste des autorisations et ajoutez les éléments suivants dans la liste :

    Action(s)

    Autorisations requises

    Chargement des données uniquement

    • storage.buckets.get

    • storage.objects.get

    • storage.objects.list

    Chargement de données avec option de purge, exécution de la commande REMOVE sur la zone de préparation

    • storage.buckets.get

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    Chargement et déchargement des données

    • storage.buckets.get (pour le calcul des coûts de transfert des données)

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    Déchargement des données uniquement

    • storage.buckets.get

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.list

  7. Cliquez sur Create.

Attribution du rôle personnalisé au compte du service Cloud Storage

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez Cloud Storage » Browser :

    Bucket List in Google Cloud Platform Console
  3. Sélectionnez un compartiment à configurer pour l’accès.

  4. Cliquez sur SHOW INFO PANEL dans le coin supérieur droit. Le panneau d’information du compartiment s’affiche en coulissant.

  5. Cliquez sur le bouton ADD PRINCIPAL.

  6. Dans le champ New principals, recherchez le nom du compte de service à partir de la sortie DESCRIBE INTEGRATION dans Étape 2 : Récupération du compte de service Cloud Storage pour votre compte Snowflake (dans cette rubrique).

    Bucket Information Panel in Google Cloud Platform Console
  7. Dans la liste déroulante Select a role , sélectionnez Custom » <rôle>, où <rôle> est le rôle Cloud Storage personnalisé que vous avez créé dans Création d’un rôle IAM personnalisé (dans ce chapitre).

  8. Cliquez sur le bouton Save. Le nom du compte de service est ajouté à la liste déroulante des rôles Storage Object Viewer dans le panneau d’informations.

    Storage Object Viewer role list in Google Cloud Platform Console

Octroi des autorisations de compte du service Cloud Storage sur les clés cryptographiques de Cloud Key Management Service

Note

Cette étape n’est requise que si votre compartiment GCS est chiffré à l’aide d’une clé stockée dans Google Cloud Key Management Service (Cloud KMS).

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez Security » Cryptographic keys.

  3. Sélectionnez le porte-clés affecté à votre compartiment GCS.

  4. Cliquez sur SHOW INFO PANEL dans le coin supérieur droit. Le panneau d’information du porte-clés s’affiche.

  5. Cliquez sur le bouton ADD PRINCIPAL.

  6. Dans le champ New principals, recherchez le nom du compte de service à partir de la sortie DESCRIBE INTEGRATION dans Étape 2 : Récupération du compte de service Cloud Storage pour votre compte Snowflake (dans cette rubrique).

  7. Dans la liste déroulante Select a role, sélectionnez le rôle Cloud KMS CrytoKey Encryptor/Decryptor.

  8. Cliquez sur le bouton Save. Le nom du compte de service est ajouté à la liste déroulante des rôles Cloud KMS CrytoKey Encryptor/Decryptor dans le panneau d’informations.

Configuration de l’automatisation à l’aide de Pub/Sub GCS

Conditions préalables

Les instructions de cette rubrique supposent que les éléments suivants ont été créés et configurés :

Compte GCP
  • Sujet Pub/Sub qui reçoit des messages d’événement depuis le compartiment GCS. Pour plus d’informations, voir Création du sujet Pub/Sub (dans cette rubrique).

  • Abonnement qui reçoit les messages d’événement du sujet Pub/Sub. Pour plus d’informations, voir Création de l’abonnement Pub/Sub (dans cette rubrique).

Pour obtenir des instructions, voir Documentation Pub/Sub.

Snowflake
  • Table cible dans la base de données Snowflake où vous souhaitez charger les données.

Création du sujet Pub/Sub

Créez un sujet Pub/Sub à l’aide de Cloud Shell ou du SDK Cloud.

Exécutez la commande suivante pour créer le sujet et lui permettre d’écouter l’activité dans le compartiment GCS spécifié :

$ gsutil notification create -t <topic> -f json gs://<bucket-name> -e OBJECT_FINALIZE
Copy

Où :

  • <sujet> est le nom du sujet.

  • <nom-compartiment> est le nom de votre compartiment GCS.

Si le sujet existe déjà, la commande l’utilise ; sinon, la commande crée un nouveau sujet.

Pour plus d’informations, voir la page Utiliser les notifications Pub/Sub pour le stockage Cloud dans la documentation Pub/Sub.

Création de l’abonnement Pub/Sub

Créez un abonnement au sujet avec transmission de type pull Pub/Sub à l’aide de la console Cloud, de l’outil de ligne de commande gcloud ou de l’API Cloud Pub/Sub. Pour obtenir des instructions, voir Gérer les sujets et les abonnements dans la documentation Pub/Sub.

Note

  • Seuls les abonnements Pub/Sub qui utilisent la transmission de type pull par défaut sont pris en charge par Snowflake. La transmission de type push n’est pas prise en charge.

Récupération de l’ID d’abonnement Pub/Sub

L’ID d’abonnement au sujet Pub/Sub est utilisé dans ces instructions pour permettre à Snowflake d’accéder aux messages d’événement.

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez Big Data » Pub/Sub » Subscriptions.

  3. Copiez l’ID dans la colonne Subscription ID pour l’abonnement au sujet.

Étape 1 : créer une intégration de notification dans Snowflake

Créez une intégration de notification à l’aide de la commande CREATE NOTIFICATION INTEGRATION. L’intégration des notifications fait référence à votre abonnement Pub/Sub. Snowflake associe l’intégration de la notification à un compte de service GCS créé pour votre compte. Snowflake crée un seul compte de service référencé par toutes les notifications de stockage GCS de votre compte Snowflake.

Note

  • Seuls les administrateurs de compte (utilisateurs dotés du rôle ACCOUNTADMIN) ou un rôle disposant du privilège global CREATE INTEGRATION peuvent exécuter cette commande SQL.

  • Le compte de service GCS pour les intégrations de notification est différent du compte de service créé pour les intégrations de stockage.

  • Une seule intégration de notification prend en charge un seul abonnement Google Cloud Pub/Sub. Le fait de référencer le même abonnement Pub/Sub dans plusieurs intégrations de notification peut entraîner des données manquantes dans les tables cibles, car les notifications d’événements sont réparties entre les intégrations de notification.

CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
Copy

Où :

Par exemple :

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';
Copy

Étape 2 : Accorder un accès à Snowflake à l’abonnement Pub/Sub

  1. Exécutez la commande DESCRIBE INTEGRATION pour récupérer l’ID de compte de service Snowflake :

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    Où :

    Par exemple :

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
    Copy
  2. Enregistrez le nom du compte de service dans la colonne GCP_PUBSUB_SERVICE_ACCOUNT, qui a le format suivant :

    <service_account>@<project_id>.iam.gserviceaccount.com
    
    Copy
  3. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  4. Dans le tableau de bord d’accueil, sélectionnez Big Data » Pub/Sub » Subscriptions.

  5. Sélectionnez l’abonnement à configurer pour l’accès.

  6. Cliquez sur SHOW INFO PANEL dans le coin supérieur droit. Le panneau d’information de l’abonnement s’affiche en coulissant.

  7. Cliquez sur le bouton ADD PRINCIPAL.

  8. Dans le champ New principals, recherchez le nom du compte de service que vous avez enregistré.

  9. Dans la liste déroulante Select a role, sélectionnez Pub/Sub Subscriber.

  10. Cliquez sur le bouton Save. Le nom du compte de service est ajouté à la liste déroulante des rôles Pub/Sub Subscriber dans le panneau d’informations.

  11. Accédez à la page Dashboard de la console Cloud et sélectionnez votre projet dans la liste déroulante.

  12. Cliquez sur le bouton ADD PEOPLE TO THIS PROJECT .

  13. Ajoutez le nom du compte de service que vous avez enregistré.

  14. Dans la liste déroulante Select a role, sélectionnez Monitoring Viewer.

  15. Cliquez sur le bouton Save . Le nom du compte de service est ajouté au rôle Monitoring Viewer.

Étape 3 : créer une zone de préparation (si nécessaire)

Créez une zone de préparation externe qui fait référence à votre compartiment GCS à l’aide de la commande CREATE STAGE. Snowflake lit vos fichiers de données en zone de préparation dans les métadonnées de la table externe. Vous pouvez aussi utiliser une zone de préparation externe.

Note

  • Pour configurer un accès sécurisé à l’emplacement de stockage Cloud, voir Configuration de l’accès sécurisé au stockage Cloud (dans cette rubrique).

  • Pour faire référence à une intégration de stockage dans l’instruction CREATE STAGE, le rôle doit avoir le privilège USAGE sur l’objet d’intégration de stockage.

L’exemple suivant crée une zone de préparation nommée mystage dans le schéma actif de la session utilisateur. L’URL de stockage Cloud inclut le chemin files. La zone de préparation fait référence à une intégration de stockage nommée my_storage_int.

USE SCHEMA mydb.public;

CREATE STAGE mystage
  URL='gcs://load/files/'
  STORAGE_INTEGRATION = my_storage_int;
Copy

Étape 4 : Créer un canal avec l’intégration automatique activée

Créez un canal à l’aide de la commande CREATE PIPE . Le canal définit l’instruction COPY INTO <table> utilisée par Snowpipe pour charger les données de la file d’attente d’acquisition dans la table cible.

CREATE PIPE <pipe_name>
  AUTO_INGEST = true
  INTEGRATION = '<notification_integration_name>'
  AS
<copy_statement>;
Copy

Où :

<nom_canal>

Identificateur du canal ; doit être unique pour le schéma dans lequel le canal est créé.

L’identificateur doit commencer par un caractère alphabétique et ne peut pas contenir d’espaces ou de caractères spéciaux à moins que toute la chaîne d’identificateur soit délimitée par des guillemets doubles (p. ex. "My object"). Les identificateurs entre guillemets doubles sont également sensibles à la casse.

INTEGRATION = '<nom_intégration_notification>'

Nom de l’intégration de notification utilisée pour actualiser automatiquement les métadonnées de la table de répertoire à l’aide des notifications Google Cloud Pub/Sub. Une intégration de notification est un objet Snowflake qui fournit une interface entre Snowflake et des services tiers de mise en file d’attente de messages dans le Cloud.

copy_statement

Instruction COPY INTO <table> utilisée pour charger des données à partir de fichiers en file d’attente dans une table Snowflake. Cette instruction sert de texte/définition pour le canal, et elle est affichée dans la sortie SHOW PIPES.

Par exemple, créez un canal dans le schéma snowpipe_db.public qui charge les données des fichiers mis dans une zone de préparation externe (GCS) nommée mystage dans une table de destination nommée mytable :

CREATE PIPE snowpipe_db.public.mypipe
  AUTO_INGEST = true
  INTEGRATION = 'MY_NOTIFICATION_INT'
  AS
COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage/path2;
Copy

Le paramètre INTEGRATION fait référence à my_notification_int l’intégration de notification créée à l” étape 1 : création d’une intégration Cloud Storage dans Snowflake. Le nom de l’intégration doit être fourni en majuscule.

Important

Vérifiez que la référence de l’emplacement de stockage dans l’instruction COPY INTO <table> ne chevauche pas la référence dans les canaux existants du compte. Sinon, plusieurs canaux pourraient charger le même ensemble de fichiers de données dans les tables cibles. Par exemple, cette situation peut se produire lorsque plusieurs définitions de canaux font référence au même emplacement de stockage avec différents niveaux de granularité, tels que <emplacement_stockage>/path1/ et <emplacement_stockage>/path1/path2/. Dans cet exemple, si les fichiers sont en zone de préparation dans <emplacement_stockage>/path1/path2/, les deux canaux chargeront une copie des fichiers.

Visualisez les instructions COPY INTO <table> dans les définitions de tous les canaux du compte en exécutant SHOW PIPES ou en interrogeant la vue PIPES dans Account Usage ou la vue PIPES dans Information Schema.

Snowpipe avec l’intégration automatique est maintenant configuré !

Lorsque de nouveaux fichiers de données sont ajoutés au compartiment GCS, le message d’événement indique à Snowpipe de les charger dans la table cible définie dans le canal.

Étape 5 : Charger des fichiers historiques

Pour charger les retards de traitement des fichiers de données qui existaient dans la zone de préparation externe avant que les messages Pub/Sub aient été configurés, exécutez une instruction ALTER PIPE … REFRESH.

Étape 6 : Supprimer les fichiers en zone de préparation

Supprimez les fichiers en zone de préparation après avoir chargé avec succès les données et lorsque vous n’avez plus besoin des fichiers. Pour obtenir des instructions, voir Suppression des fichiers en zone de préparation après le chargement des données par Snowpipe.

SYSTEM$PIPE_STATUS Sortie

La fonction SYSTEM$PIPE_STATUS récupère une représentation JSON du statut actuel d’un canal.

Pour les canaux avec AUTO_INGEST défini sur TRUE, la fonction renvoie un objet JSON contenant les paires nom/valeur suivantes (si applicable au statut actuel du canal) :

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
Copy

Pour la description des valeurs de sortie, consultez la rubrique de référence de la fonction SQL.