Option 2: Automatisieren von Snowpipe mit AWS Lambda¶
AWS Lambda ist ein Computeservice, der bei Auslösung durch ein Ereignis ausgeführt wird und Code ausführt, der in das System geladen wurde. Sie können den unter diesem Thema bereitgestellten Python-Beispielcode anpassen und eine Lambda-Funktion erstellen, die die Snowpipe-REST-API aufruft, um Daten aus Ihrem externen (d. h. S3-Bucket; Azure-Container werden nicht unterstützt) Stagingbereich zu laden. Die Funktion wird für Ihr AWS-Konto bereitgestellt, wo es gehostet wird. Ereignisse, die Sie in Lambda definieren (z. B. wenn Dateien in Ihrem S3-Bucket aktualisiert werden), rufen die Lambda-Funktion auf und führen den Python-Code aus.
Unter diesem Thema werden die Schritte beschrieben, die erforderlich sind, um eine Lambda-Funktion zu konfigurieren, die Daten automatisch kontinuierlich in Microbatches über Snowpipe lädt.
Bemerkung
Unter diesem Thema wird davon ausgegangen, dass Sie Snowpipe gemäß der Anleitung in Vorbereiten des Ladens von Daten über die Snowpipe REST-API konfiguriert haben.
Unter diesem Thema:
Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API schreiben¶
Beispiel für Python-Code
from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend
import os
import sys
import uuid
with open("./rsa_key.p8", 'rb') as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format
# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_name>',
host='<account_name>.<region_id>.snowflakecomputing.com',
user='<user_login_name>',
pipe='<db_name>.<schema_name>.<pipe_name>',
private_key=private_key_text)
def handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print("Bucket: " + bucket + " Key: " + key)
# List of files in the stage specified in the pipe definition
# wrapped into a class
staged_file_list = []
staged_file_list.append(StagedFile(key, None))
print('Pushing file list to ingest REST API')
resp = ingest_manager.ingest_files(staged_file_list)
Bemerkung
Im angegebenen Beispielcode wird keine Fehlerbehandlung berücksichtigt. So wird beispielsweise nicht versucht, fehlgeschlagene ingest_manager
-Aufrufe erneut zu starten.
Nehmen Sie vor dem Verwenden des Beispielcodes die folgenden Änderungen vor:
Aktualisieren Sie den Sicherheitsparameter:
private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """
Gibt den lokalen Pfad zur Datei des privaten Schlüssels an, die Sie in Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API) erstellt haben.
Geben Sie die Passphrase für die Entschlüsselung der Datei des privaten Schlüssels mit der Umgebungsvariablen
PRIVATE_KEY_PASSPHRASE
an:Linux oder macOS:
export PRIVATE_KEY_PASSPHRASE='<passphrase>'
Windows:
set PRIVATE_KEY_PASSPHRASE='<passphrase>'
Aktualisieren Sie die Sitzungsparameter:
account='<Kontoname>'
Gibt den Namen Ihres Kontos an (bereitgestellt von Snowflake).
host='<Kontoname>.<Regions-ID>.snowflakecomputing.com'
Gibt die Hostinformationen in Form einer URL an. Beachten Sie, dass das Format der URL je nach Region, in der sich Ihr Konto befindet, unterschiedlich ist:
- US West
<Kontoname>.snowflakecomputing.com
- Andere Regionen
<Kontoname>.<Regions-ID>.snowflakecomputing.com
Dabei ist
<Kontoname>
der Name Ihres Kontos (wird von Snowflake bereitgestellt), und<Regions-ID>
ist:Region
Regions-ID
Anmerkungen
Amazon Web Services (AWS)
US West (Oregon)us-west-2
Nur erforderlich, wenn Sie AWS PrivateLink für Konten in US West konfigurieren.
US East (Ohio)us-east-2.aws
US East (N. Virginia)us-east-1
US East (Commercial Gov - N. Virginia)us-east-1-gov.aws
Nur für Business Critical-Konten (oder höher) verfügbar. Befindet sich nicht in AWS GovCloud (US), einer separaten, dedizierten Cloud, die von Snowflake noch nicht unterstützt wird.
Canada (Central)ca-central-1.aws
Europe (London)eu-west-2.aws
EU (Irland)eu-west-1
EU (Frankfurt)eu-central-1
Asia Pacific (Tokio)ap-northeast-1.aws
Asia Pacific (Mumbai)ap-south-1.aws
Asia Pacific (Singapur)ap-southeast-1
Asia Pacific (Sydney)ap-southeast-2
Google Cloud Platform (GCP)
US Central1 (Iowa)us-central1.gcp
Europe West2 (London)europe-west2.gcp
Europe West4 (Niederlande)europe-west4.gcp
Microsoft Azure
West US 2 (Washington)west-us-2.azure
East US 2 (Virginia)east-us-2.azure
US Gov Virginiaus-gov-virginia.azure
Nur für Business Critical-Konten (oder höher) verfügbar.
Canada Central (Toronto)canada-central.azure
West Europe (Niederlande)west-europe.azure
Switzerland North (Zürich)switzerland-north.azure
Southeast Asia (Singapur)southeast-asia.azure
Australia East (New South Wales)australia-east.azure
user='<Benutzeranmeldename>'
Gibt den Anmeldenamen des Snowflake-Benutzers an, der den Snowpipe-Code ausführen soll.
pipe='<Datenbankname>.<Schemaname>.<Pipename>'
Gibt den vollständig qualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll, und zwar in der Form
<Datenbankname>.<Schemaname>.<Pipename>
.
Geben Sie in der Liste der Dateiobjekte den Pfad zu Ihren zu importierenden Dateien an:
staged_file_list = []
Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung
.csv.gz
haben.
Speichern Sie die Datei an einem geeigneten Speicherort.
Bei den übrigen Anweisungen unter diesem Thema wird davon ausgegangen, dass der Dateiname SnowpipeLamdbaCode.py
ist.
Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen¶
Führen Sie die folgenden Schritte aus, um eine Python-Laufzeitumgebung für Lambda zu erstellen, und fügen Sie den Snowpipe-Code hinzu, den Sie unter Schritt 1: Python-Code zum Aufrufen der Snowpipe-REST-API (unter diesem Thema) angepasst haben. Weitere Informationen zu diesen Schritten finden Sie in der Dokumentation zum AWS-Lambda-Bereitstellungspaket (siehe Anweisungen zu Python).
Wichtig
Die Skripte in den folgenden Schritten sind ein repräsentatives Beispiel und setzen voraus, dass Sie eine AWS EC2-Linux-Instanz erstellen, die auf einer Amazon Machine Instance (AMI) basiert und den YUM-Paketmanager verwendet, der von RPM abhängt. Wenn Sie ein Debian-basiertes Linux AMI auswählen, müssen Sie Ihre Skripte entsprechend aktualisieren.
Erstellen Sie eine AWS EC2-Linux-Instanz, indem Sie die AWS EC2-Anweisungen ausführen. Diese Instanz stellt die Computeressourcen für die Ausführung des Snowpipe-Codes zur Verfügung.
Kopieren Sie die Snowpipe-Code-Datei mit SCP (Secure Copy) in Ihre neue AWS EC2-Instanz:
scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
Wobei:
<Pfad>
ist der Pfad zu Ihrer lokalenSnowpipeLambdaCode.py
-Datei.<Computer>.<Regions-ID>
ist der DNS-Name der EC2-Instanz (z. B.ec2-54-244-54-199.us-west-2.compute.amazonaws.com
).Der DNS-Name wird auf dem Instances-Bildschirm der Amazon EC2-Konsole angezeigt.
Verbinden Sie sich mit der EC2-Instanz per SSH (Secure SHell):
ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
Installieren Sie Python und verwandte Bibliotheken in der EC2-Instanz:
sudo yum install -y gcc zlib zlib-devel openssl openssl-devel wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz tar -xzvf Python-3.6.1.tgz cd Python-3.6.1 && ./configure && make sudo make install sudo /usr/local/bin/pip3 install virtualenv /usr/local/bin/virtualenv ~/shrink_venv source ~/shrink_venv/bin/activate pip install Pillow pip install boto3 pip install requests pip install snowflake-ingest
Erstellen Sie das ZIP-Bereitstellungspaket (
Snowpipe.zip
):cd $VIRTUAL_ENV/lib/python3.6/site-packages zip -r9 ~/Snowpipe.zip . cd ~ zip -g Snowpipe.zip SnowpipeLambdaCode.py
Schritt 3: AWS IAM-Rolle für Lambda erstellen¶
Folgen Sie der AWS-Lambda-Dokumentation, um eine IAM-Rolle zum Ausführen der Lambda-Funktion zu erstellen.
Notieren Sie den IAM Amazon Resource Name (ARN) für die Rolle. Sie benötigen ihn im nächsten Schritt.
Schritt 4: Lambda-Funktion erstellen¶
Erstellen Sie die Lambda-Funktion, indem Sie das .zip
-Bereitstellungspaket hochladen, das Sie in Schritt 2: Bereitstellungspaket für Lambda-Funktion erstellen (unter diesem Thema) erstellt haben:
aws lambda create-function \ --region us-west-2 \ --function-name IngestFile \ --zip-file fileb://~/Snowpipe.zip \ --role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \ --handler SnowpipeLambdaCode.handler \ --runtime python3.6 \ --profile adminuser \ --timeout 10 \ --memory-size 1024
Geben Sie für --role
den Rollen-ARN an, den Sie sich in Schritt 3: AWS IAM-Rolle für Lambda erstellen (unter diesem Thema) notiert haben.
Notieren Sie sich den in der Ausgabe angegebenen ARN der neuen Funktion. Sie benötigen ihn im nächsten Schritt.
Schritt 5: Aufrufe der Lambda-Funktion zulassen¶
Erteilen Sie S3 die erforderlichen Berechtigungen zum Aufruf Ihrer Funktion.
Geben Sie für --source-arn
den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.
aws lambda add-permission \ --function-name IngestFile \ --region us-west-2 \ --statement-id enable-ingest-calls \ --action "lambda:InvokeFunction" \ --principal s3.amazonaws.com \ --source-arn arn:aws:s3:::<SourceBucket> \ --source-account <aws_account_id> \ --profile adminuser
Schritt 6: Lambda-Benachrichtigungsereignis registrieren¶
Registrieren Sie ein Lambda-Benachrichtigungsereignis, indem Sie die Anweisungen für Amazon S3-Ereignisbenachrichtigungen ausführen. Geben Sie im Eingabefeld den Funktions-ARN an, den Sie in Schritt 4: Lambda-Funktion erstellen (unter diesem Thema) erstellt haben.