オプション2: AWS LambdaによるSnowpipeの自動化

AWS Lambdaは、イベントによってトリガーされたときに実行され、システムにロードされたコードを実行するコンピューティングサービスです。このトピックで提供されているサンプルPythonコードを調整し、Snowpipe REST API を呼び出すLambda関数を作成して、外部ステージ(つまり、S3バケット、Azureコンテナはサポートされていません)。関数は、ホストされている AWS アカウントに展開されます。Lambdaで定義するイベント(例:S3バケット内のファイルが更新されるとき)は、Lambda関数を呼び出し、Pythonコードを実行します。

このトピックでは、Snowpipeを使用して連続的にデータをマイクロバッチに自動的にロードするようにLambda関数を構成するために必要な手順について説明します。

注釈

このトピックは、 Snowpipe REST APIを使用したデータロードの準備 の手順を使用してSnowpipeを構成したことを前提としています。

このトピックの内容:

ステップ1:Snowpipe REST API を呼び出すPythonコードを書く

サンプルPythonコード

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile

import os
import sys
import uuid

# Assume the public key has been registered in Snowflake
# private key in PEM format
private_key="""
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
"""

# Proxy object that abstracts the Snowpipe REST API
ingest_manager = SimpleIngestManager(account='<account_name>',
                   host='<account_name>.<region_id>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)

注釈

サンプルコードはエラー処理を考慮していません。たとえば、失敗した ingest_manager の呼び出しを再試行しません。

サンプルコードを使用する前に、次を変更します。

  1. セキュリティパラメーターを更新します。

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    キーペア認証の使用Snowpipe REST APIを使用したデータロードの準備 内)で作成した秘密キーファイルのコンテンツを指定します。

  2. セッションパラメーターを更新します。

    account='<アカウント名>'

    アカウントの名前を指定します(Snowflakeが提供)。

    host='<アカウント名>.<地域ID>.snowflakecomputing.com'

    ホスト情報を URL の形式で指定します。 URL の形式は、アカウントが置かれている 地域 によって異なります。

    US 西部

    <アカウント名>.snowflakecomputing.com

    その他の地域

    <アカウント名>.<地域ID>.snowflakecomputing.com

    <アカウント名> はアカウントの名前(Snowflakeが提供)で、 <地域ID> は次のとおりです。

    地域

    地域 ID

    メモ

    Amazon Web Services(AWS)

    US 西部(オレゴン)

    us-west-2

    US 西部のアカウントに AWS PrivateLink を設定する場合にのみ必要です。

    US 東部(オハイオ)

    us-east-2.aws

    US 東部(バージニア北部)

    us-east-1

    US 東部(商業組織、バージニア政府北部)

    us-east-1-gov.aws

    Business Critical(またはそれ以上)のアカウントでのみ使用できます。Snowflakeがまだサポートしていない独立した専用クラウドである、 AWS GovCloud (US)には位置していません。

    カナダ(中部)

    ca-central-1.aws

    EU (アイルランド)

    eu-west-1

    EU (フランクフルト)

    eu-central-1

    アジア太平洋(東京)

    ap-northeast-1.aws

    アジア太平洋(ムンバイ)

    ap-south-1.aws

    アジア太平洋(シンガポール)

    ap-southeast-1

    アジア太平洋(シドニー)

    ap-southeast-2

    Google Cloud Platform(GCP)

    US 中央部1(アイオワ)

    us-central1.gcp

    ヨーロッパ西部2(ロンドン)

    europe-west2.gcp

    ヨーロッパ西部4(オランダ)

    europe-west4.gcp

    Microsoft Azure

    西 US 2(ワシントン)

    west-us-2.azure

    東 US 2(バージニア)

    east-us-2.azure

    US 政府バージニア

    us-gov-virginia.azure

    Business Critical(またはそれ以上)のアカウントでのみ利用可能です。

    カナダ中央部(トロント)

    canada-central.azure

    西ヨーロッパ(オランダ)

    west-europe.azure

    スイス北部(チューリッヒ)

    switzerland-north.azure

    東南アジア(シンガポール)

    southeast-asia.azure

    オーストラリア東部(ニューサウスウェールズ)

    australia-east.azure

    user='<ユーザーログイン名>'

    Snowpipeコードを実行するSnowflakeユーザーのログイン名を指定します。

    pipe='<データベース名>.<スキーマ名>.<パイプ名>'

    データのロードに使用するパイプの完全修飾名を <データベース名>.<スキーマ名>.<パイプ名> の形式で指定します。

  3. ファイルオブジェクトリストでインポートするファイルへのパスを指定します。

    staged_file_list = []

    指定するパスは、ファイルが配置されているステージを 基準 にする必要があります。ファイル拡張子を含む各ファイルの完全な名前を含めます。たとえば、gzip圧縮された CSV ファイルには、拡張子 .csv.gz が付いている場合があります。

  4. ファイルを便利な場所に保存します。

このトピックの残りの手順では、ファイル名を SnowpipeLamdbaCode.py と想定しています。

ステップ2:Lambda関数デプロイメントパッケージを作成する

次の手順を実行してLambda用のPythonランタイム環境を構築し、 ステップ1: Snowpipe REST API を呼び出すPythonコードを書く (このトピック内)に合わせたSnowpipeのコードを追加します。これらの手順の詳細については、 AWS Lambdaデプロイメントパッケージドキュメント をご参照ください(Pythonの手順を参照)。

重要

次のステップのスクリプトは代表的な例であり、YUM パッケージマネージャーを使用するAmazon Machine Instance(AMI)に基づいて AWS EC2 Linuxインスタンスを作成していると想定しています。これは RPM に依存しています。DebianベースのLinux AMI を選択した場合は、それに応じてスクリプトを更新してください。

  1. AWS EC2 の手順 を完了して、AWS EC2 Linuxインスタンスを作成します。このインスタンスは、Snowpipeコードを実行するためのコンピューティングリソースを提供します。

  2. SCP (セキュアコピー)を使用して、Snowpipeコードファイルを新しい AWS EC2 インスタンスにコピーします。

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    

    条件:

    • <パス> は、ローカルの SnowpipeLambdaCode.py ファイルへのパスです。

    • <マシン>.<地域ID> は、 EC2 インスタンスの DNS 名です(例: ec2-54-244-54-199.us-west-2.compute.amazonaws.com)。

      DNS 名は、Amazon EC2 コンソールの Instances 画面に表示されます。

  3. SSH を使用してEC2 インスタンスに接続します(セキュア SHell)。

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
  4. Pythonおよび関連ライブラリを EC2 インスタンスにインストールします。

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
  5. .zipデプロイメントパッケージを作成します(Snowpipe.zip)。

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    

ステップ3:Lambdaの AWS IAM ロールを作成する

AWS Lambdaドキュメント に従って、Lambda関数を実行する IAM ロールを作成します。

ロールの IAM Amazonリソース名(ARN) を記録します。次のステップで使用します。

ステップ4:Lambda関数を作成する

ステップ2:Lambda関数デプロイメントパッケージを作成する (このトピック内)で作成した、 .zip デプロイメントパッケージをアップロードしてLambda関数を作成します。

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024

--role には、 ステップ3:Lambdaの AWS IAM ロールを作成する (このトピック内)で記録したロール ARN を指定します。

出力から新しい関数の ARN を記録します。次のステップで使用します。

ステップ5:Lambda関数の呼び出しを許可する

S3に関数を呼び出すために必要な権限を付与します。

--source-arn には、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser

ステップ6:Lambda通知イベントを登録する

Amazon S3イベント通知 の手順を完了して、Lambda通知イベントを登録します。入力フィールドで、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。