ウェブインターフェイスを使用して AWS で外部関数を作成する

このドキュメントでは、Amazon Web Services(AWS)クラウドプラットフォームで次を含む外部関数を作成する方法の1つを示しています。

  • リモートサービスの作成(AWSのLambda関数)。

  • プロキシサービスの作成(Amazon API Gateway上)。

  • Snowflakeでの API 統合の作成。

  • Snowflakeでの外部関数の作成。

  • 外部関数の呼び出し。

これらの手順では、 AWS の管理にすでに精通していることを前提としています。これらの手順では、実行する必要のある一般的なステップを指定しますが、 AWS Managementコンソールの詳細については、詳細が変更される可能性があるため説明しません。

このトピックの内容:

AWS での外部関数の計画

AWS で外部関数を作成するための前提条件

必須:

  • 次の権限を含む、 AWS のアカウント。

    • IAMを介したAWSロールの作成(IDおよびアクセス管理)。

    • AWS Lambda関数の作成。

    • API Gatewayエンドポイントの作成。

  • ACCOUNTADMIN 権限または CREATE INTEGRATION 権限があるロールを持っているSnowflakeアカウント。

このドキュメントは、経験豊富な AWS 管理者を前提としています。

ワークシート

外部関数を作成するときは、入力した特定の情報(例:API Gateway URL)を記録して、後続のステップでその情報を使用できるようにする必要があります。以下のワークシートは、この情報を追跡するのに役立ちます。

==============================================================================================
=========================================== Worksheet ========================================
==============================================================================================

------------------ Information about the Lambda Function (remote service) --------------------

AWS Account ID.............: _____________________________________________

Lambda Function Name.......: _____________________________________________


---------------------- Information about the API Gateway (proxy Service) ---------------------

New IAM Role Name..........: _____________________________________________

New IAM Role ARN...........: _____________________________________________

Snowflake VPC ID (optional): _____________________________________________

New API Name...............: _____________________________________________

API Gateway Resource Name..: _____________________________________________

Resource Invocation URL....: _____________________________________________

Method Request ARN.........: _____________________________________________


------------ Information about the API Integration and External Function ---------------------

API Integration Name.......: _____________________________________________

API_AWS_IAM_USER_ARN.......: _____________________________________________

API_AWS_EXTERNAL_ID........: _____________________________________________

External Function Name.....: _____________________________________________

AWS で外部関数を構築するための追加リソース(オプションの読み込み)

独自の外部関数用に独自のリモートサービスを作成する準備ができたら、次の場所で利用できるLambda関数に基づくリモートサービスの例を確認することをお勧めします。

ステップ1: リモートサービスを作成する(AWS のLambda関数)

リモートサービスを作成するには、複数の方法があります。このセクションでは、 AWS Lambdaで実行されるPython関数として実装されるリモートサービスを作成する方法を示します。

この外部関数は 同期性 です。 非同期 の外部関数の作成については、 AWS での非同期関数の作成 をご参照ください。

このサンプルのPython言語関数は、単にその入力を返します。入力は SQL VARIANT 値として返されます。これには、1以上の値が含まれる場合があります。

この関数は、Snowflakeが送信し、読み取る形式と同じ形式(JSON)でデータを受け入れ、返します。(データ形式の詳細については、 リモートサービスの入力および出力データ形式 をご参照ください。)

このPython関数は、 eventcontext の2つのパラメーターを受け取ります。 event パラメーターには多くのサブフィールドが含まれ、そのうちの1つは body です。本文は data という名前のキーを含む辞書です。 data の対応する値は、Snowflakeが JSON 形式で送信したデータを保持する文字列です。AWS Lambdaは、Snowflakeから送信された HTTP POST リクエストを便利に処理し、本文を抽出してイベントパラメーターの本文を渡すため、このサンプル関数は HTTP POST リクエスト全体を解析する必要がありません。

このPython関数が本文を文字列として抽出した後、関数は JSON ライブラリ関数を呼び出して文字列をPythonデータ構造に変換します。次に、関数はそのデータ構造から個々の行を抽出し、それらを処理して、それぞれの値を返します。

AWS Lambda関数からの一般的な戻り値の JSON は次のようになります。

{
"statusCode": <http_status_code>,
"body":
        {
            "data":
                  [
                      [ 0, <value> ],
                      [ 1, <value> ]
                      ...
                  ]
        }
}

戻り値のデータは、入力データについて前述した形式と一致します。AWS では、 HTTP と互換性のあるサービスの規則は、 JSON オブジェクト内に本文を返すことです。これには、 HTTP ステータスコードも含まれます。

この AWS Lambda関数を作成するには、次のステップに従います。

  1. AWS マネジメントコンソールにログインします(まだログインしていない場合)。

  2. ワークシートの「AWS アカウント ID」フィールドに AWS アカウント ID を記録します。

    AWS アカウント ID を検索する必要がある場合は、 AWS の手順 に従ってください。

  3. Lambda を選択します。

  4. Create function を選択します。

  5. 関数名を入力します。

    この名前をワークシートの「Lambda関数名」フィールドに記録します。

  6. 使用する言語を選択します。この例では、 Python 3.7 を選択します。

  7. この機能の実行ロールを選択または作成します。

    適切なオプション(通常は Create a new role with basic Lambda permissions)を選択します。

    (このロールは、クラウドアカウントのロールおよびSnowflakeのロールとは別。)

  8. Create Function ボタンをクリックします。

  9. lambda_function タブで、関数のコードを入力します。独自の関数をまだ作成していない場合は、デフォルトの関数コードを入力がエコーされる以下のコードで置換できます。カスタム関数を作成する準備ができたら、後でこのコードを置換または更新できます。

    このサンプルコードは、 API Gatewayエンドポイントを作成 するための手順内でSnowflakeが推奨するように、Lambdaプロキシ統合を使用していることを前提としています。

    編集ウィンドウに貼り付けることができない場合は、関数のファイル名をダブルクリックして編集を有効にしてみます。

    import json
    
    def lambda_handler(event, context):
    
        # 200 is the HTTP status code for "ok".
        status_code = 200
    
        # The return value will contain an array of arrays (one inner array per input row).
        array_of_rows_to_return = [ ]
    
        try:
            # From the input parameter named "event", get the body, which contains
            # the input rows.
            event_body = event["body"]
    
            # Convert the input from a JSON string into a JSON object.
            payload = json.loads(event_body)
            # This is basically an array of arrays. The inner array contains the
            # row number, and a value for each parameter passed to the function.
            rows = payload["data"]
    
            # For each input row in the JSON object...
            for row in rows:
                # Read the input row number (the output row number will be the same).
                row_number = row[0]
    
                # Read the first input parameter's value. For example, this can be a
                # numeric value or a string, or it can be a compound value such as
                # a JSON structure.
                input_value_1 = row[1]
    
                # Read the second input parameter's value.
                input_value_2 = row[2]
    
                # Compose the output based on the input. This simple example
                # merely echoes the input by collecting the values into an array that
                # will be treated as a single VARIANT value.
                output_value = ["Echoing inputs:", input_value_1, input_value_2]
    
                # Put the returned row number and the returned value into an array.
                row_to_return = [row_number, output_value]
    
                # ... and add that array to the main array.
                array_of_rows_to_return.append(row_to_return)
    
            json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return})
    
        except Exception as err:
            # 400 implies some type of error.
            status_code = 400
            # Tell caller what this function could not handle.
            json_compatible_string_to_return = event_body
    
        # Return the return value and HTTP status code.
        return {
            'statusCode': status_code,
            'body': json_compatible_string_to_return
        }
    
  10. Deploy ボタンをクリックして、関数を展開します。

  11. オプションですが、強くお勧めします。関数をテストします。

    Snowflakeが提供するサンプルPython関数の場合、次のテストデータを使用します(デフォルトのデータを以下のデータで置換)。

    {
      "body":
        "{ \"data\": [ [ 0, 43, \"page\" ], [ 1, 42, \"life, the universe, and everything\" ] ] }"
    }
    

    実行結果は次のようになります。

    {
      "statusCode": 200,
      "body": "{\"data\": [[0, [\"Echoing inputs:\", 43, \"page\"]], [1, [\"Echoing inputs:\", 42, \"life, the universe, and everything\"]]]}"
    }
    

上記が正しく機能していれば、外部関数のリモートサービスとして使用できる AWS Lambda関数ができました。

ステップ2:プロキシサービス(Amazon API Gateway)を構成し、 API 統合を(Snowflakeで)作成する

Amazon API Gatewayをプロキシサービスとして設定するには、次を含むいくつかの手順が必要です。

  • AWS アカウントに新しい IAM ロールを作成します。

  • Amazon API Gatewayエンドポイントを作成し、構成します。

  • Amazon API Gatewayエンドポイントを保護します。

  • Snowflakeで API 統合オブジェクトを作成します。

  • Snowflakeと新しい IAM ロールの間に信頼関係を設定します。

API 統合には API Gateway(ロールの ARN (Amazonリソースネーム))からの情報が必要であり、 API Gatewayには API 統合からの情報(API_AWS_EXTERNAL_ID および API_AWS_IAM_USER_ARN)が必要であるため、これらを作成するステップはインターリーブされます。

AWS アカウントでの新しい IAM ロールの作成

Snowflakeが AWS アカウントを認証するには、Snowflakeが所有する IAM (IDおよびアクセス管理)ユーザーに、 AWS アカウントで IAM ロールを引き受ける権限を付与する必要があります。これを実行するには、信頼関係を確立する必要があります。信頼関係を確立するには、 AWS アカウントに IAM ロールを作成し、Snowflakeが所有する IAM ユーザーの ARN で構成する必要があります。また、Snowflakeに API 統合オブジェクトを作成し、 API 統合オブジェクトに、どの IAM ロールを引き受けるかに関する情報を設定する必要があります。

  1. 新しい IAM ロールを作成します。

  2. 信頼できるエンティティのタイプを選択するように求められたら、 Another AWS account を選択します。

  3. Specify accounts that can use this role を求められたら、ワークシートの「AWS アカウント ID」フィールドからの値を貼り付けます。

  4. Next: Permissions をクリックします。

  5. 必要に応じて、権限(Attach permissions policies)を設定します。

  6. ロール名を入力します。

    • ワークシートの「新しい IAM ロール名」にロール名を記録します。

  7. Create role ボタンをクリックします。ロールを作成した後、

    • ワークシートの「新しい IAM ロール ARN」フィールドに Role ARN を記録します。

Amazon API Gatewayでの API の作成および構成

エンドポイントの選択: 地域エンドポイント対プライベートエンドポイント

URI を介してプロキシサービス(Amazon API Gatewayなど)にアクセスします。これは エンドポイント と呼ばれることがあります。Amazon API Gatewayの場合、次のいずれかを作成できます。

  • 地域エンドポイント。

  • プライベートエンドポイント。

地域エンドポイントは、地域間またはクラウド間でもアクセスできます。Snowflakeインスタンス、プロキシサービス、およびリモートサービスのすべてが、異なる地域、あるいは異なるクラウドプラットフォームでも利用できます。たとえば、Azureで実行されているSnowflakeインスタンスは、Amazon API Gatewayの地域エンドポイントにリクエストを送信できます。これにより、GCPで実行されているリモートサービスにデータが転送されます。

プライベートエンドポイントは、 PrivateLink 経由で同じ AWS リージョン内のSnowflake VPC (仮想プライベートクラウド)からのみアクセスを許可するように構成できます。

AWS にあるエンドポイントの種類の詳細については、次をご参照ください。

プライベートエンドポイントを使用する必要があり、使用している地域がわからない場合は、次のいずれかを実行して地域を検索できます。

  • SQL 関数 CURRENT_REGION() を呼び出します(例: SELECT CURRENT_REGION())。

  • Snowflakeアカウントのホスト名を確認します。これは通常、クラウドプロバイダーと地域を表します。アカウントのホスト名、地域、およびクラウドプロバイダーの詳細については、 サポートされているクラウドリージョン をご参照ください。

プライベートエンドポイントを使用するには、アカウントが次の要件を満たしている必要があります。

  • SnowflakeのBusiness Critical(またはそれ以上の)エディション。

AWSでは、SnowflakeはSnowflakeアカウントのAWS VPCsへの接続をサポートしています。プライベートエンドポイントは、PrivateLinkを介したVPCs(仮想プライベートクラウド)間の通信を可能にします。SnowflakeとAWS PrivateLinkの詳細については、以下をご参照ください。

API Gatewayエンドポイントの作成

API Gatewayを作成および構成する前に、地域エンドポイントとプライベートエンドポイントのどちらを使用するかを選択します。詳細については、 エンドポイントの選択: 地域エンドポイント対プライベートエンドポイント をご参照ください。

注釈

AWS 以外でホストされているSnowflakeのインスタンスで、外部関数を作成できます。仮想ウェアハウスがAzureまたは GCP (Google Cloud Platform)上にある場合は、Amazon API Gatewayを介して、リモートサービスにアクセスする外部関数を作成できます。

API Gatewayエンドポイントを作成するステップは次のとおりです。

  1. プライベートエンドポイントを使用する場合は、Snowflakeウェブインターフェイスで次のコマンドを実行してVPC(仮想プライベートクラウド)IDを取得します。

    select system$get_snowflake_platform_info();
    

    出力は次のようになります。

    {"snowflake-vpc-id":["vpc-12345699"]}
    

    ワークシートの「Snowflake VPC ID」フィールドに VPC ID (例: 「vpc-12345699」)を記録します。

    (注: VPC エンドポイント ID ではなく、 VPC ID を使用する必要があります。VPC エンドポイント IDs は時間の経過とともに変化する可能性があります。)

  2. AWS 管理コンソールで、 API Gateway を選択します。

  3. Create API を選択します。

  4. エンドポイントのタイプ(地域またはプライベート)を選択します。

    • 地域 エンドポイントを選択する場合は、次のようにします。

      • REST API を探し、 Build ボタンをクリックします。

    • プライベート エンドポイントを選択する場合は、次のようにします。

      • REST API private を探し、 Build ボタンをクリックします。

    重要

    REST API または REST API private を選択してください。HTTP API または他のオプションを選択しないでください。

  5. New API オプションを選択します。

  6. 新しい API の名前を入力します。

    この名前をワークシートの「新しい API 名」フィールドに記録します。

  7. Endpoint Type を選択するように求められた場合は、 Regional または Private のいずれかを選択します。

  8. プライベートエンドポイントを作成する場合は、ワークシートの「Snowflake VPC ID」フィールドに記録されているSnowflake VPC ID を入力します。

    (プライベートエンドポイントを作成していない場合は、 VPC ID を入力する必要はありません。)

  9. Create API ボタンをクリックします。

    これにより、リソースを作成できる新しい画面が表示されます。

  10. リソースを作成します。

    Create Resource オプションを表示するには、 Actions ボタンをクリックする必要がある場合があります。)

    ワークシートの「API Gatewayリソース名」フィールドにリソース名を記録します。

    Create Resource ボタンをクリックすると、画面には No methods defined for the resource. と表示されます。

  11. 新しいメソッドを作成します。Actions をクリックし、このリソースに対して Create Method を選択します。

    POST オプションを指定します。(POST オプションが表示されない場合は、リソース名の下にある小さなドロップダウンメニューボックスをクリックしてください。)

    これにより、 Integration type およびその他のオプションを示す新しいウィンドウペインが表示されます。

  12. Integration typeLambda Function である必要があります。それがまだ選択されていない場合は、それを選択します。

  13. Use Lambda Proxy integration チェックボックスをクリックします。

    Lambdaプロキシ統合を選択することは重要です。Lambdaプロキシ統合なしの JSON は、Lambdaプロキシ統合がある JSON とは異なります。Lambdaプロキシ統合の詳細については、以下の AWS ドキュメントをご参照ください。

  14. Lambda Function フィールドに、ワークシートに記録したLambda関数名を貼り付けます。

  15. 保存します。

  16. Actions ボタンをクリックして、 Deploy API アクションを選択します。

  17. この関数のステージを選択または作成します。

  18. リソース名の下に、 POST が表示されます。

    これが表示されない場合は、リソース名の左側にある三角形をクリックして、リソースツリーを展開する必要が生じる場合があります。

  19. POST をクリックし、ワークシートの「リソース呼び出し URL」フィールドに POST リクエストの Invoke URL を記録します。

    呼び出し URL に、作成したリソースの名前が含まれていることを確認します。含まれていない場合は、リソースではなくステージの呼び出し URL をクリックした可能性があります。

  20. Save Changes をクリックします。

Amazon API Gatewayエンドポイントの保護

Amazon API Gatewayエンドポイントといった、プロキシサービスエンドポイントの保護の概要については、 プロキシサービスの保護 をご参照ください。

Amazon API Gatewayエンドポイントのセキュリティを保護するには、

  1. この時点で、 API Gateway情報を表示する画面が表示され、リソースと POST メソッドが表示されます。

    まだそこにいない場合は、次のようにします。

    1. AWS Managementコンソールで、 API Gatewayページに移動します。

    2. まだ選択していない場合は、 API Gatewayを選択します。

    3. 左側のペインで、 Resources をクリックします。

    4. POST メソッドをクリックします。(これが表示されない場合は、 Resources ペイン(通常は左から2番目のペイン)でリソースの左側にある三角形をクリックして、リソースツリーを展開します。)

  2. Method Request ARNMethod Request ボックスからワークシートの「メソッドリクエスト ARN」フィールドにコピーします。

  3. タイトル Method Request をクリックします。

  4. Method RequestAWS_IAM 認証が必要であることを指定します。

    ドロップダウンメニューから AWS_IAM 認証を選択した後、メニューの横にある小さなチェックマークをクリックして、選択を確認する必要があることに注意してください。

  5. API Gatewayのリソースポリシーを設定して、ゲートウェイエンドポイントを呼び出すことができるユーザーを指定します。

    リソースポリシーを入力できる編集ウィンドウにアクセスするには、ウィンドウの左側の列にある API の Resource Policy をクリックする必要がある場合があります。

    • リージョンのエンドポイント:

      以下のJSON 形式リソースポリシーテンプレートをリソースポリシーエディターに貼り付けてから、以下で説明するように、プレースホルダーをワークシートの適切な値に置き換えます。

      {
          "Version": "2012-10-17",
          "Statement":
          [
              {
              "Effect": "Allow",
              "Principal":
                  {
                  "AWS": "arn:aws:sts::<12-digit-number>:assumed-role/<external_function_role>/snowflake"
                  },
              "Action": "execute-api:Invoke",
              "Resource": "<method_request_ARN>"
              }
          ]
      }
      

      リソースポリシーの次の部分を置き換えます。

      • <12桁の数字> をワークシートに記録した AWS アカウント ID に置き換えます。

      • <外部関数ロール> をワークシートの「新しい IAM ロール名」フィールドからのロール名に置き換えます。

        たとえば、 AWS のロール名が次の場合、

        arn:aws:iam::987654321098:role/MyNewIAMRole
        

        結果は次のようになります。

        "AWS": "arn:aws:sts::987654321098:assumed-role/MyNewIAMRole/snowflake"
        
      • <メソッドリクエスト ARN> をワークシートの「メソッドリクエスト ARN」フィールドの値に置き換えます。これは、リソースの POST メソッドの ARN です。

        注釈

        リソースをメソッドリクエスト ARN に設定すると、 API Gatewayは指定されたリソースへの呼び出しのみを許可するように指定します。メソッドリクエスト ARN のサブセットをプレフィックスとして指定できます。これにより、同じ API Gatewayから複数のリソースを呼び出すことができます。

        たとえば、メソッドリクエスト ARN が次の場合、

        arn:aws:execute-api:us-west-1:123456789012:a1b2c3d4e5/*/POST/MyResource
        

        次のプレフィックスのみを指定できます。

        arn:aws:execute-api:us-west-1:123456789012:a1b2c3d4e5/*/
        
    • プライベートエンドポイント:

      以下のリソースポリシーテンプレートをリソースポリシーエディターに貼り付けてから、以下で説明するように、プレースホルダーをワークシートの適切な値に置き換えます。

      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:sts::<12-digit-number>:assumed-role/<external_function_role>/snowflake"
                  },
                  "Action": "execute-api:Invoke",
                  "Resource": "<method_request_ARN>",
                  "Condition": {
                      "StringEquals": {
                          "aws:sourceVpc": "<VPC_ID>"
                      }
                  }
              }
          ]
      }
      

      リソースポリシーの次の部分を置き換えます。

      • リージョンのエンドポイントについて、上記のように <12-digit-number><external_function_role><method_request_ARN> を置き換えます。

      • <VPC_ID> をご使用のリージョンのSnowflake VPC ID に置き換えます。これは、ワークシートの「Snowflake VPC ID」フィールドに記録する必要があります。

  6. リソースポリシーを保存するために Save をまだクリックしていない場合は、ここでクリックしてください。

  7. 更新された API を展開します。

次のいくつかのステップでは、Snowflake API 統合オブジェクトを作成します。ここで AWS 管理ウィンドウを閉じないでください。後で戻る必要があります。

SnowflakeでのAPI統合オブジェクトの作成

  1. Snowflakeセッションを開きます(まだ開いていない場合)。通常はSnowflakeウェブインターフェイスセッションです。

  2. ACCOUNTADMIN権限またはCREATE INTEGRATION権限を持つSnowflakeロールを使用します。次に例を示します。

    use role has_accountadmin_privileges;
    
  3. CREATE API INTEGRATION コマンドを入力して、 API 統合を作成します。コマンドは次のようになります。

    CREATE OR REPLACE API INTEGRATION my_api_integration_01
      api_provider = aws_api_gateway
      api_aws_role_arn = '<new_IAM_role_ARN>'
      api_allowed_prefixes = ('https://')
      enabled = true;
    

    コマンドをカスタマイズします。

    • プライベートエンドポイントを使用している場合は、 api_provider 句を aws_private_api_gateway に設定する必要があります。それ以外の場合は、 api_provider 句を aws_api_gateway に設定する必要があります。

    • <新しい IAM ロール ARN> は、ワークシートの「新しい IAM ロール ARN」フィールドの値である必要があります。

    • api_allowed_prefixes フィールドには、以前に記録したリソース呼び出し URL が含まれている必要があります。

    • この例の値を使用するのではなく、 API 統合の名前をカスタマイズすることもできます。

    以下は、包括的な CREATE API INTEGRATION ステートメントの例です。

    create or replace api integration demonstration_external_api_integration_01
        api_provider=aws_api_gateway
        api_aws_role_arn='arn:aws:iam::123456789012:role/my_cloud_account_role'
        api_allowed_prefixes=('https://xyz.execute-api.us-west-2.amazonaws.com/production/')
        enabled=true;
    
  4. 「API 統合名」というフィールドタイトルのワークシートスロットに、作成した API 統合の名前を記録します。後で CREATE EXTERNAL FUNCTION コマンドを実行するときに、 API 統合名が必要になります。

  5. 上で入力した CREATE API INTEGRATION コマンドを実行します。

  6. DESCRIBE INTEGRATION コマンドを実行します。

    DESCRIBE INTEGRATION <my_integration_name>;
    

    例:

    DESCRIBE INTEGRATION my_api_integration_01;
    
  7. API_AWS_IAM_USER_ARN という名前のプロパティを探して、そのプロパティの property_value をワークシートに記録します。

  8. API_AWS_EXTERNAL_ID という名前のプロパティを見つけて、そのプロパティの property_value をワークシートに記録します。

    API_AWS_EXTERNAL_IDproperty_value は、しばしば等号(「=」)で終わることに注意してください。その等号は値の一部です。プロパティ値の残りの部分と一緒に切り取って貼り付けてください。

次のいくつかのステップでは、 AWS の管理ウィンドウに戻ります。ここでSnowflake管理ウィンドウを閉じないでください。後で戻る必要があります。

Snowflakeと新しい IAM ロールの間における信頼関係の設定

AWS Managementコンソールで、

  1. IAM を選択します。

  2. Roles を選択します。

  3. ワークシートで、「新しい IAM ロール名」フィールドにある値、次に AWS Managementコンソールで同じ値(ロール名)をそれぞれ検索します。

  4. Trust relationships タブ、 Edit trust relationship ボタンの順にクリックします。

    これにより、認証情報を追加できる Policy Document が開きます。

  5. Policy Document で、 Statement.Principal.AWS フィールドを見つけ、値(キーではない)をワークシートの「API_AWS_IAM_USER_ARN」フィールドにある値に置き換えます。

  6. Statement.Condition フィールドを見つけます。最初は、中括弧(「{}」)のみが含まれているはずです。

  7. 中括弧の間に次を貼り付けます。 "StringEquals": { "sts:ExternalId": "xxx" }

  8. xxx をワークシートの「API_AWS_EXTERNAL_ID」フィールドの値に置き換えます。

  9. 信頼関係の Policy Document の編集が完了すると、次のようになります。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::1234567898012:user/development/development_user"
          },
          "Action": "sts:AssumeRole",
          "Condition": {"StringEquals": { "sts:ExternalId": "EXTERNAL_FUNCTIONS_SFCRole=3_8Hcmbi9halFOkt+MdilPi7rdgOv=" }}
        }
      ]
    }
    
  10. Update Trust Policy をクリックします。

ステップ3: 外部関数を作成する

次に、Snowflake ウェブインターフェイス(以前に CREATE API INTEGRATION コマンドを入力した場所)に戻ります。

  1. CREATE EXTERNAL FUNCTION コマンドを入力します。次のようになります。

    CREATE EXTERNAL FUNCTION my_external_function(n INTEGER, v VARCHAR)
        RETURNS VARIANT
        API_INTEGRATION = <api_integration_name>
        AS '<resource_invocation_url>';
    

    コマンドをカスタマイズします。

    • <api_integration_name> 値には、以前に作成した API 統合の名前が含まれている必要があります。

    • <resource_invocation_url> 値は、ワークシートに記録したリソース呼び出し URL である必要があります。この URL には、ステージ名だけでなく、 API Gatewayリソース名が含まれていることを確認してください。

    • 関数名をカスタマイズすることもできます。

    この例では2つの引数( INTEGER と VARCHAR)を渡します。これらの引数は、Lambda関数が期待する引数だからです。独自のLambda関数を作成するときは、Lambda関数に適切な引数を渡す必要があります。

  2. ワークシート内の「外部関数名」というタイトルのフィールドに、作成した外部関数の名前を記録します。

  3. 上記で入力した CREATE EXTERNAL FUNCTION コマンドをまだ実行していない場合は、ここで実行してください。

ステップ4:外部関数を呼び出す

  1. 必要に応じて、外部関数の USAGE 権限を1つ以上のSnowflakeロールに付与し、それらのロールが外部関数を呼び出せるようにします。(ロールには、その外部関数に対する USAGE または OWNERSHIP 権限が必要。)

  2. 次を呼び出して関数を実行します。

    SELECT my_external_function(42, 'Adams');
    

    CREATE EXTERNAL FUNCTION コマンドで関数名をカスタマイズした場合は、「使用する外部関数」をカスタマイズした名前に置き換えます。

    戻り値は、次のようになっている必要があります。

    [0, 42, "Adams"]
    

    ここで、 42, "Adams" は戻り値、 0 は戻り値の行番号です。

AWS (オプションで読み取り)での非同期関数の作成

チュートリアルを単純にするために、チュートリアルでは 同期 サンプルの外部関数を作成します。より洗練された外部関数は、 非同期 であることからメリットを受けることができます。

ドキュメントのこのセクションでは、 AWS で非同期外部関数を作成する方法について説明します。(最初の非同期外部関数を実装する前に、非同期外部関数の 概念の概要 を読むことをお勧めします。)

AWS では、非同期リモートサービスは次の制限を克服する必要があります。

  • HTTP POST と GET は別々のリクエストであるため、リモートサービスは POST リクエストによって起動されたワークフローに関する情報を保持して、後で GET リクエストによって状態を照会できるようにする必要があります。

    通常、各 HTTP POST および HTTP GET は、個別のプロセスまたはスレッドでハンドラー関数の個別のインスタンスを呼び出します。個別のインスタンスはメモリを共有しません。GET ハンドラーがステータスまたは処理されたデータを読み取るには、 GET ハンドラーが AWS で使用可能な共有ストレージリソースにアクセスする必要があります。

  • POST ハンドラーが最初の HTTP 202応答コードを送信する唯一の方法は、ハンドラーの実行を終了する return ステートメント(または同等のもの)を使用することです。したがって、 HTTP 202を返す前に、 POST ハンドラーは 独立した プロセス(またはスレッド)を起動して、リモートサービスの実際のデータ処理作業を実行する必要があります。この独立したプロセスは通常、 GET ハンドラーに表示されるストレージにアクセスする必要があります。

非同期リモートサービスのこうした制限を克服する1つの方法は、プロセス(またはスレッド)3つと共有ストレージを使用することです。

Illustration of processes for an Asynchronous Remote Service

このモデルでは、プロセスには次の責任があります。

  • HTTP POST ハンドラー:

    • 入力データを読み取ります。Lambda関数では、これはハンドラー関数の event 入力パラメーターの本体から読み取られます。

    • バッチ ID を読み取ります。Lambda関数では、これは event 入力パラメーターのヘッダーから読み取られます。

    • データ処理プロセスを開始し、データとバッチ ID を渡します。データは通常、呼び出し中に渡されますが、外部ストレージへの書き込みにより渡すこともできます。

    • データ処理プロセスと HTTP GET ハンドラープロセスの両方がアクセスできる共有ストレージに、バッチ ID を記録します。

    • 必要に応じて、このバッチの処理がまだ終了していないことを記録します。

    • エラーが検出されなかった場合は HTTP 202を返します。

  • データ処理コード:

    • 入力データを読み取ります。

    • データを処理します。

    • 結果を GET ハンドラーで使用できるようにします(結果データを共有ストレージに書き込むか、結果をクエリするための API を提供)。

    • 通常、このバッチのステータスを更新して(例: IN_PROGRESS から SUCCESS に)、結果を読み取る準備ができていることを示します。

    • 終了します。オプションで、このプロセスはエラーインジケーターを返すことができます。Snowflakeはこれを直接認識しませんが(Snowflakeは POST ハンドラーと GET ハンドラーからの HTTP リターンコードのみを認識)、データ処理プロセスからエラーインジケーターを返すとデバッグ中に役立つ場合があります。

  • GET ハンドラー:

    • バッチ ID を読み取ります。Lambda関数では、これは event 入力パラメーターのヘッダーから読み取られます。

    • ストレージを読み取り、このバッチの現在のステータスを取得します(例: IN_PROGRESS または SUCCESS)。

    • 処理がまだ進行中の場合は、202を返します。

    • 処理が正常に終了した場合は、次のようなります。

      • 結果を読み取ります。

      • ストレージをクリーンアップします。

      • HTTP コード200とともに結果を返します。

    • 保存されたステータスがエラーを示している場合は、次のようになります。

      • ストレージをクリーンアップします。

      • エラーコードを返します。

    複数の HTTP GET リクエストが送信されるほど処理に時間がかかる場合は、 GET ハンドラーがバッチに対して複数回呼び出されている可能性があることに注意してください。

このモデルには多くのバリエーションがあります。例:

  • バッチ ID とステータスは、 POST プロセスの終了時ではなく、データ処理プロセスの開始時に書き込むことができます。

  • データ処理は、個別の関数(例: 別のLambda関数)で実行することも、完全に個別のサービスとして実行することもできます。

  • データ処理コードは、必ずしも共有ストレージに書き込む必要はありません。代わりに、処理されたデータを別の方法で利用できるようにすることができます。たとえば、 API はバッチ ID をパラメーターとして受け入れ、データを返すことができます。

実装コードでは、処理に時間がかかりすぎたり失敗したりする可能性を考慮に入れる必要があります。したがって、ストレージスペースの浪費を避けるために、部分的な結果は、クリーンアップする必要があります。

ストレージメカニズムは、複数のプロセス(またはスレッド)間で共有可能である必要があります。可能なストレージメカニズムは次のとおりです。

  • AWS によって提供される、次のようなストレージメカニズム。

  • AWS の外部にあるが、 AWS からアクセス可能なストレージ。

上記の各3プロセスのコードは、3つのLambda関数(POST ハンドラー用、データ処理関数用、 GET ハンドラー用)として、または、さまざまな方法で呼び出すことができる単一の関数として記述できます。

以下のサンプルPythonコードは、 POST、データ処理、および GET プロセスに対して 個別に 呼び出すことができる単一のLambda関数です。

このコードは、出力付きのサンプルクエリを示しています。この例では、共有ストレージメカニズム(DynamoDB)やデータ変換(感情分析)ではなく、3つのプロセスとそれらがどのように相互作用するかに焦点を当てています。コードは、サンプルのストレージメカニズムとデータ変換を別のものに簡単に置き換えることができるように構成されています。

簡単にするために、この例では次のようになります。

  • いくつかの重要な値をハードコーディングします(例: AWS リージョン)。

  • いくつかのリソース(例: Dynamoのジョブテーブル)が存在することを前提としています。

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response

非同期外部関数の呼び出し例と、感情分析結果を含むサンプル出力を次に示します。

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}

サンプルコードに関する注意:

  • データ処理関数は、以下を呼び出すことによって開始されます。

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    

    InvocationType は上記のように「イベント」である必要があります。これは、2番目のプロセス(またはスレッド)は非同期である必要があり、 Eventinvoke() メソッドを介して使用できる唯一のタイプの非ブロッキング呼び出しであるためです。

  • データ処理関数は HTTP 200コードを返します。ただし、この HTTP 200コードはSnowflakeに直接返されません。Snowflakeは、 GET がステータスをポーリングし、データ処理関数がこのバッチの処理を正常に終了したことを確認するまで、 HTTP 200を認識しません。

AWS での外部関数のトラブルシューティング

プラットフォームに依存しない症状

データ型の実際の戻り値が期待される戻り値と一致しない

外部関数との間で引数を渡すときは、データ型が適切であることを確認してください。送信された値が受信中のデータ型に適合しない場合、値は切り捨てられるか、他の方法で破損する可能性があります。

詳細については、 外部関数の引数がリモートサービスにより解析される引数に対応していることを確認 をご参照ください。

SQL から関数を呼び出すと、行番号が故障しているというメッセージを表示

考えられる原因

各バッチ内で返される行番号は、0から始まる単調な昇順の整数でなければならないことに注意してください。入力行番号もその規則に従う必要があり、各出力行は対応する入力行と一致する必要があります(例: 出力行0の出力は、入力行0の入力に対応する必要あり)。

考えられる解決策
  1. 返される行番号が受け取った行番号と同じであること、および各出力値が対応する入力の行番号を使用していることを確認してください。これで うまくいくはずです。解決できない場合は、入力行番号が正しくなかったか、行を正しい順序で返していない可能性があるため、以下のステップ2に進みます。

  2. 出力行番号が0から始まり、1ずつ増加し、順番に並んでいることを確認します。

データの入力形式および出力形式の詳細については、 リモートサービスの入力および出力データ形式 をご参照ください。

外部関数を呼び出そうとすると、「エラー解析 JSON: 無効な応答」というメッセージを表示

考えられる原因

最も可能性の高い原因は、リモートサービスから返された JSON (例: AWS Lambda関数)が正しく構築されていないことです。

考えられる解決策

配列中の配列1つを返すとともに、受け取った入力行ごとに1つの内部配列が返すことを確認します。 Snowflakeが受信するデータ形式 で出力形式の説明を確認します。

戻り値の形式が JSON ではないことを示すエラーメッセージ

考えられる原因

考えられる原因の1つは、戻り値の中に二重引用符が含まれていることです。

考えられる解決策

JSON 文字列は二重引用符で区切られますが、ほとんどの場合、文字列自体は引用符で開始および終了できません。埋め込まれた二重引用符が正しくない場合は、削除してください。

関数が間違った行数を受け取ったことを示すエラーメッセージ

考えられる原因

リモートサービスが受信した行よりも多いか少ない行を返そうとした可能性があります。(関数は名目上スカラーではあるものの、「イベント」パラメーターの「本体」フィールドに複数の行を受け取る可能性があり、受け取ったのとまったく同じ数の行を返す必要あり。)

考えられる解決策

リモートサービスが、受信する行ごとに1行を返すことを確認します。

プラットフォーム固有の症状

エンドポイントがLambdaプロキシ統合を使用しているときに、 API Gatewayがエラー502を返す

考えられる原因

Lambda関数が、次の状態である可能性があります。

  • タイムアウト。

  • 例外をスロー。

  • 他の方法で失敗。

考えられる解決策

Lambdaまたは API Gatewayログが利用できる場合は、それらを調べます。

Lambda関数のソースコードが利用できる場合は、Lambda関数のコードを分析してデバッグします。場合によっては、そのコードのコピーをより簡単なコンテキスト(AWS の外)で実行してデバッグを支援できる可能性があります。

Lambda関数に送信されるデータが、Lambda関数が予期する形式であることを確認します。小さくて単純なデータセットを送信して、それが成功するかどうかを確認することをお勧めします。

一度に大量のデータを送信していないことを確認します。

特にLambda関数が多くの CPU リソースを必要とする時や、Lambda関数自体が他のリモートサービスを呼び出してより多くの時間を必要とする時には、タイムアウトを増やすと問題が解決する場合があります。

Amazon AWS Lambda関数内の HTTP POST メソッドにあるリクエスト本文を読み取ることができません。

考えられる原因

Lambdaプロキシ統合を有効にしていない可能性があります。

考えられる解決策

Lambdaプロキシ統合を有効にします。

詳細については、 Amazon API Gatewayでの API の作成および構成 をご参照ください。

エラー: Error assuming AWS_ROLE

メッセージの全文は次のとおりです。

SQL execution error: Error assuming AWS_ROLE.Please verify the role and externalId are configured correctly in your AWS policy.

考えられる原因
  • ロールの AWS 信頼関係ポリシーで、 AWS ARN が正しくありません。考えられる原因には、次が含まれます。

    • 設定しなかった。

    • 設定したが、Snowflakeの DESCRIBE INTEGRATION コマンドから確認できるユーザー ARN ではなく、 AWS ロールの ARN を使用(誤り)した。「API_AWS_ROLE_ARN」フィールドの値ではなく、ワークシートの「API_AWS_IAM_USER_ARN」フィールドにある値を使用していることを確認してください。

  • AWS 信頼関係ポリシーで std:ExternalId が誤っている。考えられる原因には、次が含まれます。

    • 設定しなかった。

    • API 統合オブジェクトを再作成した。API オブジェクトを再作成すると、その外部 ID が変更されます。

エラー: 403 '{"Message":"User: <ARN> is not authorized to perform: execute-api:Invoke"}'

メッセージの全文は次のとおりです。

Request failed for external function <関数名>。エラー: 403 '{"メッセージ":"ユーザー: <ARN> is not authorized to perform: execute-api:Invoke on resource: <MethodRequestARN>"}'

考えられる原因
  • API Gatewayリソースポリシーには次のものがあります。

    • 間違った IAM ロール ARN。

    • 間違って引き受けたロール。

    • 間違ったメソッドリクエスト ARN。

  • IAM ロールには適切なポリシーが添付されていません。

考えられる解決策
  • Amazon API Gatewayエンドポイントの保護 のリソースポリシーテンプレートに従っていることを確認してください。具体的には、リソースポリシーについて次を確認してください。

    • <12-digit number> をワークシートの「AWS アカウント ID」フィールドにある値に置き換えたこと。

    • <external_function_role> をワークシートの「新しい IAM ロール名」フィールドにある値に置き換えたこと。

    • Resource フィールドの method_request_ARN をメソッドリクエスト ARN に置き換えたこと。

  • IAM ロールに正しいアクセス許可ポリシーが添付されていることを確認する必要がある場合は、以下のステップに従って、ロールのアクセス許可ポリシーリストを見つけることができます。

    1. AWS で、ロールを選択します。

    2. ロールの Summary を表示します。

    3. Permissions タブをクリックします。

    4. 必要なポリシーが Permissions policies リストにあることを確認します。

エラー: 403 '{"Message":"User: anonymous is not authorized to perform: execute-api:Invoke"}'

メッセージの全文は次のとおりです。

Request failed for external function <関数名>。エラー: 403 '{"メッセージ":"ユーザー: この操作を行う権限がありません: execute-api:リソースの呼び出し: <MethodRequestARN>"}'

考えられる原因

考えられる原因の1つには、 API Gatewayの承認を構成しているときに、 Method Request にはリソースの AWS_IAM 認証が必要であることを指定しなかった可能性があります。

考えられる解決策

Amazon API Gatewayを保護する の手順に従わなかった場合は、今すぐそれに従って、 AWS_IAM 認証を指定してください。

エラー: Error parsing JSON response ... Error: top-level JSON object must contain "data" JSON array element

メッセージの全文は次のとおりです。

Error parsing JSON response for external function ... Error: top-level JSON object must contain "data" JSON array element

考えられる原因
  • API Gatewayリソースにある POST コマンドのLambdaプロキシ統合を指定していない可能性があります。

考えられる解決策

次のリモートサービスエラーで外部関数 EXT_FUNC のリクエストに失敗: 403 '{"message":"Forbidden"}';

考えられる原因

プロキシサービスには、通常、認証または課金のために API キー が必要でした。API キーがないか、正しくありません。

考えられる解決策

ALTER API INTEGRATION コマンドを使用して、正しい API キーを指定します。