非同期外部関数の作成

このドキュメントでは、非同期外部関数について説明します。

このトピックの内容:

非同期リモートサービスの概要

リモートサービスは、 同期的 または 非同期的 に操作できます。

同期リモートサービスは HTTP POST リクエストを受信して処理し、結果を返します。データの処理にかかる時間によっては、リクエストが受信されてから結果が返されるまでに大幅な遅延が発生する可能性があります。

非同期リモートサービスは HTTP POST リクエストを受信し、リクエストが受信されたという確認応答を(通常はほぼ即時に)返します。次に、呼び出し元(Snowflake)は、1つ以上の HTTP GET リクエストを発行するポーリングループを実行し(通常、各リクエスト間に大幅な遅延あり)、非同期処理のステータスを確認します。GET はリクエスト本文でデータを送信しませんが、元の POST と同じヘッダーを含みます。

非同期リモートサービスは、リモートサービスが、プロキシサービスなど(例: Amazon API Gateway)のコンポーネントに組み込まれているタイムアウトを超えた場合に役立ちます。

リモートサービスは、必ずしも完全に同期または非同期である必要はありません。リモートサービスは、リクエスト内のデータの量、処理されている他のリクエストの数などの要因に応じて、異なる時間に同期および非同期で動作できます。

次の図は、同期処理と非同期処理を示しています。上のパスは同期しています。下のパス(1つ以上の HTTP GET リクエストを含む)は非同期です。

Illustration of the HTTP POST and GET requests for asynchronous external functions.

非同期動作は、リモートサービスを作成する人(およびSnowflake)によって実装されます。SQL ステートメントは、非同期リモートサービスの場合と同期リモートサービスの場合で同じです。

Snowflakeの外部関数の実装は、通常、同期および非同期の両方のサードパーティ関数ライブラリと互換性があります。

独自のリモートサービスを作成していて、Snowflakeの非同期処理と互換性を持たせたい場合は、次のように動作するようにリモートサービスを作成します。

  • 最初に行の特定の バッチ の HTTP POST を受信すると、リモートサービスは HTTP コード202(「処理中...」)を返します。

  • リモートサービスが POST の後、出力の準備が整う前に HTTP GET リクエストを受信した場合、リモートサービスは HTTP コード202を返します。

  • リモートサービスは、すべての出力行を生成した後に、同じバッチ ID で次の HTTP GET を待ち、受信した行を HTTP コード200(「正常に完了しました...」)と一緒に返します。

つまり、受信したバッチごとに、リモートサービスは結果の準備ができるまで202を返し、その後、次の GET が結果と HTTP 200を受信します。

Snowflakeは、バッチごとに非同期リモートサービスと次のように連携します。

  1. Snowflakeは、処理するデータを含むHTTP POSTを、一意のバッチIDとともに送信します。

  2. SnowflakeがHTTP 202応答を受信すると、Snowflakeは次のいずれかがtrueになるまでループします。

    • SnowflakeがデータとHTTP 200を受信。

    • Snowflakeの内部タイムアウトに達する。

    • Snowflakeがエラーを受信(例: HTTP応答コード5XX)。

    ループの各反復で、Snowflakeは遅延し、対応するHTTP POSTのバッチIDと同じバッチIDを含むHTTP GETを発行して、リモートサービスが正しいバッチの情報を返すことができるようにします。

    ループ内の遅延は最初は短く、Snowflakeのタイムアウトに達するまで、受信した202応答ごとに長くなります。

  3. 200が返される前にSnowflakeのタイムアウトに達すると、Snowflakeは SQL クエリを中止します。

    現在、Snowflakeのタイムアウトは10分(600秒)であり、ユーザーが構成することはできません。このタイムアウトは将来変更される可能性があります。

注釈

クエリがタイムアウトに達する頻度は、リモートサービスのスケーラビリティに一部依存します。リモートサービスが頻繁にタイムアウトする場合は、 スケーラビリティ の説明もご参照ください。

AWS での非同期関数の作成

AWS では、非同期リモートサービスは次の制限を克服する必要があります。

  • HTTP POST と GET は別々のリクエストであるため、リモートサービスは POST リクエストによって起動されたワークフローに関する情報を保持して、後で GET リクエストによって状態を照会できるようにする必要があります。

    通常、各 HTTP POST および HTTP GET は、個別のプロセスまたはスレッドでハンドラー関数の個別のインスタンスを呼び出します。個別のインスタンスはメモリを共有しません。GET ハンドラーがステータスまたは処理されたデータを読み取るには、 GET ハンドラーが AWS で使用可能な共有ストレージリソースにアクセスする必要があります。

  • POST ハンドラーが最初の HTTP 202応答コードを送信する唯一の方法は、ハンドラーの実行を終了する return ステートメント(または同等のもの)を使用することです。したがって、 HTTP 202を返す前に、 POST ハンドラーは 独立した プロセス(またはスレッド)を起動して、リモートサービスの実際のデータ処理作業を実行する必要があります。この独立したプロセスは通常、 GET ハンドラーに表示されるストレージにアクセスする必要があります。

非同期リモートサービスのこうした制限を克服する1つの方法は、プロセス(またはスレッド)3つと共有ストレージを使用することです。

Illustration of processes for an Asynchronous Remote Service

このモデルでは、プロセスには次の責任があります。

  • HTTP POST ハンドラー:

    • 入力データを読み取ります。Lambda関数では、これはハンドラー関数の event 入力パラメーターの本体から読み取られます。

    • バッチ ID を読み取ります。Lambda関数では、これは event 入力パラメーターのヘッダーから読み取られます。

    • データ処理プロセスを開始し、データとバッチ ID を渡します。データは通常、呼び出し中に渡されますが、外部ストレージへの書き込みにより渡すこともできます。

    • データ処理プロセスと HTTP GET ハンドラープロセスの両方がアクセスできる共有ストレージに、バッチ ID を記録します。

    • 必要に応じて、このバッチの処理がまだ終了していないことを記録します。

    • エラーが検出されなかった場合は HTTP 202を返します。

  • データ処理コード:

    • 入力データを読み取ります。

    • データを処理します。

    • 結果を GET ハンドラーで使用できるようにします(結果データを共有ストレージに書き込むか、結果をクエリするための API を提供)。

    • 通常、このバッチのステータスを更新して(例: 「IN_PROGRESS」から「SUCCESS」に)、結果を読み取る準備ができていることを示します。

    • 終了します。オプションで、このプロセスはエラーインジケーターを返すことができます。Snowflakeはこれを直接認識しませんが(Snowflakeは POST ハンドラーと GET ハンドラーからの HTTP リターンコードのみを認識)、データ処理プロセスからエラーインジケーターを返すとデバッグ中に役立つ場合があります。

  • GET ハンドラー:

    • バッチ ID を読み取ります。Lambda関数では、これは event 入力パラメーターのヘッダーから読み取られます。

    • ストレージを読み取り、このバッチの現在のステータスを取得します(例:「IN_PROGRESS」または「SUCCESS」)。

    • 処理がまだ進行中の場合は、202を返します。

    • 処理が正常に終了した場合は、次のようなります。

      • 結果を読み取ります。

      • ストレージをクリーンアップします。

      • HTTP コード200とともに結果を返します。

    • 保存されたステータスがエラーを示している場合は、次のようになります。

      • ストレージをクリーンアップします。

      • エラーコードを返します。

    複数の HTTP GET リクエストが送信されるほど処理に時間がかかる場合は、 GET ハンドラーがバッチに対して複数回呼び出されている可能性があることに注意してください。

このモデルには多くのバリエーションがあります。例:

  • バッチ ID とステータスは、 POST プロセスの終了時ではなく、データ処理プロセスの開始時に書き込むことができます。

  • データ処理は、個別の関数(例: 別のLambda関数)で実行することも、完全に個別のサービスとして実行することもできます。

  • データ処理コードは、必ずしも共有ストレージに書き込む必要はありません。代わりに、処理されたデータを別の方法で利用できるようにすることができます。たとえば、 API はバッチ ID をパラメーターとして受け入れ、データを返すことができます。

実装コードでは、処理に時間がかかりすぎたり失敗したりする可能性を考慮に入れる必要があります。したがって、ストレージスペースの浪費を避けるために、部分的な結果は、クリーンアップする必要があります。

ストレージメカニズムは、複数のプロセス(またはスレッド)間で共有可能である必要があります。可能なストレージメカニズムは次のとおりです。

  • AWS によって提供される、次のようなストレージメカニズム。

  • AWS の外部にあるが、 AWS からアクセス可能なストレージ。

上記の各3プロセスのコードは、3つのLambda関数(POST ハンドラー用、データ処理関数用、 GET ハンドラー用)として、または、さまざまな方法で呼び出すことができる単一の関数として記述できます。

以下のサンプルPythonコードは、 POST、データ処理、および GET プロセスに対して 個別に 呼び出すことができる単一のLambda関数です。

このコードは、出力付きのサンプルクエリを示しています。この例では、共有ストレージメカニズム(DynamoDB)やデータ変換(感情分析)ではなく、3つのプロセスとそれらがどのように相互作用するかに焦点を当てています。コードは、サンプルのストレージメカニズムとデータ変換を別のものに簡単に置き換えることができるように構成されています。

簡単にするために、この例では次のようになります。

  • いくつかの重要な値をハードコーディングします(例: AWS 地域)。

  • いくつかのリソース(例: Dynamoの「Jobs」テーブ)が存在することを前提としています。

import json
import time
import boto3

HTTP_METHOD_STRING = "httpMethod"
HEADERS_STRING = "headers"
BATCH_ID_STRING = "sf-external-function-query-batch-id"
DATA_STRING = "data"
REGION_NAME = "us-east-2"

TABLE_NAME = "Jobs"
IN_PROGRESS_STATUS = "IN_PROGRESS"
SUCCESS_STATUS = "SUCCESS"

def lambda_handler(event, context):
    # this is called from either the GET or POST
    if (HTTP_METHOD_STRING in event):
        method = event[HTTP_METHOD_STRING]
        if method == "POST":
            return initiate(event, context)
        elif method == "GET":
            return poll(event, context)
        else:
            return create_response(400, "Function called from invalid method")

    # if not called from GET or POST, then this lambda was called to
    # process data
    else:
        return process_data(event, context)


# Reads batch_ID and data from the request, marks the batch_ID as being processed, and
# starts the processing service.
def initiate(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    data = json.loads(event["body"])[DATA_STRING]

    lambda_name = context.function_name

    write_to_storage(batch_id, IN_PROGRESS_STATUS, "NULL")
    lambda_response = invoke_process_lambda(batch_id, data, lambda_name)

    # lambda response returns 202, because we are invoking it with
    # InvocationType = 'Event'
    if lambda_response["StatusCode"] != 202:
        response = create_response(400, "Error in inititate: processing lambda not started")
    else:
        response = {
            'statusCode': lambda_response["StatusCode"]
        }

    return response


# Processes the data passed to it from the POST handler. In this example,
# the processing is to perform sentiment analysis on text.
def process_data(event, context):
    data = event[DATA_STRING]
    batch_id = event[BATCH_ID_STRING]

    def process_data_impl(data):
        comprehend = boto3.client(service_name='comprehend', region_name=REGION_NAME)
        # create return rows
        ret = []
        for i in range(len(data)):
            text = data[i][1]
            sentiment_response = comprehend.detect_sentiment(Text=text, LanguageCode='en')
            sentiment_score = json.dumps(sentiment_response['SentimentScore'])
            ret.append([i, sentiment_score])
        return ret

    processed_data = process_data_impl(data)
    write_to_storage(batch_id, SUCCESS_STATUS, processed_data)

    return create_response(200, "No errors in process")


# Repeatedly checks on the status of the batch_ID, and returns the result after the
# processing has been completed.
def poll(event, context):
    batch_id = event[HEADERS_STRING][BATCH_ID_STRING]
    processed_data = read_data_from_storage(batch_id)

    def parse_processed_data(response):
        # in this case, the response is the response from DynamoDB
        response_metadata = response['ResponseMetadata']
        status_code = response_metadata['HTTPStatusCode']

        # Take action depending on item status
        item = response['Item']
        job_status = item['status']
        if job_status == SUCCESS_STATUS:
            # the row number is stored at index 0 as a Decimal object,
            # we need to convert it into a normal int to be serialized to JSON
            data = [[int(row[0]), row[1]] for row in item['data']]
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'data': data
                })
            }
        elif job_status == IN_PROGRESS_STATUS:
            return {
                'statusCode': 202,
                "body": "{}"
            }
        else:
            return create_response(500, "Error in poll: Unknown item status.")

    return parse_processed_data(processed_data)


def create_response(code, msg):
    return {
        'statusCode': code,
        'body': msg
    }


def invoke_process_lambda(batch_id, data, lambda_name):
    # Create payload to be sent to processing lambda
    invoke_payload = json.dumps({
        BATCH_ID_STRING: batch_id,
        DATA_STRING: data
    })

    # Invoke processing lambda asynchronously by using InvocationType='Event'.
    # This allows the processing to continue while the POST handler returns HTTP 202.
    lambda_client = boto3.client('lambda', region_name=REGION_NAME,)
    lambda_response = lambda_client.invoke(
        FunctionName=lambda_name,
        InvocationType='Event',
        Payload=invoke_payload
    )
    # returns 202 on success if InvocationType = 'Event'
    return lambda_response


def write_to_storage(batch_id, status, data):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    # Put in progress item in table
    item_to_store = {
        'batch_id': batch_id,
        'status': status,
        'data': data,
        'timestamp': "{}".format(time.time())
    }
    db_response = table.put_item(
        Item=item_to_store
    )


def read_data_from_storage(batch_id):
    # we assume that the table has already been created
    client = boto3.resource('dynamodb')
    table = client.Table(TABLE_NAME)

    response = table.get_item(Key={'batch_id': batch_id},
                          ConsistentRead=True)
    return response

非同期外部関数の呼び出し例と、感情分析結果を含むサンプル出力を次に示します。

create table test_tb(a string);
insert into test_tb values
    ('hello world'),
    ('I am happy');
select ext_func_async(a) from test_tb;

Row | EXT_FUNC_ASYNC(A)
0   | {"Positive": 0.47589144110679626, "Negative": 0.07314028590917587, "Neutral": 0.4493273198604584, "Mixed": 0.0016409909585490823}
1   | {"Positive": 0.9954453706741333, "Negative": 0.00039307220140472054, "Neutral": 0.002452891319990158, "Mixed": 0.0017087293090298772}

サンプルコードに関する注意:

  • データ処理関数は、以下を呼び出すことによって開始されます。

    lambda_response = lambda_client.invoke(
        ...
        InvocationType='Event',
        ...
    )
    

    InvocationType は上記のように「イベント」である必要があります。これは、2番目のプロセス(またはスレッド)は非同期である必要があり、 Eventinvoke() メソッドを介して使用できる唯一のタイプの非ブロッキング呼び出しであるためです。

  • データ処理関数は HTTP 200コードを返します。ただし、この HTTP 200コードはSnowflakeに直接返されません。Snowflakeは、 GET がステータスをポーリングし、データ処理関数がこのバッチの処理を正常に終了したことを確認するまで、 HTTP 200を認識しません。