タスクリアクター¶
すべてのSnowflakeコネクタで使用される共通の要素と機能を提供するライブラリ。
要件¶
タスクリアクターでは、Native Appのインストール中に少なくとも次のSQLファイルを実行する必要があります。
task_reactor.sql(タスクリアクター SQL リファレンス を参照)
概要¶
タスクリアクターは、限られたタスクセットを持つキュー内に保存されたワークチャンクのオーケストレーションメカニズムを提供する別のモジュールです。タスクリアクターのキューとディスパッチャーは、 Snowflakeタスク を使用した Snowflakeストリーム に基づいており、リフレッシュ時間の制限により1分ごとにトリガーされます。ウェアハウスがクレジットを節約できるように、タスクリアクターは入力キューにデータがある場合にのみアクティブになります。
タスクリアクターは、キュー、ディスパッチャー、ワーカーの3つの主要コンポーネントで構成されます。
コネクタアプリケーションは、キューに QueueItems を追加します。
ディスパッチャー(Snowflakeタスク)は、1分ごとに待機中の QueueItems をキューから取得し、ワーカーに渡します。
ワーカー(Snowflakeタスク)は、1分ごとに割り当てられた QueueItems で並行して作業します。
コネクタ構成が確定すると、タスクリアクターの構成は次の3ステップに制限されます。
タスクリアクターのすべてのコンポーネントを作成する
インスタンスの初期化
(オプション)ワーカー数の変更
タスクリアクターのすべてのコンポーネントを作成する¶
インスタンスオブジェクトを作成するには、まず worker、 selector、そして必要に応じて expired selector の実装を作成し、 次に TASK_REACTOR.CREATE_INSTANCE_OBJECTS プロシージャを使用してそれらを統合する必要があります。
ワーカーの実装¶
ワーカーは、特定のデータの取得や取り込みなど、ディスパッチャーによって割り当てられたタスクを実行する責任を負います。唯一必須なのは、ジョブを開始する特定のワーカーメソッドを持つことです。このメソッドは、Snowparkプロシージャから呼び出し可能で、文字列を返し、次のパラメーターを含んでいる必要があります。
session- Snowparkセッションオブジェクトworker_id- 数値、ユニークワーカーIDtask_reactor_schema- タスクリアクターオブジェクトが作成されるスキーマ名。タスクリアクターのインスタンス名として使用できます。
ワーカーは、特定のデータの取得や取り込みなど、ディスパッチャーによって割り当てられたタスクを実行する責任を負います。(com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorker および com.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion)Javaクラスを使用するか、より単純なタスク(com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorker および com.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask)を使用することをお勧めしますが、ストアドプロシージャのハンドラーの作成がサポートされている任意のプログラミング言語でワーカーを作成できます。
Javaワーカーメソッドの例:
すでに作成されたワーカーメソッドを使用する場合、ユーザーはそれを CONNECTOR.WORKER_PROCEDURE に統合する必要があります。プロシージャは独自のワーカーメソッドを呼び出す必要があります。これはアプリケーションスキーマ内に作成され、 STRING を返し、次のパラメーターを含める必要があります。
worker_id- 数値task_reactor_schema- 文字列
ワーカーのJava実装を呼び出すプロシージャの例:
イベントテーブルにログ記録されるメトリクスを収集するには、テレメトリライブラリが必要です。
セレクターの実装¶
セレクターの役割は、キューに入れられたタスクのうち、どのタスクをタスクリアクターで処理するかを決定することです。ワーカーの実装と同様に、Snowparkでサポートされている任意の言語で作成できます。タスクセレクターは、データベースプロシージャまたはデータベースビューとして実装できます。セレクター(プロシージャまたはビュー)は、 TASK_REACTOR.CREATE_NEW_INSTANCE プロシージャの引数として渡す必要があります。
プロシージャはSnowparkプロシージャから呼び出し可能で、文字列を返し、次のパラメーターを含んでいる必要があります。
session- SnowparkセッションqueueItems- String[](個々の JSON 文字列の配列。各文字列は単一の QueueItem を表します)
Javaセレクターメソッドの例:
セレクターメソッドの代わりに、既存のキューからタスクをフィルタリングおよびソートするビューを作成することもできます。ディスパッチャーは、次のサンプルクエリを使用して、新しく作成されたビューから新しいタスクを取得できます。
ユーザーは、すでに作成されているセレクターメソッドを使用して CONNECTOR.WORK_SELECTOR に統合する必要があります。このプロシージャーは、必須のワークセレクターメソッドを呼び出す必要があります。これはアプリケーションスキーマ内に作成され、 ARRAY を返し、次のパラメーターを含める必要があります。
work_items - array
ワークセレクターのJava実装を呼び出すプロシージャの例:
期限切れセレクターの実装¶
期限切れセレクターの役割は、タスクリアクターのキューから削除するキューアイテムを決定することです。セレクターが一部のアイテムに到達できず、これらのアイテムがキュー内に永久に残るため、アイテムを削除する必要がある場合があります。さらに、キューで待機している一部のアイテムは、かなり前に作成された可能性があり、それ以上処理が必要ないものもあります。期限切れセレクターは、データベースビューとして実装できます。セレクタービューは、 TASK_REACTOR.CREATE_NEW_INSTANCE プロシージャの引数として渡す必要があります。キューから項目を削除する必要がない場合は、デフォルトの実装 TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR を使用することができます。
次のクエリを使用すると、3日以上前に作成されたアイテムを選択する期限切れセレクタービューを作成できます。
インスタンスオブジェクトの統合¶
TASK_REACTOR.CREATE_INSTANCE_OBJECTS を使用すると、作成されたインスタンスを初期化する前に、すべてのインスタンスをまとめて構成できます。このプロシージャはスキーマごとに1回だけ実行できるため、今後の呼び出しでは変更は行われません。プロシージャが何度も実行されたり、まったく呼び出されなかったりするのを防ぐために、 setup.sql ファイルに初期化コールを置くことをお勧めします。
必須パラメーター:
instance_schema_name VARCHAR- インスタンスが動作するデータベースオブジェクトを格納する、インスタンスごとに一意のスキーマ。worker_procedure_name VARCHAR-Worker Implementationの部分に記述されているワーカープロシージャの名前。work_selector_type VARCHAR- - 新しいタスクで、ビューまたはプロシージャのどちらを使用するかを指定する値。可能な値: VIEW、 PROCEDURE。work_selector_name VARCHAR-Selector Implementation- 部分に記述されているセレクタープロシージャまたはビューの名前。
オプションのパラメーター:
expired_work_selector_name VARCHAR-Expired Selector Implementation- 部分に記述されている期限切れのセレクタービューの名前。値が提供されていない場合、TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTORは何も返さないデフォルトの実装として使用されます。
インスタンスの初期化¶
タスクリアクター内のすべての構成を初期化して実行するには、ユーザーは INITIALIZE_INSTANCE を呼び出す必要があります。このプロシージャは、次のパラメーターを入力として受け取ります。
instance_schema_name- (必須)インスタンスが動作するデータベースオブジェクトを格納するスキーマの名前。warehouse_name(必須)インスタンスが実行されるウェアハウス名。dt_should_be_started(オプション) - デフォルト:TRUE。新しいインスタンスを作成するときに、ディスパッチャータスクを開始する必要があります。dt_task_schedule(オプション) - デフォルト:1 MINUTE。ディスパッチャータスクの実行頻度。dt_allow_overlapping_execution(オプション) - デフォルト:FALSE。DAG を同時に実行できるようにします。dt_user_task_timeout_ms(オプション) - タスクがタイムアウトするまでの1回の実行の制限時間(ミリ秒単位)を指定します。
注釈
もしワーカープロシージャがワーカータスクに設定されたタイムアウト時間 (USER_TASK_TIMEOUT_MS) 以上かかった場合、プロシージャはタイムアウトエラーで中断します。Snowflakeタスクのタイムアウトを超えないようにタスクをスケジュールすることが重要です。
必要なパラメーターの最小数を指定すると、 Task Reactor は指定された構成で初期化され、 TASK_REACTOR.DISPATCHER プロシージャを使用してワーカーをディスパッチします。
ワーカー数を設定する¶
ワーカーの数は、次のパラメーターを使用して、 TASK_REACTOR.SET_WORKERS_NUMBER プロシージャを呼び出すことによって手動で変更できます。
WORKERS_NUMBER- 新しいワーカー数。TR_INSTANCE_SCHEMA_NAME- インスタンススキーマ名
メトリック¶
タスクリアクターにはメトリクスメカニズムがあります。これは Snowflake Trace Events に基づいています。メトリクスはイベントテーブルにログ記録されるため、メトリクスを動作させるにはイベントテーブルを有効にする必要があります。
現在、次のメトリクスが導入されています。
worker working time(TASK_REACTOR_WORKER_WORKING_TIME) - ワーカーが実際にリソースを処理していた時間を示します。タイマーはワーカータスクの開始時に開始され、ワーカータスクの終了時に終了します。worker idle time(TASK_REACTOR_WORKER_IDLE_TIME) -worker working timeの反対です。ワーカーがスリープ状態だった時間(新しい作業を待機中か、タスクの次のスケジュールを待機中)を示します。タイマーは、ワーカーがタスクを完了すると開始され、ワーカーのタスクが再開されると終了します。worker item waiting time(TASK_REACTOR_WORK_ITEM_WAITING_IN_QUEUE_TIME) - ディスパッチャキューで待機しているワークアイテムの時間を表示します。タイマーは、ワークアイテムがディスパッチャーキューに挿入されたときに開始し、ワークアイテムがディスパッチャーキューから削除され、ワーカーキューに挿入されたときに終了します。worker item number in queue(TASK_REACTOR_WORK_ITEMS_NUMBER_IN_QUEUE) - ディスパッチャキューに存在するワークアイテムの数を示します。worker statuses(TASK_REACTOR_WORKER_STATUS) - 各労働者ステータスの労働者数と総労働者数が表示されます。
ログに記録されたすべてのメトリクスイベントを表示するには、次のクエリを使用します。
1種類のメトリクスのみを選択するには、 event.record:name = <メトリクス名> をクエリの where 句に追加します。以下のクエリを使用して、個々のメトリクスをロードできます。
労働者の労働時間 (TASK_REACTOR_WORKER_WORKING_TIME)
労働者のアイドル時間 (TASK_REACTOR_WORKER_IDLE_TIME)
作業員のアイテム待ち時間 (TASK_REACTOR_WORK_ITEM_WAITING_IN_QUEUE_TIME)
キュー内のワーカーアイテム番号 (TASK_REACTOR_WORK_ITEMS_NUMBER_IN_QUEUE)
労働者のステータス (TASK_REACTOR_WORKER_STATUS)