Option 2: Automatisieren von Snowpipe mit AWS Lambda

AWS Lambda ist ein Computeservice, der bei Auslösung durch ein Ereignis ausgeführt wird und Code ausführt, der in das System geladen wurde. Sie können den unter diesem Thema bereitgestellten Python-Beispielcode anpassen und eine Lambda-Funktion erstellen, die die Snowpipe-REST-API aufruft, um Daten aus Ihrem externen (d. h. S3-Bucket; Azure-Container werden nicht unterstützt) Stagingbereich zu laden. Die Funktion wird für Ihr AWS-Konto bereitgestellt, wo es gehostet wird. Ereignisse, die Sie in Lambda definieren (z. B. wenn Dateien in Ihrem S3-Bucket aktualisiert werden), rufen die Lambda-Funktion auf und führen den Python-Code aus.

Unter diesem Thema werden die Schritte beschrieben, die erforderlich sind, um eine Lambda-Funktion zu konfigurieren, die Daten automatisch kontinuierlich in Microbatches über Snowpipe lädt.

Bemerkung

Unter diesem Thema wird davon ausgegangen, dass Sie Snowpipe gemäß der Anleitung in Vorbereiten des Ladens von Daten über die Snowpipe REST-API konfiguriert haben.

Unter diesem Thema:

Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API schreiben

Beispiel für Python-Code

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend

import os

with open("./rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
                   host='<account_identifier>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key_text)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)
Copy

Bemerkung

Im angegebenen Beispielcode wird keine Fehlerbehandlung berücksichtigt. So wird beispielsweise nicht versucht, fehlgeschlagene ingest_manager-Aufrufe erneut zu starten.

Nehmen Sie vor dem Verwenden des Beispielcodes die folgenden Änderungen vor:

  1. Aktualisieren Sie den Sicherheitsparameter:

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    Gibt den lokalen Pfad zur Datei des privaten Schlüssels an, die Sie in Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API) erstellt haben.

    Geben Sie die Passphrase für die Entschlüsselung der Datei des privaten Schlüssels mit der Umgebungsvariablen PRIVATE_KEY_PASSPHRASE an:

    • Linux oder macOS:

      export PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
    • Windows:

      set PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
  2. Aktualisieren Sie die Sitzungsparameter:

    account='<Kontobezeichner>'

    Geben Sie den eindeutigen Bezeichner für Ihr Konto an (wird von Snowflake bereitgestellt). Siehe auch die Beschreibung von host.

    host='<Kontobezeichner>.snowflakecomputing.com'

    Geben Sie den eindeutigen Hostnamen für Ihr Snowflake-Konto an.

    Das bevorzugte Format für den Kontobezeichner ist wie folgt:

    organization_name-account_name

    Namen Ihrer Snowflake-Organisation und Ihres Snowflake-Kontos. Weitere Details dazu finden Sie unter Format 1 (bevorzugt): Kontoname in Ihrer Organisation.

    Geben Sie alternativ Ihren Konto-Locator an, ggf. zusammen mit der Region und der Cloudplattform, auf der das Konto gehostet wird. Weitere Details dazu finden Sie unter Format 2 (älter): Konto-Locator in einer Region.

    user='<Benutzeranmeldename>'

    Gibt den Anmeldenamen des Snowflake-Benutzers an, der den Snowpipe-Code ausführen soll.

    pipe='<Datenbankname>.<Schemaname>.<Pipename>'

    Gibt den vollständig qualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll, und zwar in der Form <Datenbankname>.<Schemaname>.<Pipename>.

  3. Geben Sie in der Liste der Dateiobjekte den Pfad zu Ihren zu importierenden Dateien an:

    staged_file_list = []

    Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung .csv.gz haben.

  4. Speichern Sie die Datei an einem geeigneten Speicherort.

Bei den übrigen Anweisungen unter diesem Thema wird davon ausgegangen, dass der Dateiname SnowpipeLamdbaCode.py ist.

Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen

Führen Sie die folgenden Schritte aus, um eine Python-Laufzeitumgebung für Lambda zu erstellen, und fügen Sie den Snowpipe-Code hinzu, den Sie unter Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API (unter diesem Thema) angepasst haben. Weitere Informationen zu diesen Schritten finden Sie in der Dokumentation zum AWS-Lambda-Bereitstellungspaket (siehe Anweisungen zu Python).

Wichtig

Die Skripte in den folgenden Schritten sind ein repräsentatives Beispiel und setzen voraus, dass Sie eine AWS EC2-Linux-Instanz erstellen, die auf einer Amazon Machine Instance (AMI) basiert und den YUM-Paketmanager verwendet, der von RPM abhängt. Wenn Sie ein Debian-basiertes Linux AMI auswählen, müssen Sie Ihre Skripte entsprechend aktualisieren.

  1. Erstellen Sie eine AWS EC2-Linux-Instanz, indem Sie die AWS EC2-Anweisungen ausführen. Diese Instanz stellt die Computeressourcen für die Ausführung des Snowpipe-Codes zur Verfügung.

  2. Kopieren Sie die Snowpipe-Code-Datei mit SCP (Secure Copy) in Ihre neue AWS EC2-Instanz:

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    
    Copy

    Wobei:

    • <Pfad> ist der Pfad zu Ihrer lokalen SnowpipeLambdaCode.py-Datei.

    • <Computer>.<Regions-ID> ist der DNS-Name der EC2-Instanz (z. B. ec2-54-244-54-199.us-west-2.compute.amazonaws.com).

      Der DNS-Name wird auf dem Instances-Bildschirm der Amazon EC2-Konsole angezeigt.

  3. Verbinden Sie sich mit der EC2-Instanz per SSH (Secure SHell):

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
    Copy
  4. Installieren Sie Python und verwandte Bibliotheken in der EC2-Instanz:

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
    Copy
  5. Erstellen Sie das ZIP-Bereitstellungspaket (Snowpipe.zip):

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    
    Copy

Schritt 3: AWS IAM-Rolle für Lambda erstellen

Folgen Sie der AWS-Lambda-Dokumentation, um eine IAM-Rolle zum Ausführen der Lambda-Funktion zu erstellen.

Notieren Sie den IAM Amazon Resource Name (ARN) für die Rolle. Sie benötigen ihn im nächsten Schritt.

Schritt 4: Lambda-Funktion erstellen

Erstellen Sie die Lambda-Funktion, indem Sie das .zip-Bereitstellungspaket hochladen, das Sie in Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen (unter diesem Thema) erstellt haben:

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024
Copy

Geben Sie für --role den Rollen-ARN an, den Sie sich in Schritt 3: AWS IAM-Rolle für Lambda erstellen (unter diesem Thema) notiert haben.

Notieren Sie sich den in der Ausgabe angegebenen ARN der neuen Funktion. Sie benötigen ihn im nächsten Schritt.

Schritt 5: Aufrufe der Lambda-Funktion zulassen

Erteilen Sie S3 die erforderlichen Berechtigungen zum Aufruf Ihrer Funktion.

Geben Sie für --source-arn den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser
Copy

Schritt 6: Lambda-Benachrichtigungsereignis registrieren

Registrieren Sie ein Lambda-Benachrichtigungsereignis, indem Sie die Anweisungen für Amazon S3-Ereignisbenachrichtigungen ausführen. Geben Sie im Eingabefeld den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.