Option 2 : Automatiser Snowpipe avec AWS Lambda

AWS Lambda est un service de calcul qui s’exécute lorsqu’il est déclenché par un événement et exécute le code qui a été chargé dans le système. Vous pouvez adapter l’exemple de code Python fourni dans ce chapitre et créer une fonction Lambda qui appelle l’API REST de Snowpipe pour charger les données depuis votre zone de préparation externe (c’est-à-dire compartiment S3 ; les conteneurs Azure ne sont pas pris en charge). La fonction est déployée sur votre compte AWS, où elle est hébergée. Les événements que vous définissez dans Lambda (par exemple, lorsque les fichiers de votre compartiment S3 sont mis à jour), appellent la fonction Lambda et exécutent le code Python.

Ce chapitre décrit les étapes nécessaires pour configurer une fonction Lambda afin de charger automatiquement des données dans des micro-lots en utilisant Snowpipe.

Note

Ce chapitre suppose que vous avez configuré Snowpipe en suivant les instructions de la section Préparation du chargement des données à l’aide de l’API REST Snowpipe.

Dans ce chapitre :

Étape 1 : Écrire du code Python invoquant l’API REST Snowpipe

Exemple de code Python

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile

import os
import sys
import uuid

# Assume the public key has been registered in Snowflake
# private key in PEM format
private_key="""
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
"""

# Proxy object that abstracts the Snowpipe REST API
ingest_manager = SimpleIngestManager(account='<account_name>',
                   host='<account_name>.<region_id>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)

Note

L’exemple de code fourni dans ce chapitre ne tient pas compte du traitement des erreurs. Par exemple, il ne relance pas les appels ingest_manager qui ont échoué.

Avant d’utiliser l’exemple de code, apportez les modifications suivantes :

  1. Mettez à jour le paramètre de sécurité :

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    Spécifie le contenu du fichier de clé privée que vous avez créé dans Utilisation de l’authentification par paire de clés (dans Préparation du chargement des données à l’aide de l’API REST Snowpipe).

  2. Mettez à jour les paramètres de session :

    account='<nom_compte>'

    Spécifie le nom de votre compte (fourni par Snowflake).

    host='<nom_compte>.<id_région>.snowflakecomputing.com'

    Spécifie vos informations d’hôte sous la forme d’une URL. Notez que le format de l’URL est différent selon la région de l’emplacement de votre compte :

    US Ouest

    <nom_compte>.snowflakecomputing.com

    Autres régions

    <nom_compte>.<id_région>.snowflakecomputing.com

    <nom_compte> est le nom de votre compte (fourni par Snowflake) et <id_région> est :

    Région

    ID de région

    Remarques

    Amazon Web Services (AWS)

    US Ouest (Oregon)

    us-west-2

    Requis uniquement lors de la configuration de AWS PrivateLink pour les comptes situés dans la région US Ouest.

    US Est (Ohio)

    us-east-2.aws

    US Est (Virginie du Nord)

    us-east-1

    US Est (Gouvernement commercial - Virginie du Nord)

    us-east-1-gov.aws

    Disponible uniquement pour les comptes sur Business Critical (ou supérieur) ; ne se trouve pas dans AWS GovCloud (US), qui est un Cloud dédié distinct pas encore pris en charge par Snowflake.

    Canada (Centre)

    ca-central-1.aws

    EU (Irlande)

    eu-west-1

    EU (Francfort)

    eu-central-1

    Asie-Pacifique (Tokyo)

    ap-northeast-1.aws

    Asie Pacifique (Mumbai)

    ap-south-1.aws

    Asie-Pacifique (Singapour)

    ap-southeast-1

    Asie-Pacifique (Sydney)

    ap-southeast-2

    Google Cloud Platform (GCP)

    US Central1 (Iowa)

    us-central1.gcp

    Europe Ouest2 (Londres)

    europe-west2.gcp

    Europe Ouest4 (Pays-Bas)

    europe-west4.gcp

    Microsoft Azure

    Ouest US 2 (Washington)

    west-us-2.azure

    Est US 2 (Virginie)

    east-us-2.azure

    US Gov Virginia

    us-gov-virginia.azure

    Disponible uniquement pour les comptes sur Business Critical (ou version supérieure).

    Canada Central (Toronto)

    canada-central.azure

    Europe de l’Ouest (Pays-Bas)

    west-europe.azure

    Suisse Nord (Zurich)

    switzerland-north.azure

    Asie du Sud-Est (Singapour)

    southeast-asia.azure

    Australie Est (Nouvelle-Galles du Sud)

    australia-east.azure

    user='<nom_connexion_utilisateur>'

    Indique le nom de connexion de l’utilisateur Snowflake qui exécutera le code Snowpipe.

    pipe='<nom_bd>.<nom_schéma>.<nom_canal>'

    Spécifie le nom complet du canal à utiliser pour charger les données, sous la forme de <nom_bd>.<nom_schéma>.<nom_canal>.

  3. Indiquez le chemin de vos fichiers à importer dans la liste des objets de fichiers :

    staged_file_list = []

    Le chemin que vous spécifiez doit être relatif à la zone de préparation dans laquelle les fichiers sont situés. Incluez le nom complet de chaque fichier, y compris son extension. Par exemple, un fichier CSV compressé en gzip peut porter l’extension suivante .csv.gz.

  4. Enregistrez le fichier dans un endroit pratique.

Les autres instructions de ce chapitre supposent que le nom du fichier est SnowpipeLamdbaCode.py.

Étape 2 : Créer un package de déploiement de fonctions Lambda

Suivez ces instructions pour créer un environnement d’exécution Python pour Lambda et ajoutez le code Snowpipe que vous avez adapté dans l”étape 1 : Écrire du code Python invoquant l’API REST Snowpipe (dans cette rubrique). Pour plus d’informations sur ces étapes, voir la documentation relative au package de déploiement AWS Lambda (voir les instructions pour Python).

Important

Les scripts des étapes suivantes sont un exemple représentatif et supposent que vous créez une instance Linux AWS EC2 basée sur une instance Amazon Machine Instance (AMI) qui utilise le gestionnaire de packages YUM, qui dépend de RPM. Si vous sélectionnez un AMI Linux basé sur Debian, veuillez mettre à jour vos scripts en conséquence.

  1. Créez une instance Linux EC2 AWS en suivant les instructions relatives à AWS EC2. Cette instance fournira les ressources de calcul permettant d’exécuter le code Snowpipe.

  2. Copiez le fichier de code Snowpipe dans votre nouvelle instance AWS EC2 en utilisant SCP (Copie sécurisée) :

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    

    Où :

    • <chemin> est le chemin de votre fichier SnowpipeLambdaCode.py local.

    • <machine>.<id_région> est le nom DNS de l’instance EC2 (par ex. ec2-54-244-54-199.us-west-2.compute.amazonaws.com).

      Le nom DNS est affiché sur l’écran Instances de la console Amazon EC2.

  3. Connectez-vous à l’instance EC2 en utilisant SSH (SHell sécurisé) :

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
  4. Installez Python et les bibliothèques associées sur l’instance EC2 :

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
  5. Créez le package de déploiement .zip (Snowpipe.zip) :

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    

Étape 3 : Créer un rôle IAM AWS pour Lambda

Suivez la documentation relative à AWS Lambda pour créer un rôle IAM en vue d’exécuter la fonction Lambda.

Enregistrez le nom de la ressource IAM Amazon Resource Name (ARN) pour le rôle. Vous l’utiliserez à l’étape suivante.

Étape 4 : Créer la fonction Lambda

Créez la fonction Lambda en chargeant le package de déploiement .zip que vous avez créé dans l” étape 2 : Créer un package de déploiement de fonctions Lambda (dans cette rubrique) :

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024

Pour --role, spécifiez le rôle ARN que vous avez enregistré à Étape 3 : Créer un rôle AWS IAM pour Lambda (dans cette rubrique).

Enregistrez l’ARN pour la nouvelle fonction à partir de la sortie. Vous l’utiliserez à l’étape suivante.

Étape 5 : Autoriser les appels vers la fonction Lambda

Accordez à S3 les autorisations nécessaires pour appeler votre fonction.

Pour --source-arn, spécifiez la fonction ARN que vous avez enregistrée dans l”étape 4 : Créer la fonction Lambda (ce chapitre).

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser

Étape 6 : Enregistrer l’événement de notification Lambda

Enregistrez un événement de notification Lambda en complétant les instructions relatives aux notifications d’événements Amazon S3. Dans le champ de saisie, spécifiez la fonction ARN que vous avez enregistrée dans l”étape 4 : Créer la fonction Lambda (ce chapitre).