AWS のサンプル非同期リモートサービス

このトピックには、サンプルの非同期 AWS Lambda関数(リモートサービス)が含まれています。 ステップ1: リモートサービス(AWS のLambda関数)をManagementコンソールで作成する で説明されているのと同じステップに従って、このサンプル関数を作成できます。

このトピックの内容:

コードの概要

ドキュメントのこのセクションでは、 AWS で非同期外部関数を作成する方法について説明します。(最初の非同期外部関数を実装する前に、非同期外部関数の 概念の概要 を読むことをお勧めします。)

AWS では、非同期リモートサービスは次の制限を克服する必要があります。

  • HTTP POST と GET は別々のリクエストであるため、リモートサービスは POST リクエストによって起動されたワークフローに関する情報を保持して、後で GET リクエストによって状態を照会できるようにする必要があります。

    通常、各 HTTP POST および HTTP GET は、個別のプロセスまたはスレッドでハンドラー関数の個別のインスタンスを呼び出します。個別のインスタンスはメモリを共有しません。GET ハンドラーがステータスまたは処理されたデータを読み取るには、 GET ハンドラーが AWS で使用可能な共有ストレージリソースにアクセスする必要があります。

  • POST ハンドラーが最初の HTTP 202応答コードを送信する唯一の方法は、ハンドラーの実行を終了する return ステートメント(または同等のもの)を使用することです。したがって、 HTTP 202を返す前に、 POST ハンドラーは 独立した プロセス(またはスレッド)を起動して、リモートサービスの実際のデータ処理作業を実行する必要があります。この独立したプロセスは通常、 GET ハンドラーに表示されるストレージにアクセスする必要があります。

非同期リモートサービスのこうした制限を克服する1つの方法は、プロセス(またはスレッド)3つと共有ストレージを使用することです。

Illustration of processes for an Asynchronous Remote Service

このモデルでは、プロセスには次の責任があります。

  • HTTP POST ハンドラー:

    • 入力データを読み取ります。Lambda関数では、これはハンドラー関数の event 入力パラメーターの本体から読み取られます。

    • バッチ ID を読み取ります。Lambda関数では、これは event 入力パラメーターのヘッダーから読み取られます。

    • データ処理プロセスを開始し、データとバッチ ID を渡します。データは通常、呼び出し中に渡されますが、外部ストレージへの書き込みにより渡すこともできます。

    • データ処理プロセスと HTTP GET ハンドラープロセスの両方がアクセスできる共有ストレージに、バッチ ID を記録します。

    • 必要に応じて、このバッチの処理がまだ終了していないことを記録します。

    • エラーが検出されなかった場合は HTTP 202を返します。

  • データ処理コード:

    • 入力データを読み取ります。

    • データを処理します。

    • 結果を GET ハンドラーで使用できるようにします(結果データを共有ストレージに書き込むか、結果をクエリするための API を提供)。

    • 通常、このバッチのステータスを更新して(例: IN_PROGRESS から SUCCESS に)、結果を読み取る準備ができていることを示します。

    • 終了します。オプションで、このプロセスはエラーインジケーターを返すことができます。Snowflakeはこれを直接認識しませんが(Snowflakeは POST ハンドラーと GET ハンドラーからの HTTP リターンコードのみを認識)、データ処理プロセスからエラーインジケーターを返すとデバッグ中に役立つ場合があります。

  • GET ハンドラー:

    • バッチ ID を読み取ります。Lambda関数では、これは event 入力パラメーターのヘッダーから読み取られます。

    • ストレージを読み取り、このバッチの現在のステータスを取得します(例: IN_PROGRESS または SUCCESS)。

    • 処理がまだ進行中の場合は、202を返します。

    • 処理が正常に終了した場合は、次のようなります。

      • 結果を読み取ります。

      • ストレージをクリーンアップします。

      • HTTP コード200とともに結果を返します。

    • 保存されたステータスがエラーを示している場合は、次のようになります。

      • ストレージをクリーンアップします。

      • エラーコードを返します。

    複数の HTTP GET リクエストが送信されるほど処理に時間がかかる場合は、 GET ハンドラーがバッチに対して複数回呼び出されている可能性があることに注意してください。

このモデルには多くのバリエーションがあります。例:

  • バッチ ID とステータスは、 POST プロセスの終了時ではなく、データ処理プロセスの開始時に書き込むことができます。

  • データ処理は、個別の関数(例: 別のLambda関数)で実行することも、完全に個別のサービスとして実行することもできます。

  • データ処理コードは、必ずしも共有ストレージに書き込む必要はありません。代わりに、処理されたデータを別の方法で利用できるようにすることができます。たとえば、 API はバッチ ID をパラメーターとして受け入れ、データを返すことができます。

実装コードでは、処理に時間がかかりすぎたり失敗したりする可能性を考慮に入れる必要があります。したがって、ストレージスペースの浪費を避けるために、部分的な結果は、クリーンアップする必要があります。

ストレージメカニズムは、複数のプロセス(またはスレッド)間で共有可能である必要があります。可能なストレージメカニズムは次のとおりです。

  • AWS によって提供される、次のようなストレージメカニズム。

  • AWS の外部にあるが、 AWS からアクセス可能なストレージ。

上記の各3プロセスのコードは、3つのLambda関数(POST ハンドラー用、データ処理関数用、 GET ハンドラー用)として、または、さまざまな方法で呼び出すことができる単一の関数として記述できます。

以下のサンプルPythonコードは、 POST、データ処理、および GET プロセスに対して 個別に 呼び出すことができる単一のLambda関数です。

サンプルコード

このコードは、出力付きのサンプルクエリを示しています。この例では、共有ストレージメカニズム(DynamoDB)やデータ変換(感情分析)ではなく、3つのプロセスとそれらがどのように相互作用するかに焦点を当てています。コードは、サンプルのストレージメカニズムとデータ変換を別のものに簡単に置き換えることができるように構成されています。

簡単にするために、この例では次のようになります。

  • いくつかの重要な値をハードコーディングします(例: AWS リージョン)。

  • いくつかのリソース(例: Dynamoのジョブテーブル)が存在することを前提としています。

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response
Copy

サンプルの呼び出しと出力

非同期外部関数のサンプル呼び出しと、感情分析結果を含むサンプル出力を次に示します。

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}
Copy

サンプルコードに関する注意

  • データ処理関数は、以下を呼び出すことによって開始されます。

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    
    Copy

    InvocationType は上記のように「イベント」である必要があります。これは、2番目のプロセス(またはスレッド)は非同期である必要があり、 Eventinvoke() メソッドを介して使用できる唯一のタイプの非ブロッキング呼び出しであるためです。

  • データ処理関数は HTTP 200コードを返します。ただし、この HTTP 200コードはSnowflakeに直接返されません。Snowflakeは、 GET がステータスをポーリングし、データ処理関数がこのバッチの処理を正常に終了したことを確認するまで、 HTTP 200を認識しません。