AWS のサンプル非同期リモートサービス¶
このトピックには、サンプルの非同期 AWS Lambda関数(リモートサービス)が含まれています。 ステップ1: リモートサービス(AWS のLambda関数)をManagementコンソールで作成する で説明されているのと同じステップに従って、このサンプル関数を作成できます。
コードの概要¶
ドキュメントのこのセクションでは、 AWS で非同期外部関数を作成する方法について説明します。(最初の非同期外部関数を実装する前に、非同期外部関数の 概念の概要 を読むことをお勧めします。)
AWS では、非同期リモートサービスは次の制限を克服する必要があります。
HTTP POST と GET は別々のリクエストであるため、リモートサービスは POST リクエストによって起動されたワークフローに関する情報を保持して、後で GET リクエストによって状態を照会できるようにする必要があります。
通常、各 HTTP POST および HTTP GET は、個別のプロセスまたはスレッドでハンドラー関数の個別のインスタンスを呼び出します。個別のインスタンスはメモリを共有しません。GET ハンドラーがステータスまたは処理されたデータを読み取るには、 GET ハンドラーが AWS で使用可能な共有ストレージリソースにアクセスする必要があります。
POST ハンドラーが最初の HTTP 202応答コードを送信する唯一の方法は、ハンドラーの実行を終了する
returnステートメント(または同等のもの)を使用することです。したがって、 HTTP 202を返す前に、 POST ハンドラーは 独立した プロセス(またはスレッド)を起動して、リモートサービスの実際のデータ処理作業を実行する必要があります。この独立したプロセスは通常、 GET ハンドラーに表示されるストレージにアクセスする必要があります。
非同期リモートサービスのこうした制限を克服する1つの方法は、プロセス(またはスレッド)3つと共有ストレージを使用することです。
このモデルでは、プロセスには次の責任があります。
HTTP POST ハンドラー:
入力データを読み取ります。Lambda関数では、これはハンドラー関数の
event入力パラメーターの本体から読み取られます。バッチ ID を読み取ります。Lambda関数では、これは
event入力パラメーターのヘッダーから読み取られます。データ処理プロセスを開始し、データとバッチ ID を渡します。データは通常、呼び出し中に渡されますが、外部ストレージへの書き込みにより渡すこともできます。
データ処理プロセスと HTTP GET ハンドラープロセスの両方がアクセスできる共有ストレージに、バッチ ID を記録します。
必要に応じて、このバッチの処理がまだ終了していないことを記録します。
エラーが検出されなかった場合は HTTP 202を返します。
データ処理コード:
入力データを読み取ります。
データを処理します。
結果を GET ハンドラーで使用できるようにします(結果データを共有ストレージに書き込むか、結果をクエリするための API を提供)。
通常、このバッチのステータスを更新して(例:
IN_PROGRESSからSUCCESSに)、結果を読み取る準備ができていることを示します。終了します。オプションで、このプロセスはエラーインジケーターを返すことができます。Snowflakeはこれを直接認識しませんが(Snowflakeは POST ハンドラーと GET ハンドラーからの HTTP リターンコードのみを認識)、データ処理プロセスからエラーインジケーターを返すとデバッグ中に役立つ場合があります。
GET ハンドラー:
バッチ ID を読み取ります。Lambda関数では、これは
event入力パラメーターのヘッダーから読み取られます。ストレージを読み取り、このバッチの現在のステータスを取得します(例:
IN_PROGRESSまたはSUCCESS)。処理がまだ進行中の場合は、202を返します。
処理が正常に終了した場合は、次のようなります。
結果を読み取ります。
ストレージをクリーンアップします。
HTTP コード200とともに結果を返します。
保存されたステータスがエラーを示している場合は、次のようになります。
ストレージをクリーンアップします。
エラーコードを返します。
複数の HTTP GET リクエストが送信されるほど処理に時間がかかる場合は、 GET ハンドラーがバッチに対して複数回呼び出されている可能性があることに注意してください。
このモデルには多くのバリエーションがあります。例:
バッチ ID とステータスは、 POST プロセスの終了時ではなく、データ処理プロセスの開始時に書き込むことができます。
データ処理は、個別の関数(例: 別のLambda関数)で実行することも、完全に個別のサービスとして実行することもできます。
データ処理コードは、必ずしも共有ストレージに書き込む必要はありません。代わりに、処理されたデータを別の方法で利用できるようにすることができます。たとえば、 API はバッチ ID をパラメーターとして受け入れ、データを返すことができます。
実装コードでは、処理に時間がかかりすぎたり失敗したりする可能性を考慮に入れる必要があります。したがって、ストレージスペースの浪費を避けるために、部分的な結果は、クリーンアップする必要があります。
ストレージメカニズムは、複数のプロセス(またはスレッド)間で共有可能である必要があります。可能なストレージメカニズムは次のとおりです。
AWS によって提供される、次のようなストレージメカニズム。
ディスク容量(例: Amazon Elastic File System(EFS))。
AWS を介して利用可能なローカルデータベースサーバー(例: Amazon DynamoDB)。
AWS の外部にあるが、 AWS からアクセス可能なストレージ。
上記の各3プロセスのコードは、3つのLambda関数(POST ハンドラー用、データ処理関数用、 GET ハンドラー用)として、または、さまざまな方法で呼び出すことができる単一の関数として記述できます。
以下のサンプルPythonコードは、 POST、データ処理、および GET プロセスに対して 個別に 呼び出すことができる単一のLambda関数です。
サンプルコード¶
このコードは、出力付きのサンプルクエリを示しています。この例では、共有ストレージメカニズム(DynamoDB)やデータ変換(感情分析)ではなく、3つのプロセスとそれらがどのように相互作用するかに焦点を当てています。コードは、サンプルのストレージメカニズムとデータ変換を別のものに簡単に置き換えることができるように構成されています。
簡単にするために、この例では次のようになります。
いくつかの重要な値をハードコーディングします(例: AWS リージョン)。
いくつかのリソース(例: Dynamoのジョブテーブル)が存在することを前提としています。
サンプルの呼び出しと出力¶
非同期外部関数のサンプル呼び出しと、感情分析結果を含むサンプル出力を次に示します。
サンプルコードに関する注意¶
データ処理関数は、以下を呼び出すことによって開始されます。
InvocationType は上記のように「イベント」である必要があります。これは、2番目のプロセス(またはスレッド)は非同期である必要があり、
Eventはinvoke()メソッドを介して使用できる唯一のタイプの非ブロッキング呼び出しであるためです。データ処理関数は HTTP 200コードを返します。ただし、この HTTP 200コードはSnowflakeに直接返されません。Snowflakeは、 GET がステータスをポーリングし、データ処理関数がこのバッチの処理を正常に終了したことを確認するまで、 HTTP 200を認識しません。