Snowpipe automatisé pour Google Cloud Storage

Cette rubrique fournit des instructions pour déclencher automatiquement les chargements de données Snowpipe à l’aide de messages Google Cloud Pub/Sub pour les événements Google Cloud Storage (GCS).

Note

Cette fonctionnalité est limitée aux comptes Snowflake hébergés sur Amazon Web Services (AWS) ou Google Cloud Platform. Les instructions pour automatiser les chargements de données Snowpipe à l’aide des messages GCS Pub/Sub sont identiques pour les comptes sur l’une ou l’autre des plateformes d’hébergement Cloud.

La prise en charge de l’automatisation des chargements de données Snowpipe depuis GCS vers des comptes Snowflake hébergés sur AWS est fournie en tant que fonctionnalité d’avant-première.

Dans ce chapitre :

Configuration de l’accès sécurisé au stockage Cloud

Note

Si vous avez déjà configuré un accès sécurisé au compartiment GCS qui stocke vos fichiers de données, vous pouvez ignorer cette section.

Cette section décrit comment configurer un objet d’intégration de stockage Snowflake pour déléguer la responsabilité de l’authentification pour le stockage dans le Cloud à une entité Gestion des identités et des accès Snowflake (IAM).

Cette section explique comment utiliser les intégrations de stockage pour permettre à Snowflake de lire des données et de les écrire dans un compartiment de Google Cloud Storage référencé dans une zone de préparation externe (c’est-à-dire Cloud Storage). Les intégrations sont des objets Snowflake de première classe nommés, qui évitent de transmettre des informations d’identification explicites de fournisseur de Cloud, telles que des clés secrètes ou des jetons d’accès ; au lieu de cela, les objets d’intégration font référence à un compte de service Cloud Storage. Un administrateur de votre organisation accorde des autorisations au compte de service dans le compte Cloud Storage.

Les administrateurs peuvent également limiter les utilisateurs à un ensemble spécifique de compartiments Cloud Storage (et de chemins d’accès facultatifs) auxquels accèdent des zones de préparation externes utilisant l’intégration.

Note

Pour suivre les instructions de cette section, vous devez avoir accès à votre projet Cloud Storage en tant qu’éditeur de projet. Si vous n’êtes pas un éditeur de projet, demandez à votre administrateur Cloud Storage d’effectuer ces tâches.

Le diagramme suivant illustre le flux d’intégration d’une zone de préparation Cloud Storage :

Google Cloud Storage Stage Integration Flow
  1. Une zone de préparation externe (c.-à-d Cloud Storage) fait référence à un objet d’intégration de stockage dans sa définition.

  2. Snowflake associe automatiquement l’intégration de stockage à un compte de service Cloud Storage créé pour votre compte. Snowflake crée un seul compte de service référencé par toutes les intégrations de stockage GCS de votre compte Snowflake.

  3. Un éditeur de projet pour votre projet Cloud Storage accorde des autorisations au compte de service pour accéder au compartiment référencé dans la définition de la zone de stockage. Notez que de nombreux objets de zone de stockage externes peuvent référencer différents compartiments et chemins et utiliser la même intégration pour l’authentification.

Lorsqu’un utilisateur charge ou décharge des données depuis ou vers une zone de préparation, Snowflake vérifie les autorisations accordées au compte de service sur le compartiment avant d’autoriser ou de refuser l’accès.

Dans cette section :

Étape 1 : Création d’une intégration Cloud Storage dans Snowflake

Créez une intégration à l’aide de la commande CREATE STORAGE INTEGRATION. Une intégration est un objet Snowflake qui délègue la responsabilité de l’authentification pour un stockage externe dans le Cloud à une entité générée par Snowflake (c’est-à-dire un compte de service Cloud Storage). Pour accéder aux compartiments Cloud Storage, Snowflake crée un compte de service auquel des autorisations peuvent être accordées pour accéder aux compartiments dans lesquels vos fichiers de données sont stockés.

Une seule intégration de stockage peut prendre en charge plusieurs zones de préparation externes (c.-à-d. GCS). L’URL dans la définition de zone de préparation doit correspondre aux compartiments GCS (et aux chemins facultatifs) spécifiés pour le paramètre STORAGE_ALLOWED_LOCATIONS.

Note

Seuls les administrateurs de compte (utilisateurs dotés du rôle ACCOUNTADMIN) ou un rôle disposant du privilège global CREATE INTEGRATION peuvent exécuter cette commande SQL.

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]

Où :

  • nom_intégration est le nom de la nouvelle intégration.

  • compartiment est le nom d’un compartiment Cloud Storage qui stocke vos fichiers de données (par exemple, mybucket). Les paramètres STORAGE_ALLOWED_LOCATIONS requis et STORAGE_BLOCKED_LOCATIONS facultatif limitent ou bloquent l’accès à ces compartiments, respectivement, lors de la création ou de la modification de zones de préparation faisant référence à cette intégration.

  • chemin est un chemin facultatif qui peut être utilisé pour fournir un contrôle granulaire sur les objets du compartiment.

L’exemple suivant crée une intégration qui limite explicitement les zones de préparation externes utilisant l’intégration pour faire référence à l’un des deux compartiments et des chemins. Dans une étape ultérieure, nous allons créer une zone de préparation externe qui fait référence à l’un de ces compartiments et chemins.

Les zones de préparation externes supplémentaires qui utilisent également cette intégration peuvent faire référence aux compartiments et aux chemins autorisés :

CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/')
  STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');

Étape 2 : Récupération du compte de service Cloud Storage pour votre compte Snowflake

Exécutez la commande DESCRIBE INTEGRATION pour extraire l’ID du compte de service Cloud Storage créé automatiquement pour votre compte Snowflake :

DESC STORAGE INTEGRATION <integration_name>;

Où :

Par exemple :

DESC STORAGE INTEGRATION gcs_int;

+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
| property                    | property_type | property_value                                                              | property_default |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------|
| ENABLED                     | Boolean       | true                                                                        | false            |
| STORAGE_ALLOWED_LOCATIONS   | List          | gcs://mybucket1/path1/,gcs://mybucket2/path2/                               | []               |
| STORAGE_BLOCKED_LOCATIONS   | List          | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/   | []               |
| STORAGE_GCP_SERVICE_ACCOUNT | String        | service-account-id@project1-123456.iam.gserviceaccount.com                  |                  |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+

La propriété STORAGE_GCP_SERVICE_ACCOUNT de la sortie affiche le compte de service Cloud Storage créé pour votre compte Snowflake (par exemple, service-account-id@project1-123456.iam.gserviceaccount.com). Nous fournissons un seul compte de service Cloud Storage pour l’ensemble de votre compte Snowflake. Toutes les intégrations Cloud Storage utilisent ce compte de service.

Étape 3 : Octroi d’autorisations au compte de service pour accéder à des objets de compartiment

Les instructions étape par étape suivantes décrivent comment configurer les permissions d’accès IAM pour Snowflake dans votre console Google Cloud Platform de sorte que vous puissiez utiliser un compartiment Cloud Storage pour charger et décharger les données :

Création d’un rôle IAM personnalisé

Créez un rôle personnalisé disposant des autorisations requises pour accéder au compartiment et obtenir des objets.

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez IAM & admin » Roles.

  3. Cliquez sur Create Role.

  4. Entrez un nom et une description pour le rôle personnalisé.

  5. Cliquez sur Add Permissions.

  6. Filtrez la liste des autorisations et ajoutez les éléments suivants dans la liste :

    Chargement des données uniquement
    • storage.buckets.get

    • storage.objects.get

    • storage.objects.list

    Chargement des données avec option de purge
    • storage.buckets.get

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    Chargement et déchargement des données
    • storage.buckets.get

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

  7. Cliquez sur Create.

Attribution du rôle personnalisé au compte du service Cloud Storage

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez Storage » Browser :

    Bucket List in Google Cloud Platform Console
  3. Sélectionnez un compartiment à configurer pour l’accès.

  4. Cliquez sur SHOW INFO PANEL dans le coin supérieur droit. Le panneau d’information du compartiment s’affiche en coulissant.

  5. Dans le champ Add members, recherchez le nom du compte de service à partir de la sortie DESCRIBE INTEGRATION dans Étape 2 : Récupération du compte de service Cloud Storage pour votre compte Snowflake (dans cette rubrique).

    Bucket Information Panel in Google Cloud Platform Console
  6. Dans la liste déroulante Select a role , sélectionnez Storage » Custom » <rôle>, où <rôle> est le rôle Cloud Storage personnalisé que vous avez créé dans Création d’un rôle IAM personnalisé (dans ce chapitre).

  7. Cliquez sur le bouton Add. Le nom du compte de service est ajouté à la liste déroulante des rôles Storage Object Viewer dans le panneau d’informations.

    Storage Object Viewer role list in Google Cloud Platform Console

Octroi des autorisations de compte du service Cloud Storage sur les clés cryptographiques de Cloud Key Management Service

Note

Cette étape n’est requise que si votre compartiment GCS est chiffré à l’aide d’une clé stockée dans Google Cloud Key Management Service (Cloud KMS).

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez Security » Cryptographic keys.

  3. Sélectionnez le porte-clés affecté à votre compartiment GCS.

  4. Cliquez sur SHOW INFO PANEL dans le coin supérieur droit. Le panneau d’information du porte-clés s’affiche.

  5. Dans le champ Add members, recherchez le nom du compte de service à partir de la sortie DESCRIBE INTEGRATION dans Étape 2 : Récupération du compte de service Cloud Storage pour votre compte Snowflake (dans cette rubrique).

  6. Dans la liste déroulante Select a role, sélectionnez le rôle Cloud KMS CrytoKey Encryptor/Decryptor.

  7. Cliquez sur le bouton Add. Le nom du compte de service est ajouté à la liste déroulante des rôles Cloud KMS CrytoKey Encryptor/Decryptor dans le panneau d’informations.

Configuration de Snowpipe automatisé à l’aide de Pub/Sub GCS

Conditions préalables

Les instructions de cette rubrique supposent que les éléments suivants ont été créés et configurés :

Compte GCP
  • Sujet Pub/Sub qui reçoit des messages d’événement depuis le compartiment GCS. Pour plus d’informations, voir Création du sujet Pub/Sub (dans cette rubrique).

  • Abonnement qui reçoit les messages d’événement du sujet Pub/Sub. Pour plus d’informations, voir Création de l’abonnement Pub/Sub (dans cette rubrique).

Pour obtenir des instructions, voir Documentation Pub/Sub.

Snowflake
  • Table cible dans la base de données Snowflake où vos données seront chargées.

Création du sujet Pub/Sub

Créez un sujet Pub/Sub à l’aide de Cloud Shell ou du SDK Cloud.

Exécutez la commande suivante pour créer le sujet et lui permettre d’écouter l’activité dans le compartiment GCS spécifié :

$ gsutil notification create -t <topic> -f json gs://<bucket-name>

Où :

  • <sujet> est le nom du sujet.

  • <nom-compartiment> est le nom de votre compartiment GCS.

Si le sujet existe déjà, la commande l’utilise ; sinon, un nouveau sujet est créé.

Pour plus d’informations, voir la page Utiliser les notifications Pub/Sub pour le stockage Cloud dans la documentation Pub/Sub.

Création de l’abonnement Pub/Sub

Créez un abonnement au sujet Pub/Sub à l’aide de la console Cloud, de l’outil de ligne de commande gcloud ou de l’API Cloud Pub/Sub. Pour obtenir des instructions, voir Gérer les sujets et les abonnements dans la documentation Pub/Sub.

Récupération de l’ID d’abonnement Pub/Sub

L’ID d’abonnement au sujet Pub/Sub est utilisé dans ces instructions pour permettre à Snowflake d’accéder aux messages d’événement.

  1. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  2. Dans le tableau de bord d’accueil, sélectionnez Big Data » Pub/Sub » Subscriptions.

  3. Copiez l’ID dans la colonne Subscription ID pour l’abonnement au sujet

Étape 1 : créer une intégration de notification dans Snowflake

Créez une intégration de notification à l’aide de la commande CREATE NOTIFICATION INTEGRATION. L’intégration des notifications fait référence à votre abonnement Pub/Sub. Snowflake associe l’intégration de la notification à un compte de service GCS créé pour votre compte. Snowflake crée un seul compte de service référencé par toutes les notifications de stockage GCS de votre compte Snowflake.

Note

  • Seuls les administrateurs de compte (utilisateurs dotés du rôle ACCOUNTADMIN) ou un rôle disposant du privilège global CREATE INTEGRATION peuvent exécuter cette commande SQL.

  • Le compte de service GCS pour les intégrations de notification est différent du compte de service créé pour les intégrations de stockage.

CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';

Où :

Par exemple :

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';

Note

Actuellement, la commande ALTER NOTIFICATION INTEGRATION ne prend pas en charge la modification de la valeur du paramètre GCP_PUBSUB_SUBSCRIPTION_NAME. Si une valeur de paramètre incorrecte est spécifiée, vous devez recréer l’intégration de notification (à l’aide de CREATE OR REPLACE NOTIFICATION INTEGRATION).

Étape 2 : Accorder un accès à Snowflake à l’abonnement Pub/Sub

  1. Exécutez la commande DESCRIBE INTEGRATION pour récupérer l’ID de compte de service Snowflake :

    DESC NOTIFICATION INTEGRATION <integration_name>;
    

    Où :

    Par exemple :

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
  2. Enregistrez le nom du compte de service dans la colonne GCP_PUBSUB_SERVICE_ACCOUNT, qui a le format suivant :

    <service_account>@<project_id>.iam.gserviceaccount.com
    
  3. Connectez-vous à la console Google Cloud Platform en tant qu’éditeur de projet.

  4. Dans le tableau de bord d’accueil, sélectionnez Big Data » Pub/Sub » Subscriptions.

  5. Sélectionnez l’abonnement à configurer pour l’accès.

  6. Cliquez sur SHOW INFO PANEL dans le coin supérieur droit. Le panneau d’information de l’abonnement s’affiche en coulissant.

  7. Dans le champ Add members, recherchez le nom du compte de service que vous avez enregistré.

  8. Dans la liste déroulante Select a role, sélectionnez Pub/Sub Subscriber.

  9. Cliquez sur le bouton Add. Le nom du compte de service est ajouté à la liste déroulante des rôles Pub/Sub Subscriber dans le panneau d’informations.

Étape 3 : créer un canal avec l’intégration automatique activée

Créez un canal à l’aide de la commande CREATE PIPE. Le canal définit l’instruction COPY INTO <table> utilisée par Snowpipe pour charger les données de la file d’attente d’acquisition dans la table cible.

CREATE PIPE <pipe_name>
  AUTO_INGEST = true
  INTEGRATION = '<integration_name>'
  AS
<copy_statement>;

Où :

  • <nom_canal> est l’identificateur du canal ; doit être unique pour le schéma dans lequel le canal est créé.

    L’identificateur doit commencer par un caractère alphabétique et ne peut pas contenir d’espaces ou de caractères spéciaux à moins que toute la chaîne d’identificateur soit délimitée par des guillemets doubles (p. ex. "My object"). Les identificateurs entre guillemets doubles sont également sensibles à la casse.

  • <nom_intégration> est le nom de l’intégration de notification créée à l” étape 1 : Création d’une intégration Cloud Storage dans Snowflake.

  • copier_instruction est l’instruction COPY INTO <table> utilisée pour charger des données à partir de fichiers en file d’attente dans une table Snowflake. Cette instruction sert de texte/définition pour le canal, et elle est affichée dans la sortie SHOW PIPES.

Par exemple, créez un canal dans le schéma snowpipe_db.public qui charge les données des fichiers mis dans une zone de préparation externe (GCS) nommée mystage dans une table de destination nommée mytable :

CREATE PIPE snowpipe_db.public.mypipe
  AUTO_INGEST = true
  INTEGRATION = 'MYINT'
  AS
COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage/path2;

Important

Comparez la référence de la zone de préparation dans la définition de canal avec les canaux existants. Vérifiez que les chemins de répertoire du même compartiment GCS ne se chevauchent pas. Sinon, plusieurs canaux pourraient charger le même ensemble de fichiers de données plusieurs fois dans une ou plusieurs tables cibles. Cela peut se produire, par exemple, lorsque plusieurs zones de préparation font référence au même compartiment GCS avec différents niveaux de granularité, tels que gcs://mybucket1/path1 et gcs://mybucket1/path1/path2. Dans ce cas d’utilisation, si les fichiers sont stockés dans gcs://mybucket1/path1/path2, les canaux pour les deux zones de préparation chargeraient une copie des fichiers.

Cela diffère de la configuration manuelle de Snowpipe (avec l’intégration automatique désactivée), qui oblige les utilisateurs à soumettre un ensemble de fichiers nommé à une API REST pour mettre en file d’attente les fichiers à charger. Lorsque l’intégration automatique est activée, chaque canal reçoit une liste de fichiers générée à partir des messages Pub/Sub. Des précautions supplémentaires sont nécessaires pour éviter la duplication des données.

Snowpipe avec l’intégration automatique est maintenant configuré !

Lorsque de nouveaux fichiers de données sont ajoutés au compartiment GCS, le message d’événement indique à Snowpipe de les charger dans la table cible définie dans le canal.

Étape 4 : Charger des fichiers historiques

Pour charger les retards de traitement des fichiers de données qui existaient dans la zone de préparation externe avant que les messages Pub/Sub aient été configurés, exécutez une instruction ALTER PIPE … REFRESH .

SYSTEM$PIPE_STATUS Sortie

La fonction SYSTEM$PIPE_STATUS récupère une représentation JSON du statut actuel d’un canal.

Pour les canaux avec AUTO_INGEST défini sur TRUE, la fonction renvoie un objet JSON contenant les paires nom/valeur suivantes (si applicable au statut actuel du canal) :

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}

Où :

executionState

État d’exécution actuel du canal ; pourrait être l’un des éléments suivants :

  • RUNNING (c’est-à-dire que tout est normal ; Snowflake peut ou non traiter activement les fichiers de ce canal)

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

Le plus ancien horodatage parmi les fichiers de données en file d’attente (le cas échéant), l’horodatage étant défini lors de l’ajout du fichier à la file d’attente.

pendingFileCount

Nombre de fichiers en cours de traitement par le canal. Si le canal est mis en pause, cette valeur diminue à mesure que tous les fichiers en file d’attente avant que le canal a été mis en pause sont traités. Lorsque cette valeur est 0, aucun fichier n’est en file d’attente pour ce canal ou le canal est effectivement mis en pause.

notificationChannelName

File d’attente de stockage GCS associée au canal.

numOutstandingMessagesOnChannel

Nombre de messages dans la file d’attente de stockage qui ont été mis en file d’attente mais pas encore reçus.

lastReceivedMessageTimestamp

Horodatage du dernier message reçu de la file d’attente de stockage. Notez que ce message peut ne pas s’appliquer au canal spécifique (par exemple si le chemin/préfixe associé au message ne correspond pas au chemin/préfixe de la définition du canal). De plus, seuls les messages déclenchés par des objets de données créés sont consommés par les canaux d’intégration automatique.

lastForwardedMessageTimestamp

Horodatage du dernier message d’événement « créer un objet » avec un chemin/préfixe correspondant qui a été transmis au canal.

error

Message d’erreur généré lors de la dernière compilation du canal en vue de son exécution (le cas échéant) ; souvent causé par des problèmes d’accès aux objets requis (par exemple, une table, une zone de préparation, le format de fichier) en raison de problèmes d’autorisation ou d’objets perdus.

fault

La plus récente erreur interne liée au processus Snowflake (le cas échéant). Utilisé principalement par Snowflake à des fins de débogage.