オプション2: AWS LambdaによるSnowpipeの自動化¶
AWS Lambdaは、イベントによってトリガーされたときに実行され、システムにロードされたコードを実行するコンピューティングサービスです。このトピックで提供されているサンプルPythonコードを調整し、Snowpipe REST API を呼び出すLambda関数を作成して、外部ステージ(つまり、S3バケット、Azureコンテナはサポートされていません)。関数は、ホストされている AWS アカウントに展開されます。Lambdaで定義するイベント(例:S3バケット内のファイルが更新されるとき)は、Lambda関数を呼び出し、Pythonコードを実行します。
このトピックでは、Snowpipeを使用して連続的にデータをマイクロバッチに自動的にロードするようにLambda関数を構成するために必要な手順について説明します。
注釈
このトピックは、 Snowpipe REST APIを使用したデータロードの準備 の手順を使用してSnowpipeを構成したことを前提としています。
このトピックの内容:
ステップ1: Snowpipe REST API を呼び出すPythonコードを書く¶
サンプルPythonコード
from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend
import os
with open("./rsa_key.p8", 'rb') as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format
# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
host='<account_identifier>.snowflakecomputing.com',
user='<user_login_name>',
pipe='<db_name>.<schema_name>.<pipe_name>',
private_key=private_key_text)
def handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print("Bucket: " + bucket + " Key: " + key)
# List of files in the stage specified in the pipe definition
# wrapped into a class
staged_file_list = []
staged_file_list.append(StagedFile(key, None))
print('Pushing file list to ingest REST API')
resp = ingest_manager.ingest_files(staged_file_list)
注釈
サンプルコードはエラー処理を考慮していません。たとえば、失敗した ingest_manager
の呼び出しを再試行しません。
サンプルコードを使用する前に、次を変更します。
セキュリティパラメーターを更新します。
private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """
キーペア認証の使用およびキーローテーション ( Snowpipe REST APIを使用したデータロードの準備 内)で作成した秘密キーファイルのコンテンツを指定します。
PRIVATE_KEY_PASSPHRASE
環境変数を使用して、秘密キーファイルを復号化するためのパスフレーズを指定します。Linuxまたは macOS:
export PRIVATE_KEY_PASSPHRASE='<passphrase>'
Windows:
set PRIVATE_KEY_PASSPHRASE='<passphrase>'
セッションパラメーターを更新します。
account='<アカウント識別子>'
アカウントの一意の識別子を指定します(Snowflakeが提供)。
host
の説明もご参照ください。host='<アカウント識別子>.snowflakecomputing.com'
Snowflakeアカウントの一意のホスト名を指定します。
アカウント識別子の推奨形式は次のとおりです。
organization_name-account_name
Snowflake組織とアカウントの名前。詳細については、 形式1(推奨): 組織内のアカウント名。 をご参照ください。
または、必要に応じて、アカウントがホストされている リージョン および クラウドプラットフォーム とともに、 アカウントロケーター を指定します。詳細については、 形式2(レガシー): リージョン内のアカウントロケーター。 をご参照ください。
user='<ユーザーログイン名>'
Snowpipeコードを実行するSnowflakeユーザーのログイン名を指定します。
pipe='<データベース名>.<スキーマ名>.<パイプ名>'
データのロードに使用するパイプの完全修飾名を
<データベース名>.<スキーマ名>.<パイプ名>
の形式で指定します。
ファイルオブジェクトリストでインポートするファイルへのパスを指定します。
staged_file_list = []
指定するパスは、ファイルが配置されているステージを 基準 にする必要があります。ファイル拡張子を含む各ファイルの完全な名前を含めます。たとえば、gzip圧縮された CSV ファイルには、拡張子
.csv.gz
が付いている場合があります。
ファイルを便利な場所に保存します。
このトピックの残りの手順では、ファイル名を SnowpipeLamdbaCode.py
と想定しています。
ステップ2: Lambda関数デプロイメントパッケージを作成する¶
次の手順を実行してLambda用のPythonランタイム環境を構築し、 ステップ1: Snowpipe REST API を呼び出すPythonコードを書く (このトピック内)に合わせたSnowpipeのコードを追加します。これらの手順の詳細については、 AWS Lambdaデプロイメントパッケージドキュメント をご参照ください(Pythonの手順を参照)。
重要
次のステップのスクリプトは代表的な例であり、YUM パッケージマネージャーを使用するAmazon Machine Instance(AMI)に基づいて AWS EC2 Linuxインスタンスを作成していると想定しています。これは RPM に依存しています。DebianベースのLinux AMI を選択した場合は、それに応じてスクリプトを更新してください。
AWS EC2 の手順 を完了して、AWS EC2 Linuxインスタンスを作成します。このインスタンスは、Snowpipeコードを実行するためのコンピューティングリソースを提供します。
SCP (セキュアコピー)を使用して、Snowpipeコードファイルを新しい AWS EC2 インスタンスにコピーします。
scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
条件:
<パス>
は、ローカルのSnowpipeLambdaCode.py
ファイルへのパスです。<マシン>.<地域ID>
は、 EC2 インスタンスの DNS 名です(例:ec2-54-244-54-199.us-west-2.compute.amazonaws.com
)。DNS 名は、Amazon EC2 コンソールの Instances 画面に表示されます。
SSH を使用してEC2 インスタンスに接続します(セキュア SHell)。
ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
Pythonおよび関連ライブラリを EC2 インスタンスにインストールします。
sudo yum install -y gcc zlib zlib-devel openssl openssl-devel wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz tar -xzvf Python-3.6.1.tgz cd Python-3.6.1 && ./configure && make sudo make install sudo /usr/local/bin/pip3 install virtualenv /usr/local/bin/virtualenv ~/shrink_venv source ~/shrink_venv/bin/activate pip install Pillow pip install boto3 pip install requests pip install snowflake-ingest
.zipデプロイメントパッケージを作成します(
Snowpipe.zip
)。cd $VIRTUAL_ENV/lib/python3.6/site-packages zip -r9 ~/Snowpipe.zip . cd ~ zip -g Snowpipe.zip SnowpipeLambdaCode.py
ステップ3: Lambdaの AWS IAM ロールを作成する¶
AWS Lambdaドキュメント に従って、Lambda関数を実行する IAM ロールを作成します。
ロールの IAM Amazonリソース名(ARN) を記録します。次のステップで使用します。
ステップ4: Lambda関数を作成する¶
ステップ2:Lambda関数デプロイメントパッケージを作成する (このトピック内)で作成した、 .zip
デプロイメントパッケージをアップロードしてLambda関数を作成します。
aws lambda create-function \ --region us-west-2 \ --function-name IngestFile \ --zip-file fileb://~/Snowpipe.zip \ --role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \ --handler SnowpipeLambdaCode.handler \ --runtime python3.6 \ --profile adminuser \ --timeout 10 \ --memory-size 1024
--role
には、 ステップ3:Lambdaの AWS IAM ロールを作成する (このトピック内)で記録したロール ARN を指定します。
出力から新しい関数の ARN を記録します。次のステップで使用します。
ステップ5: Lambda関数の呼び出しを許可する¶
S3に関数を呼び出すために必要な権限を付与します。
--source-arn
には、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。
aws lambda add-permission \ --function-name IngestFile \ --region us-west-2 \ --statement-id enable-ingest-calls \ --action "lambda:InvokeFunction" \ --principal s3.amazonaws.com \ --source-arn arn:aws:s3:::<SourceBucket> \ --source-account <aws_account_id> \ --profile adminuser
ステップ6: Lambda通知イベントを登録する¶
Amazon S3イベント通知 の手順を完了して、Lambda通知イベントを登録します。入力フィールドで、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。