オプション2: AWS LambdaによるSnowpipeの自動化

AWS Lambdaは、イベントによってトリガーされたときに実行され、システムにロードされたコードを実行するコンピューティングサービスです。このトピックで提供されているサンプルPythonコードを調整し、Snowpipe REST API を呼び出すLambda関数を作成して、外部ステージ(つまり、S3バケット、Azureコンテナはサポートされていません)。関数は、ホストされている AWS アカウントに展開されます。Lambdaで定義するイベント(例:S3バケット内のファイルが更新されるとき)は、Lambda関数を呼び出し、Pythonコードを実行します。

このトピックでは、Snowpipeを使用して連続的にデータをマイクロバッチに自動的にロードするようにLambda関数を構成するために必要な手順について説明します。

注釈

このトピックは、 Snowpipe REST APIを使用したデータロードの準備 の手順を使用してSnowpipeを構成したことを前提としています。

このトピックの内容:

ステップ1:Snowpipe REST API を呼び出すPythonコードを書く

サンプルPythonコード

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend

import os

with open("./rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
                   host='<account_identifier>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key_text)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)
Copy

注釈

サンプルコードはエラー処理を考慮していません。たとえば、失敗した ingest_manager の呼び出しを再試行しません。

サンプルコードを使用する前に、次を変更します。

  1. セキュリティパラメーターを更新します。

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    キーペア認証の使用およびキーローテーションSnowpipe REST APIを使用したデータロードの準備 内)で作成した秘密キーファイルのコンテンツを指定します。

    PRIVATE_KEY_PASSPHRASE 環境変数を使用して、秘密キーファイルを復号化するためのパスフレーズを指定します。

    • Linuxまたは macOS:

      export PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
    • Windows:

      set PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
  2. セッションパラメーターを更新します。

    account='<アカウント識別子>'

    アカウントの一意の識別子を指定します(Snowflakeが提供)。 host の説明もご参照ください。

    host='<アカウント識別子>.snowflakecomputing.com'

    Snowflakeアカウントの一意のホスト名を指定します。

    アカウント識別子の推奨形式は次のとおりです。

    organization_name-account_name

    Snowflake組織とアカウントの名前。詳細については、 形式1(推奨): 組織内のアカウント名。 をご参照ください。

    または、必要に応じて、アカウントがホストされている リージョン および クラウドプラットフォーム とともに、 アカウントロケーター を指定します。詳細については、 形式2(レガシー): リージョン内のアカウントロケーター。 をご参照ください。

    user='<ユーザーログイン名>'

    Snowpipeコードを実行するSnowflakeユーザーのログイン名を指定します。

    pipe='<データベース名>.<スキーマ名>.<パイプ名>'

    データのロードに使用するパイプの完全修飾名を <データベース名>.<スキーマ名>.<パイプ名> の形式で指定します。

  3. ファイルオブジェクトリストでインポートするファイルへのパスを指定します。

    staged_file_list = []

    指定するパスは、ファイルが配置されているステージを 基準 にする必要があります。ファイル拡張子を含む各ファイルの完全な名前を含めます。たとえば、gzip圧縮された CSV ファイルには、拡張子 .csv.gz が付いている場合があります。

  4. ファイルを便利な場所に保存します。

このトピックの残りの手順では、ファイル名を SnowpipeLamdbaCode.py と想定しています。

ステップ2:Lambda関数デプロイメントパッケージを作成する

次の手順を実行してLambda用のPythonランタイム環境を構築し、 ステップ1: Snowpipe REST API を呼び出すPythonコードを書く (このトピック内)に合わせたSnowpipeのコードを追加します。これらの手順の詳細については、 AWS Lambdaデプロイメントパッケージドキュメント をご参照ください(Pythonの手順を参照)。

重要

次のステップのスクリプトは代表的な例であり、YUM パッケージマネージャーを使用するAmazon Machine Instance(AMI)に基づいて AWS EC2 Linuxインスタンスを作成していると想定しています。これは RPM に依存しています。DebianベースのLinux AMI を選択した場合は、それに応じてスクリプトを更新してください。

  1. AWS EC2 の手順 を完了して、AWS EC2 Linuxインスタンスを作成します。このインスタンスは、Snowpipeコードを実行するためのコンピューティングリソースを提供します。

  2. SCP (セキュアコピー)を使用して、Snowpipeコードファイルを新しい AWS EC2 インスタンスにコピーします。

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    
    Copy

    条件:

    • <パス> は、ローカルの SnowpipeLambdaCode.py ファイルへのパスです。

    • <マシン>.<地域ID> は、 EC2 インスタンスの DNS 名です(例: ec2-54-244-54-199.us-west-2.compute.amazonaws.com)。

      DNS 名は、Amazon EC2 コンソールの Instances 画面に表示されます。

  3. SSH を使用してEC2 インスタンスに接続します(セキュア SHell)。

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
    Copy
  4. Pythonおよび関連ライブラリを EC2 インスタンスにインストールします。

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
    Copy
  5. .zipデプロイメントパッケージを作成します(Snowpipe.zip)。

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    
    Copy

ステップ3:Lambdaの AWS IAM ロールを作成する

AWS Lambdaドキュメント に従って、Lambda関数を実行する IAM ロールを作成します。

ロールの IAM Amazonリソース名(ARN) を記録します。次のステップで使用します。

ステップ4:Lambda関数を作成する

ステップ2:Lambda関数デプロイメントパッケージを作成する (このトピック内)で作成した、 .zip デプロイメントパッケージをアップロードしてLambda関数を作成します。

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024
Copy

--role には、 ステップ3:Lambdaの AWS IAM ロールを作成する (このトピック内)で記録したロール ARN を指定します。

出力から新しい関数の ARN を記録します。次のステップで使用します。

ステップ5:Lambda関数の呼び出しを許可する

S3に関数を呼び出すために必要な権限を付与します。

--source-arn には、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser
Copy

ステップ6:Lambda通知イベントを登録する

Amazon S3イベント通知 の手順を完了して、Lambda通知イベントを登録します。入力フィールドで、 ステップ4:Lambda関数を作成する (このトピック内)で記録した関数 ARN を指定します。