Option 2 : Automatiser Snowpipe avec AWS Lambda

AWS Lambda est un service de calcul qui s’exécute lorsqu’il est déclenché par un événement et exécute le code qui a été chargé dans le système. Vous pouvez adapter l’exemple de code Python fourni dans ce chapitre et créer une fonction Lambda qui appelle l’API REST de Snowpipe pour charger les données depuis votre zone de préparation externe (c’est-à-dire compartiment S3 ; les conteneurs Azure ne sont pas pris en charge). La fonction est déployée sur votre compte AWS, où elle est hébergée. Les événements que vous définissez dans Lambda (par exemple, lorsque les fichiers de votre compartiment S3 sont mis à jour), appellent la fonction Lambda et exécutent le code Python.

Ce chapitre décrit les étapes nécessaires pour configurer une fonction Lambda afin de charger automatiquement des données dans des micro-lots en utilisant Snowpipe.

Note

Ce chapitre suppose que vous avez configuré Snowpipe en suivant les instructions de la section Préparation au chargement de données à l’aide de l’API REST Snowpipe.

Dans ce chapitre :

Étape 1 : Rédaction du code Python invoquant l’API REST Snowpipe

Exemple de code Python

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend

import os

with open("./rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
                   host='<account_identifier>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key_text)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)
Copy

Note

L’exemple de code fourni dans ce chapitre ne tient pas compte du traitement des erreurs. Par exemple, il ne relance pas les appels ingest_manager qui ont échoué.

Avant d’utiliser l’exemple de code, apportez les modifications suivantes :

  1. Mettez à jour le paramètre de sécurité :

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    Spécifie le contenu du fichier de clé privée que vous avez créé dans Utilisation de l’authentification par paires de clés et rotation des clés (dans Préparation au chargement de données à l’aide de l’API REST Snowpipe).

    Spécifiez la phrase secrète pour déchiffrer le fichier de clé privée à l’aide de la variable d’environnement PRIVATE_KEY_PASSPHRASE :

    • Linux ou macOS :

      export PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
    • Windows :

      set PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
  2. Mettez à jour les paramètres de session :

    account='<identificateur_de_compte>'

    Indiquez l’identificateur unique de votre compte (fourni par Snowflake). Voir la description de host.

    host='<identificateur_de_compte>.snowflakecomputing.com'

    Indiquez le nom d’hôte unique de votre compte Snowflake.

    Le format préféré de l’identificateur du compte est le suivant :

    organization_name-account_name

    Noms de votre organisation et de votre compte Snowflake. Pour plus de détails, voir Format 1 (recommandé) : nom du compte dans votre organisation..

    Vous pouvez également indiquer votre localisateur de compte, ainsi que la région et la plate-forme Cloud où le compte est hébergé, si nécessaire. Pour plus de détails, voir Format 2 (existant) : localisateur de compte dans une région.

    user='<nom_connexion_utilisateur>'

    Indique le nom de connexion de l’utilisateur Snowflake qui exécutera le code Snowpipe.

    pipe='<nom_bd>.<nom_schéma>.<nom_canal>'

    Spécifie le nom complet du canal à utiliser pour charger les données, sous la forme de <nom_bd>.<nom_schéma>.<nom_canal>.

  3. Indiquez le chemin de vos fichiers à importer dans la liste des objets de fichiers :

    staged_file_list = []

    Le chemin que vous spécifiez doit être relatif à la zone de préparation dans laquelle les fichiers sont situés. Incluez le nom complet de chaque fichier, y compris son extension. Par exemple, un fichier CSV compressé en gzip peut porter l’extension suivante .csv.gz.

  4. Enregistrez le fichier dans un endroit pratique.

Les autres instructions de ce chapitre supposent que le nom du fichier est SnowpipeLamdbaCode.py.

Étape 2 : Création d’un paquet de déploiement de fonctions Lambda

Suivez ces instructions pour créer un environnement d’exécution Python pour Lambda et ajoutez le code Snowpipe que vous avez adapté dans l”étape 1 : Écrire du code Python invoquant l’API REST Snowpipe (dans cette rubrique). Pour plus d’informations sur ces étapes, voir la documentation relative au paquet de déploiement AWS Lambda (voir les instructions pour Python).

Important

Les scripts des étapes suivantes sont un exemple représentatif et supposent que vous créez une instance Linux AWS EC2 basée sur une instance Amazon Machine Instance (AMI) qui utilise le gestionnaire de paquets YUM, qui dépend de RPM. Si vous sélectionnez un AMI Linux basé sur Debian, veuillez mettre à jour vos scripts en conséquence.

  1. Créez une instance Linux EC2 AWS en suivant les instructions relatives à AWS EC2. Cette instance fournira les ressources de calcul permettant d’exécuter le code Snowpipe.

  2. Copiez le fichier de code Snowpipe dans votre nouvelle instance AWS EC2 en utilisant SCP (Copie sécurisée) :

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    
    Copy

    Où :

    • <chemin> est le chemin de votre fichier SnowpipeLambdaCode.py local.

    • <machine>.<id_région> est le nom DNS de l’instance EC2 (par ex. ec2-54-244-54-199.us-west-2.compute.amazonaws.com).

      Le nom DNS est affiché sur l’écran Instances de la console Amazon EC2.

  3. Connectez-vous à l’instance EC2 en utilisant SSH (SHell sécurisé) :

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
    Copy
  4. Installez Python et les bibliothèques associées sur l’instance EC2 :

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
    Copy
  5. Créez le paquet de déploiement .zip (Snowpipe.zip) :

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    
    Copy

Étape 3 : Création d’un rôle IAM AWS pour Lambda

Suivez la documentation relative à AWS Lambda pour créer un rôle IAM en vue d’exécuter la fonction Lambda.

Enregistrez le nom de la ressource IAM Amazon Resource Name (ARN) pour le rôle. Vous l’utiliserez à l’étape suivante.

Étape 4 : Création de la fonction Lambda

Créez la fonction Lambda en chargeant le paquet de déploiement .zip que vous avez créé dans l” étape 2 : Créer un paquet de déploiement de fonctions Lambda (dans cette rubrique) :

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024
Copy

Pour --role, spécifiez le rôle ARN que vous avez enregistré à Étape 3 : Créer un rôle AWS IAM pour Lambda (dans cette rubrique).

Enregistrez l’ARN pour la nouvelle fonction à partir de la sortie. Vous l’utiliserez à l’étape suivante.

Étape 5 : Autorisation des appels vers la fonction Lambda

Accordez à S3 les autorisations nécessaires pour appeler votre fonction.

Pour --source-arn, spécifiez la fonction ARN que vous avez enregistrée dans l”étape 4 : Créer la fonction Lambda (ce chapitre).

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser
Copy

Étape 6 : Enregistrement de l’événement de notification Lambda

Enregistrez un événement de notification Lambda en complétant les instructions relatives aux notifications d’événements Amazon S3. Dans le champ de saisie, spécifiez la fonction ARN que vous avez enregistrée dans l”étape 4 : Créer la fonction Lambda (ce chapitre).