Option 2: Automatisieren von Snowpipe mit AWS Lambda

AWS Lambda ist ein Computeservice, der bei Auslösung durch ein Ereignis ausgeführt wird und Code ausführt, der in das System geladen wurde. Sie können den unter diesem Thema bereitgestellten Python-Beispielcode anpassen und eine Lambda-Funktion erstellen, die die Snowpipe-REST-API aufruft, um Daten aus Ihrem externen (d. h. S3-Bucket; Azure-Container werden nicht unterstützt) Stagingbereich zu laden. Die Funktion wird für Ihr AWS-Konto bereitgestellt, wo es gehostet wird. Ereignisse, die Sie in Lambda definieren (z. B. wenn Dateien in Ihrem S3-Bucket aktualisiert werden), rufen die Lambda-Funktion auf und führen den Python-Code aus.

Unter diesem Thema werden die Schritte beschrieben, die erforderlich sind, um eine Lambda-Funktion zu konfigurieren, die Daten automatisch kontinuierlich in Microbatches über Snowpipe lädt.

Bemerkung

Unter diesem Thema wird davon ausgegangen, dass Sie Snowpipe gemäß der Anleitung in Vorbereiten des Ladens von Daten über die Snowpipe REST-API konfiguriert haben.

Unter diesem Thema:

Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API schreiben

Beispiel für Python-Code

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile

import os
import sys
import uuid

# Assume the public key has been registered in Snowflake
# private key in PEM format
private_key="""
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
"""

# Proxy object that abstracts the Snowpipe REST API
ingest_manager = SimpleIngestManager(account='<account_name>',
                   host='<account_name>.<region_id>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)

Bemerkung

Im angegebenen Beispielcode wird keine Fehlerbehandlung berücksichtigt. So wird beispielsweise nicht versucht, fehlgeschlagene ingest_manager-Aufrufe erneut zu starten.

Nehmen Sie vor dem Verwenden des Beispielcodes die folgenden Änderungen vor:

  1. Aktualisieren Sie den Sicherheitsparameter:

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    Gibt den lokalen Pfad zur Datei des privaten Schlüssels an, die Sie in Verwenden der Schlüsselpaar-Authentifizierung (unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API) erstellt haben.

  2. Aktualisieren Sie die Sitzungsparameter:

    account='<Kontoname>'

    Gibt den Namen Ihres Kontos an (bereitgestellt von Snowflake).

    host='<Kontoname>.<Regions-ID>.snowflakecomputing.com'

    Gibt die Hostinformationen in Form einer URL an. Beachten Sie, dass das Format der URL je nach Region, in der sich Ihr Konto befindet, unterschiedlich ist:

    US West

    <Kontoname>.snowflakecomputing.com

    Andere Regionen

    <Kontoname>.<Regions-ID>.snowflakecomputing.com

    Dabei ist <Kontoname> der Name Ihres Kontos (wird von Snowflake bereitgestellt), und <Regions-ID> ist:

    Region

    Regions-ID

    Anmerkungen

    Amazon Web Services (AWS)

    US West (Oregon)

    us-west-2

    Nur erforderlich, wenn Sie AWS PrivateLink für Konten in US West konfigurieren.

    US East (Ohio)

    us-east-2.aws

    US East (N. Virginia)

    us-east-1

    US East (Commercial Gov - N. Virginia)

    us-east-1-gov.aws

    Nur für Business Critical-Konten (oder höher) verfügbar. Befindet sich nicht in AWS GovCloud (US), einer separaten, dedizierten Cloud, die von Snowflake noch nicht unterstützt wird.

    Canada (Central)

    ca-central-1.aws

    EU (Irland)

    eu-west-1

    EU (Frankfurt)

    eu-central-1

    Asia Pacific (Tokio)

    ap-northeast-1.aws

    Asia Pacific (Mumbai)

    ap-south-1.aws

    Asia Pacific (Singapur)

    ap-southeast-1

    Asia Pacific (Sydney)

    ap-southeast-2

    Google Cloud Platform (GCP)

    US Central1 (Iowa)

    us-central1.gcp

    Europe West2 (London)

    europe-west2.gcp

    Europe West4 (Niederlande)

    europe-west4.gcp

    Microsoft Azure

    West US 2 (Washington)

    west-us-2.azure

    East US 2 (Virginia)

    east-us-2.azure

    US Gov Virginia

    us-gov-virginia.azure

    Nur für Business Critical-Konten (oder höher) verfügbar.

    Canada Central (Toronto)

    canada-central.azure

    West Europe (Niederlande)

    west-europe.azure

    Switzerland North (Zürich)

    switzerland-north.azure

    Southeast Asia (Singapur)

    southeast-asia.azure

    Australia East (New South Wales)

    australia-east.azure

    user='<Benutzeranmeldename>'

    Gibt den Anmeldenamen des Snowflake-Benutzers an, der den Snowpipe-Code ausführen soll.

    pipe='<Datenbankname>.<Schemaname>.<Pipename>'

    Gibt den vollständig qualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll, und zwar in der Form <Datenbankname>.<Schemaname>.<Pipename>.

  3. Geben Sie in der Liste der Dateiobjekte den Pfad zu Ihren zu importierenden Dateien an:

    staged_file_list = []

    Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung .csv.gz haben.

  4. Speichern Sie die Datei an einem geeigneten Speicherort.

Bei den übrigen Anweisungen unter diesem Thema wird davon ausgegangen, dass der Dateiname SnowpipeLamdbaCode.py ist.

Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen

Führen Sie die folgenden Schritte aus, um eine Python-Laufzeitumgebung für Lambda zu erstellen, und fügen Sie den Snowpipe-Code hinzu, den Sie unter Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API (unter diesem Thema) angepasst haben. Weitere Informationen zu diesen Schritten finden Sie in der Dokumentation zum AWS-Lambda-Bereitstellungspaket (siehe Anweisungen zu Python).

Wichtig

Die Skripte in den folgenden Schritten sind ein repräsentatives Beispiel und setzen voraus, dass Sie eine AWS EC2-Linux-Instanz erstellen, die auf einer Amazon Machine Instance (AMI) basiert und den YUM-Paketmanager verwendet, der von RPM abhängt. Wenn Sie ein Debian-basiertes Linux AMI auswählen, müssen Sie Ihre Skripte entsprechend aktualisieren.

  1. Erstellen Sie eine AWS EC2-Linux-Instanz, indem Sie die AWS EC2-Anweisungen ausführen. Diese Instanz stellt die Computeressourcen für die Ausführung des Snowpipe-Codes zur Verfügung.

  2. Kopieren Sie die Snowpipe-Code-Datei mit SCP (Secure Copy) in Ihre neue AWS EC2-Instanz:

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    

    Wobei:

    • <Pfad> ist der Pfad zu Ihrer lokalen SnowpipeLambdaCode.py-Datei.

    • <Computer>.<Regions-ID> ist der DNS-Name der EC2-Instanz (z. B. ec2-54-244-54-199.us-west-2.compute.amazonaws.com).

      Der DNS-Name wird auf dem Instances-Bildschirm der Amazon EC2-Konsole angezeigt.

  3. Verbinden Sie sich mit der EC2-Instanz per SSH (Secure SHell):

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
  4. Installieren Sie Python und verwandte Bibliotheken in der EC2-Instanz:

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
  5. Erstellen Sie das ZIP-Bereitstellungspaket (Snowpipe.zip):

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    

Schritt 3: AWS IAM-Rolle für Lambda erstellen

Folgen Sie der AWS-Lambda-Dokumentation, um eine IAM-Rolle zum Ausführen der Lambda-Funktion zu erstellen.

Notieren Sie den IAM Amazon Resource Name (ARN) für die Rolle. Sie benötigen ihn im nächsten Schritt.

Schritt 4: Lambda-Funktion erstellen

Erstellen Sie die Lambda-Funktion, indem Sie das .zip-Bereitstellungspaket hochladen, das Sie in Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen (unter diesem Thema) erstellt haben:

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024

Geben Sie für --role den Rollen-ARN an, den Sie sich in Schritt 3: AWS IAM-Rolle für Lambda erstellen (unter diesem Thema) notiert haben.

Notieren Sie sich den in der Ausgabe angegebenen ARN der neuen Funktion. Sie benötigen ihn im nächsten Schritt.

Schritt 5: Aufrufe der Lambda-Funktion zulassen

Erteilen Sie S3 die erforderlichen Berechtigungen zum Aufruf Ihrer Funktion.

Geben Sie für --source-arn den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser

Schritt 6: Lambda-Benachrichtigungsereignis registrieren

Registrieren Sie ein Lambda-Benachrichtigungsereignis, indem Sie die Anweisungen für Amazon S3-Ereignisbenachrichtigungen ausführen. Geben Sie im Eingabefeld den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.