タスクリアクター¶
すべてのSnowflakeコネクタで使用される共通の要素と機能を提供するライブラリ。
要件¶
タスクリアクターでは、Native Appのインストール中に少なくとも次のSQLファイルを実行する必要があります。
task_reactor.sql
(タスクリアクター SQL リファレンス を参照)
概要¶
タスクリアクターは、限られたタスクセットを持つキュー内に保存されたワークチャンクのオーケストレーションメカニズムを提供する別のモジュールです。タスクリアクターのキューとディスパッチャーは、 Snowflakeタスク を使用した Snowflakeストリーム に基づいており、リフレッシュ時間の制限により1分ごとにトリガーされます。ウェアハウスがクレジットを節約できるように、タスクリアクターは入力キューにデータがある場合にのみアクティブになります。
タスクリアクターは、キュー、ディスパッチャー、ワーカーの3つの主要コンポーネントで構成されます。
コネクタアプリケーションは、キューに QueueItems を追加します。
ディスパッチャー(Snowflakeタスク)は、1分ごとに待機中の QueueItems をキューから取得し、ワーカーに渡します。
ワーカー(Snowflakeタスク)は、1分ごとに割り当てられた QueueItems で並行して作業します。
コネクタ構成が確定すると、タスクリアクターの構成は次の3ステップに制限されます。
タスクリアクターのすべてのコンポーネントを作成する
インスタンスの初期化
(オプション)ワーカー数の変更
タスクリアクターのすべてのコンポーネントを作成する¶
インスタンスオブジェクトを作成するには、まず worker
、 selector
、そして必要に応じて expired selector
の実装を作成し、 次に TASK_REACTOR.CREATE_INSTANCE_OBJECTS プロシージャを使用してそれらを統合する必要があります。
ワーカーの実装¶
ワーカーは、特定のデータの取得や取り込みなど、ディスパッチャーによって割り当てられたタスクを実行する責任を負います。唯一必須なのは、ジョブを開始する特定のワーカーメソッドを持つことです。このメソッドは、Snowparkプロシージャから呼び出し可能で、文字列を返し、次のパラメーターを含んでいる必要があります。
session
- Snowparkセッションオブジェクトworker_id
- 数値、ユニークワーカーIDtask_reactor_schema
- タスクリアクターオブジェクトが作成されるスキーマ名。タスクリアクターのインスタンス名として使用できます。
ワーカーは、特定のデータの取得や取り込みなど、ディスパッチャーによって割り当てられたタスクを実行する責任を負います。(com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorker
および com.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion
)Javaクラスを使用するか、より単純なタスク(com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorker
および com.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask
)を使用することをお勧めしますが、ストアドプロシージャのハンドラーの作成がサポートされている任意のプログラミング言語でワーカーを作成できます。
Javaワーカーメソッドの例:
public static String executeWork(Session session, int workerId, String taskReactorSchema) {
FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
WorkerId workerIdentifier = new WorkerId(workerId);
Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
try {
IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
} catch (WorkerException e) {
// handle the exception...
throw new RuntimeException(e);
}
return "Worker procedure executed.";
}
すでに作成されたワーカーメソッドを使用する場合、ユーザーはそれを CONNECTOR.WORKER_PROCEDURE
に統合する必要があります。プロシージャは独自のワーカーメソッドを呼び出す必要があります。これはアプリケーションスキーマ内に作成され、 STRING を返し、次のパラメーターを含める必要があります。
worker_id
- 数値task_reactor_schema
- 文字列
ワーカーのJava実装を呼び出すプロシージャの例:
CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
RETURNS STRING
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
イベントテーブルにログ記録されるメトリクスを収集するには、テレメトリライブラリが必要です。
セレクターの実装¶
セレクターの役割は、キューに入れられたタスクのうち、どのタスクをタスクリアクターで処理するかを決定することです。ワーカーの実装と同様に、Snowparkでサポートされている任意の言語で作成できます。タスクセレクターは、データベースプロシージャまたはデータベースビューとして実装できます。セレクター(プロシージャまたはビュー)は、 TASK_REACTOR.CREATE_NEW_INSTANCE
プロシージャの引数として渡す必要があります。
プロシージャはSnowparkプロシージャから呼び出し可能で、文字列を返し、次のパラメーターを含んでいる必要があります。
session
- SnowparkセッションqueueItems
- String[](個々の JSON 文字列の配列。各文字列は単一の QueueItem を表します)
Javaセレクターメソッドの例:
public static String selectWork(Session session, String[] queueItems) {
Variant[] sorted =
Arrays.stream(queueItems)
.map(Variant::new)
.filter(
queueItem ->
!queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
.sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
.toArray(Variant[]::new);
return new Variant(sorted).asJsonString();
}
セレクターメソッドの代わりに、既存のキューからタスクをフィルタリングおよびソートするビューを作成することもできます。ディスパッチャーは、次のサンプルクエリを使用して、新しく作成されたビューから新しいタスクを取得できます。
CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
ユーザーは、すでに作成されているセレクターメソッドを使用して CONNECTOR.WORK_SELECTOR
に統合する必要があります。このプロシージャーは、必須のワークセレクターメソッドを呼び出す必要があります。これはアプリケーションスキーマ内に作成され、 ARRAY を返し、次のパラメーターを含める必要があります。
work_items - array
ワークセレクターのJava実装を呼び出すプロシージャの例:
CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
RETURNS ARRAY
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
期限切れセレクターの実装¶
期限切れセレクターの役割は、タスクリアクターのキューから削除するキューアイテムを決定することです。セレクターが一部のアイテムに到達できず、これらのアイテムがキュー内に永久に残るため、アイテムを削除する必要がある場合があります。さらに、キューで待機している一部のアイテムは、かなり前に作成された可能性があり、それ以上処理が必要ないものもあります。期限切れセレクターは、データベースビューとして実装できます。セレクタービューは、 TASK_REACTOR.CREATE_NEW_INSTANCE
プロシージャの引数として渡す必要があります。キューからアイテムを削除する必要がない場合は、デフォルトの実装である TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
を使用できます。
次のクエリを使用すると、3日以上前に作成されたアイテムを選択する期限切れセレクタービューを作成できます。
CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
AS SELECT * FROM TASK_REACTOR.QUEUE q
WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
インスタンスオブジェクトの統合¶
TASK_REACTOR.CREATE_INSTANCE_OBJECTS を使用すると、作成されたインスタンスを初期化する前に、すべてのインスタンスをまとめて構成できます。このプロシージャはスキーマごとに1回だけ実行できるため、今後の呼び出しでは変更は行われません。プロシージャが何度も実行されたり、まったく呼び出されなかったりするのを防ぐために、 setup.sql
ファイルに初期化コールを置くことをお勧めします。
必須パラメーター:
instance_schema_name VARCHAR
- インスタンスが動作するデータベースオブジェクトを格納する、インスタンスごとに一意のスキーマ。worker_procedure_name VARCHAR
-Worker Implementation
の部分に記述されているワーカープロシージャの名前。work_selector_type VARCHAR
- - 新しいタスクで、ビューまたはプロシージャのどちらを使用するかを指定する値。可能な値: VIEW、 PROCEDURE。work_selector_name VARCHAR
-Selector Implementation
- 部分に記述されているセレクタープロシージャまたはビューの名前。
オプションのパラメーター:
expired_work_selector_name VARCHAR
-Expired Selector Implementation
- 部分に記述されている期限切れのセレクタービューの名前。値が提供されていない場合、TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
は何も返さないデフォルトの実装として使用されます。
インスタンスの初期化¶
タスクリアクター内のすべての構成を初期化して実行するには、ユーザーは INITIALIZE_INSTANCE
を呼び出す必要があります。このプロシージャは、次のパラメーターを入力として受け取ります。
instance_schema_name
- (必須)インスタンスが動作するデータベースオブジェクトを格納するスキーマの名前。warehouse_name
(必須)インスタンスが実行されるウェアハウス名。dt_should_be_started
(オプション) - デフォルト:TRUE
。新しいインスタンスを作成するときに、ディスパッチャータスクを開始する必要があります。dt_task_schedule
(オプション) - デフォルト:1 MINUTE
。ディスパッチャータスクの実行頻度。dt_allow_overlapping_execution
(オプション) - デフォルト:FALSE
。DAG を同時に実行できるようにします。dt_user_task_timeout_ms
(オプション) - タスクがタイムアウトするまでの1回の実行の制限時間(ミリ秒単位)を指定します。
注釈
ワーカープロシージャの実行時間がワーカータスク(USER_TASK_TIMEOUT_MS)に設定されたタイムアウト時間よりも長くなると、プロシージャはタイムアウトエラーで中止されます。Snowflakeタスクのタイムアウトを超えないようにタスクをスケジュールすることが重要です。
必要なパラメーターの最小数を指定すると、 Task Reactor
は指定された構成で初期化され、 TASK_REACTOR.DISPATCHER
プロシージャを使用してワーカーをディスパッチします。
ワーカー数を設定する¶
ワーカーの数は、次のパラメーターを使用して、 TASK_REACTOR.SET_WORKERS_NUMBER プロシージャを呼び出すことによって手動で変更できます。
WORKERS_NUMBER
- 新しいワーカー数。TR_INSTANCE_SCHEMA_NAME
- インスタンススキーマ名
メトリック¶
タスクリアクターにはメトリクスメカニズムがあります。これは、 Snowflakeトレースイベント に基づいています。メトリクスはイベントテーブルにログ記録されるため、メトリクスを動作させるにはイベントテーブルを有効にする必要があります。
現在、次のメトリクスが導入されています。
worker working time
(TASK_REACTOR_WORKER_WORKING_TIME
) - ワーカーが実際にリソースを処理していた時間を示します。タイマーはワーカータスクの開始時に開始され、ワーカータスクの終了時に終了します。worker idle time
(TASK_REACTOR_WORKER_IDLE_TIME
) -worker working time
の反対です。ワーカーがスリープ状態だった時間(新しい作業を待機中か、タスクの次のスケジュールを待機中)を示します。タイマーは、ワーカーがタスクを完了すると開始され、ワーカーのタスクが再開されると終了します。
ログに記録されたすべてのメトリクスイベントを表示するには、次のクエリを使用します。
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
ORDER BY event.timestamp DESC;
1種類のメトリクスのみを選択するには、 event.record:name = <メトリクス名>
をクエリの where
句に追加します。
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
ORDER BY event.timestamp DESC;