Option 2: Automate Snowpipe with AWS Lambda¶
AWS Lambdaは、イベントによってトリガーされたときに実行され、システムにロードされたコードを実行するコンピューティングサービスです。このトピックで提供されているサンプルPythonコードを調整し、Snowpipe REST API を呼び出すLambda関数を作成して、外部ステージ(つまり、S3バケット、Azureコンテナはサポートされていません)。関数は、ホストされている AWS アカウントに展開されます。Lambdaで定義するイベント(例:S3バケット内のファイルが更新されるとき)は、Lambda関数を呼び出し、Pythonコードを実行します。
このトピックでは、Snowpipeを使用して連続的にデータをマイクロバッチに自動的にロードするようにLambda関数を構成するために必要な手順について説明します。
注釈
このトピックは、 Data loading preparation using the Snowpipe REST API の手順を使用してSnowpipeを構成したことを前提としています。
ステップ1: Snowpipe REST API を呼び出すPythonコードを書く¶
サンプルPythonコード
注釈
サンプルコードはエラー処理を考慮していません。たとえば、失敗した ingest_manager の呼び出しを再試行しません。
サンプルコードを使用する前に、次を変更します。
セキュリティパラメーターを更新します。
private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """Use key pair authentication & key rotation ( Data loading preparation using the Snowpipe REST API 内)で作成した秘密キーファイルのコンテンツを指定します。
PRIVATE_KEY_PASSPHRASE環境変数を使用して、秘密キーファイルを復号化するためのパスフレーズを指定します。Linuxまたは macOS:
Windows:
セッションパラメーターを更新します。
account='<アカウント識別子>'アカウントの一意の識別子を指定します(Snowflakeが提供)。
hostの説明もご参照ください。host='<アカウント識別子>.snowflakecomputing.com'Snowflakeアカウントの一意のホスト名を指定します。
アカウント識別子の推奨形式は次のとおりです。
organization_name-account_nameSnowflake組織とアカウントの名前。詳細については、 形式1(推奨): 組織内のアカウント名 をご参照ください。
または、必要に応じて、アカウントがホストされている リージョン および クラウドプラットフォーム とともに、 アカウントロケーター を指定します。詳細については、 形式2: リージョン内のアカウントロケーター をご参照ください。
user='<ユーザーログイン名>'Snowpipeコードを実行するSnowflakeユーザーのログイン名を指定します。
pipe='<データベース名>.<スキーマ名>.<パイプ名>'データのロードに使用するパイプの完全修飾名を
<データベース名>.<スキーマ名>.<パイプ名>の形式で指定します。
ファイルオブジェクトリストでインポートするファイルへのパスを指定します。
staged_file_list = []指定するパスは、ファイルが配置されているステージを 基準 にする必要があります。ファイル拡張子を含む各ファイルの完全な名前を含めます。たとえば、gzip圧縮された CSV ファイルには、拡張子
.csv.gzが付いている場合があります。
ファイルを便利な場所に保存します。
このトピックの残りの手順では、ファイル名を SnowpipeLambdaCode.py と想定しています。
ステップ2: Lambda関数デプロイメントパッケージを作成する¶
次の手順を実行してLambda用のPythonランタイム環境を構築し、 ステップ1: Snowpipe REST API を呼び出すPythonコードを書く (このトピック内)に合わせたSnowpipeのコードを追加します。これらの手順の詳細については、 AWS Lambdaデプロイメントパッケージドキュメント をご参照ください(Pythonの手順を参照)。
重要
次のステップのスクリプトは代表的な例であり、YUM パッケージマネージャーを使用するAmazon Machine Instance(AMI)に基づいて AWS EC2 Linuxインスタンスを作成していると想定しています。これは RPM に依存しています。DebianベースのLinux AMI を選択した場合は、それに応じてスクリプトを更新してください。
AWS EC2 の手順 を完了して、AWS EC2 Linuxインスタンスを作成します。このインスタンスは、Snowpipeコードを実行するためのコンピューティングリソースを提供します。
SCP (セキュアコピー)を使用して、Snowpipeコードファイルを新しい AWS EC2 インスタンスにコピーします。
条件:
<パス>は、ローカルのSnowpipeLambdaCode.pyファイルへのパスです。<マシン>.<地域ID>は、 EC2 インスタンスの DNS 名です(例:ec2-54-244-54-199.us-west-2.compute.amazonaws.com)。DNS 名は、Amazon EC2 コンソールの Instances 画面に表示されます。
SSH を使用してEC2 インスタンスに接続します(セキュア SHell)。
Pythonおよび関連ライブラリを EC2 インスタンスにインストールします。
.zipデプロイメントパッケージを作成します(
Snowpipe.zip)。
ステップ3: Lambdaの AWS IAM ロールを作成する¶
AWS Lambdaドキュメント に従って、Lambda関数を実行する IAM ロールを作成します。
ロールの IAM Amazonリソース名(ARN) を記録します。次のステップで使用します。
ステップ4: Lambda関数を作成する¶
ステップ2:Lambda関数デプロイメントパッケージを作成する (このトピック内)で作成した、 .zip デプロイメントパッケージをアップロードしてLambda関数を作成します。
--role には、 ステップ3:Lambdaの AWS IAM ロールを作成する (このトピック内)で記録したロール ARN を指定します。
出力から新しい関数の ARN を記録します。次のステップで使用します。
ステップ5: Lambda関数の呼び出しを許可する¶
S3に関数を呼び出すために必要な権限を付与します。
--source-arn には、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。
ステップ6: Lambda通知イベントを登録する¶
Amazon S3イベント通知 の手順を完了して、Lambda通知イベントを登録します。入力フィールドで、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。