Option 2: Automatisieren von Snowpipe mit AWS Lambda¶
AWS Lambda ist ein Computeservice, der bei Auslösung durch ein Ereignis ausgeführt wird und Code ausführt, der in das System geladen wurde. Sie können den unter diesem Thema bereitgestellten Python-Beispielcode anpassen und eine Lambda-Funktion erstellen, die die Snowpipe-REST-API aufruft, um Daten aus Ihrem externen (d. h. S3-Bucket; Azure-Container werden nicht unterstützt) Stagingbereich zu laden. Die Funktion wird für Ihr AWS-Konto bereitgestellt, wo es gehostet wird. Ereignisse, die Sie in Lambda definieren (z. B. wenn Dateien in Ihrem S3-Bucket aktualisiert werden), rufen die Lambda-Funktion auf und führen den Python-Code aus.
Unter diesem Thema werden die Schritte beschrieben, die erforderlich sind, um eine Lambda-Funktion zu konfigurieren, die Daten automatisch kontinuierlich in Microbatches über Snowpipe lädt.
Bemerkung
Unter diesem Thema wird davon ausgegangen, dass Sie Snowpipe gemäß der Anleitung in Vorbereiten des Ladens von Daten über die Snowpipe-REST-API konfiguriert haben.
Unter diesem Thema:
Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API schreiben¶
Beispiel für Python-Code
from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend
import os
with open("./rsa_key.p8", 'rb') as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format
# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
host='<account_identifier>.snowflakecomputing.com',
user='<user_login_name>',
pipe='<db_name>.<schema_name>.<pipe_name>',
private_key=private_key_text)
def handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print("Bucket: " + bucket + " Key: " + key)
# List of files in the stage specified in the pipe definition
# wrapped into a class
staged_file_list = []
staged_file_list.append(StagedFile(key, None))
print('Pushing file list to ingest REST API')
resp = ingest_manager.ingest_files(staged_file_list)
Bemerkung
Im angegebenen Beispielcode wird keine Fehlerbehandlung berücksichtigt. So wird beispielsweise nicht versucht, fehlgeschlagene ingest_manager
-Aufrufe erneut zu starten.
Nehmen Sie vor dem Verwenden des Beispielcodes die folgenden Änderungen vor:
Aktualisieren Sie den Sicherheitsparameter:
private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """
Gibt den lokalen Pfad zur Datei des privaten Schlüssels an, die Sie in Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter Vorbereiten des Ladens von Daten über die Snowpipe-REST-API) erstellt haben.
Geben Sie die Passphrase für die Entschlüsselung der Datei des privaten Schlüssels mit der Umgebungsvariablen
PRIVATE_KEY_PASSPHRASE
an:Linux oder macOS:
export PRIVATE_KEY_PASSPHRASE='<passphrase>'
Windows:
set PRIVATE_KEY_PASSPHRASE='<passphrase>'
Aktualisieren Sie die Sitzungsparameter:
account='<Kontobezeichner>'
Geben Sie den eindeutigen Bezeichner für Ihr Konto an (wird von Snowflake bereitgestellt). Siehe auch die Beschreibung von
host
.host='<Kontobezeichner>.snowflakecomputing.com'
Geben Sie den eindeutigen Hostnamen für Ihr Snowflake-Konto an.
Das bevorzugte Format für den Kontobezeichner ist wie folgt:
organization_name-account_name
Namen Ihrer Snowflake-Organisation und Ihres Snowflake-Kontos. Weitere Details dazu finden Sie unter Format 1 (bevorzugt): Kontoname in Ihrer Organisation.
Geben Sie alternativ Ihren Konto-Locator an, ggf. zusammen mit der Region und der Cloudplattform, auf der das Konto gehostet wird. Weitere Details dazu finden Sie unter Format 2 (älter): Konto-Locator in einer Region.
user='<Benutzeranmeldename>'
Gibt den Anmeldenamen des Snowflake-Benutzers an, der den Snowpipe-Code ausführen soll.
pipe='<Datenbankname>.<Schemaname>.<Pipename>'
Gibt den vollständig qualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll, und zwar in der Form
<Datenbankname>.<Schemaname>.<Pipename>
.
Geben Sie in der Liste der Dateiobjekte den Pfad zu Ihren zu importierenden Dateien an:
staged_file_list = []
Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung
.csv.gz
haben.
Speichern Sie die Datei an einem geeigneten Speicherort.
Bei den übrigen Anweisungen unter diesem Thema wird davon ausgegangen, dass der Dateiname SnowpipeLamdbaCode.py
ist.
Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen¶
Führen Sie die folgenden Schritte aus, um eine Python-Laufzeitumgebung für Lambda zu erstellen, und fügen Sie den Snowpipe-Code hinzu, den Sie unter Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API (unter diesem Thema) angepasst haben. Weitere Informationen zu diesen Schritten finden Sie in der Dokumentation zum AWS-Lambda-Bereitstellungspaket (siehe Anweisungen zu Python).
Wichtig
Die Skripte in den folgenden Schritten sind ein repräsentatives Beispiel und setzen voraus, dass Sie eine AWS EC2-Linux-Instanz erstellen, die auf einer Amazon Machine Instance (AMI) basiert und den YUM-Paketmanager verwendet, der von RPM abhängt. Wenn Sie ein Debian-basiertes Linux AMI auswählen, müssen Sie Ihre Skripte entsprechend aktualisieren.
Erstellen Sie eine AWS EC2-Linux-Instanz, indem Sie die AWS EC2-Anweisungen ausführen. Diese Instanz stellt die Computeressourcen für die Ausführung des Snowpipe-Codes zur Verfügung.
Kopieren Sie die Snowpipe-Code-Datei mit SCP (Secure Copy) in Ihre neue AWS EC2-Instanz:
scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
Wobei:
<Pfad>
ist der Pfad zu Ihrer lokalenSnowpipeLambdaCode.py
-Datei.<Computer>.<Regions-ID>
ist der DNS-Name der EC2-Instanz (z. B.ec2-54-244-54-199.us-west-2.compute.amazonaws.com
).Der DNS-Name wird auf dem Instances-Bildschirm der Amazon EC2-Konsole angezeigt.
Verbinden Sie sich mit der EC2-Instanz per SSH (Secure SHell):
ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
Installieren Sie Python und verwandte Bibliotheken in der EC2-Instanz:
sudo yum install -y gcc zlib zlib-devel openssl openssl-devel wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz tar -xzvf Python-3.6.1.tgz cd Python-3.6.1 && ./configure && make sudo make install sudo /usr/local/bin/pip3 install virtualenv /usr/local/bin/virtualenv ~/shrink_venv source ~/shrink_venv/bin/activate pip install Pillow pip install boto3 pip install requests pip install snowflake-ingest
Erstellen Sie das ZIP-Bereitstellungspaket (
Snowpipe.zip
):cd $VIRTUAL_ENV/lib/python3.6/site-packages zip -r9 ~/Snowpipe.zip . cd ~ zip -g Snowpipe.zip SnowpipeLambdaCode.py
Schritt 3: AWS-IAM-Rolle für Lambda erstellen¶
Folgen Sie der AWS-Lambda-Dokumentation, um eine IAM-Rolle zum Ausführen der Lambda-Funktion zu erstellen.
Notieren Sie den IAM Amazon Resource Name (ARN) für die Rolle. Sie benötigen ihn im nächsten Schritt.
Schritt 4: Lambda-Funktion erstellen¶
Erstellen Sie die Lambda-Funktion, indem Sie das .zip
-Bereitstellungspaket hochladen, das Sie in Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen (unter diesem Thema) erstellt haben:
aws lambda create-function \ --region us-west-2 \ --function-name IngestFile \ --zip-file fileb://~/Snowpipe.zip \ --role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \ --handler SnowpipeLambdaCode.handler \ --runtime python3.6 \ --profile adminuser \ --timeout 10 \ --memory-size 1024
Geben Sie für --role
den Rollen-ARN an, den Sie sich in Schritt 3: AWS IAM-Rolle für Lambda erstellen (unter diesem Thema) notiert haben.
Notieren Sie sich den in der Ausgabe angegebenen ARN der neuen Funktion. Sie benötigen ihn im nächsten Schritt.
Schritt 5: Aufrufen der Lambda-Funktion zulassen¶
Erteilen Sie S3 die erforderlichen Berechtigungen zum Aufruf Ihrer Funktion.
Geben Sie für --source-arn
den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.
aws lambda add-permission \ --function-name IngestFile \ --region us-west-2 \ --statement-id enable-ingest-calls \ --action "lambda:InvokeFunction" \ --principal s3.amazonaws.com \ --source-arn arn:aws:s3:::<SourceBucket> \ --source-account <aws_account_id> \ --profile adminuser
Schritt 6: Lambda-Benachrichtigungsereignis registrieren¶
Registrieren Sie ein Lambda-Benachrichtigungsereignis, indem Sie die Anweisungen für Amazon S3-Ereignisbenachrichtigungen ausführen. Geben Sie im Eingabefeld den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.