タスクリアクター

すべてのSnowflakeコネクタで使用される共通の要素と機能を提供するライブラリ。

要件

タスクリアクターでは、Native Appのインストール中に少なくとも次のSQLファイルを実行する必要があります。

概要

タスクリアクターは、限られたタスクセットを持つキュー内に保存されたワークチャンクのオーケストレーションメカニズムを提供する別のモジュールです。タスクリアクターのキューとディスパッチャーは、 Snowflakeタスク を使用した Snowflakeストリーム に基づいており、リフレッシュ時間の制限により1分ごとにトリガーされます。ウェアハウスがクレジットを節約できるように、タスクリアクターは入力キューにデータがある場合にのみアクティブになります。

タスクリアクターは、キュー、ディスパッチャー、ワーカーの3つの主要コンポーネントで構成されます。

  1. コネクタアプリケーションは、キューに QueueItems を追加します。

  2. ディスパッチャー(Snowflakeタスク)は、1分ごとに待機中の QueueItems をキューから取得し、ワーカーに渡します。

  3. ワーカー(Snowflakeタスク)は、1分ごとに割り当てられた QueueItems で並行して作業します。

コネクタ構成が確定すると、タスクリアクターの構成は次の3ステップに制限されます。

  1. タスクリアクターのすべてのコンポーネントを作成する

  2. インスタンスの初期化

  3. (オプション)ワーカー数の変更

タスクリアクターのすべてのコンポーネントを作成する

インスタンスオブジェクトを作成するには、まず workerselector、そして必要に応じて expired selector の実装を作成し、 次に TASK_REACTOR.CREATE_INSTANCE_OBJECTS プロシージャを使用してそれらを統合する必要があります。

ワーカーの実装

ワーカーは、特定のデータの取得や取り込みなど、ディスパッチャーによって割り当てられたタスクを実行する責任を負います。唯一必須なのは、ジョブを開始する特定のワーカーメソッドを持つことです。このメソッドは、Snowparkプロシージャから呼び出し可能で、文字列を返し、次のパラメーターを含んでいる必要があります。

  • session - Snowparkセッションオブジェクト

  • worker_id - 数値、ユニークワーカーID

  • task_reactor_schema - タスクリアクターオブジェクトが作成されるスキーマ名。タスクリアクターのインスタンス名として使用できます。

ワーカーは、特定のデータの取得や取り込みなど、ディスパッチャーによって割り当てられたタスクを実行する責任を負います。(com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorker および com.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion)Javaクラスを使用するか、より単純なタスク(com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorker および com.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask)を使用することをお勧めしますが、ストアドプロシージャのハンドラーの作成がサポートされている任意のプログラミング言語でワーカーを作成できます。

Javaワーカーメソッドの例:

public static String executeWork(Session session, int workerId, String taskReactorSchema) {
  FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
  WorkerId workerIdentifier = new WorkerId(workerId);
  Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
  try {
    IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
  } catch (WorkerException e) {
    // handle the exception...
    throw new RuntimeException(e);
  }
  return "Worker procedure executed.";
}
Copy

すでに作成されたワーカーメソッドを使用する場合、ユーザーはそれを CONNECTOR.WORKER_PROCEDURE に統合する必要があります。プロシージャは独自のワーカーメソッドを呼び出す必要があります。これはアプリケーションスキーマ内に作成され、 STRING を返し、次のパラメーターを含める必要があります。

  • worker_id - 数値

  • task_reactor_schema - 文字列

ワーカーのJava実装を呼び出すプロシージャの例:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
Copy

イベントテーブルにログ記録されるメトリクスを収集するには、テレメトリライブラリが必要です。

セレクターの実装

セレクターの役割は、キューに入れられたタスクのうち、どのタスクをタスクリアクターで処理するかを決定することです。ワーカーの実装と同様に、Snowparkでサポートされている任意の言語で作成できます。タスクセレクターは、データベースプロシージャまたはデータベースビューとして実装できます。セレクター(プロシージャまたはビュー)は、 TASK_REACTOR.CREATE_NEW_INSTANCE プロシージャの引数として渡す必要があります。

プロシージャはSnowparkプロシージャから呼び出し可能で、文字列を返し、次のパラメーターを含んでいる必要があります。

  • session - Snowparkセッション

  • queueItems - String[](個々の JSON 文字列の配列。各文字列は単一の QueueItem を表します)

Javaセレクターメソッドの例:

public static String selectWork(Session session, String[] queueItems) {
  Variant[] sorted =
      Arrays.stream(queueItems)
          .map(Variant::new)
          .filter(
              queueItem ->
                  !queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
          .sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
          .toArray(Variant[]::new);
  return new Variant(sorted).asJsonString();
}
Copy

セレクターメソッドの代わりに、既存のキューからタスクをフィルタリングおよびソートするビューを作成することもできます。ディスパッチャーは、次のサンプルクエリを使用して、新しく作成されたビューから新しいタスクを取得できます。

CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
Copy

ユーザーは、すでに作成されているセレクターメソッドを使用して CONNECTOR.WORK_SELECTOR に統合する必要があります。このプロシージャーは、必須のワークセレクターメソッドを呼び出す必要があります。これはアプリケーションスキーマ内に作成され、 ARRAY を返し、次のパラメーターを含める必要があります。

  • work_items - array

ワークセレクターのJava実装を呼び出すプロシージャの例:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
    RETURNS ARRAY
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
Copy

期限切れセレクターの実装

期限切れセレクターの役割は、タスクリアクターのキューから削除するキューアイテムを決定することです。セレクターが一部のアイテムに到達できず、これらのアイテムがキュー内に永久に残るため、アイテムを削除する必要がある場合があります。さらに、キューで待機している一部のアイテムは、かなり前に作成された可能性があり、それ以上処理が必要ないものもあります。期限切れセレクターは、データベースビューとして実装できます。セレクタービューは、 TASK_REACTOR.CREATE_NEW_INSTANCE プロシージャの引数として渡す必要があります。キューからアイテムを削除する必要がない場合は、デフォルトの実装である TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR を使用できます。

次のクエリを使用すると、3日以上前に作成されたアイテムを選択する期限切れセレクタービューを作成できます。

CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
    AS SELECT * FROM TASK_REACTOR.QUEUE q
        WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
Copy

インスタンスオブジェクトの統合

TASK_REACTOR.CREATE_INSTANCE_OBJECTS を使用すると、作成されたインスタンスを初期化する前に、すべてのインスタンスをまとめて構成できます。このプロシージャはスキーマごとに1回だけ実行できるため、今後の呼び出しでは変更は行われません。プロシージャが何度も実行されたり、まったく呼び出されなかったりするのを防ぐために、 setup.sql ファイルに初期化コールを置くことをお勧めします。

必須パラメーター:

  • instance_schema_name VARCHAR - インスタンスが動作するデータベースオブジェクトを格納する、インスタンスごとに一意のスキーマ。

  • worker_procedure_name VARCHAR - Worker Implementation の部分に記述されているワーカープロシージャの名前。

  • work_selector_type VARCHAR - - 新しいタスクで、ビューまたはプロシージャのどちらを使用するかを指定する値。可能な値: VIEW、 PROCEDURE。

  • work_selector_name VARCHAR - Selector Implementation - 部分に記述されているセレクタープロシージャまたはビューの名前。

オプションのパラメーター:

  • expired_work_selector_name VARCHAR - Expired Selector Implementation - 部分に記述されている期限切れのセレクタービューの名前。値が提供されていない場合、 TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR は何も返さないデフォルトの実装として使用されます。

インスタンスの初期化

タスクリアクター内のすべての構成を初期化して実行するには、ユーザーは INITIALIZE_INSTANCE を呼び出す必要があります。このプロシージャは、次のパラメーターを入力として受け取ります。

  • instance_schema_name - (必須)インスタンスが動作するデータベースオブジェクトを格納するスキーマの名前。

  • warehouse_name (必須)インスタンスが実行されるウェアハウス名。

  • dt_should_be_started (オプション) - デフォルト: TRUE。新しいインスタンスを作成するときに、ディスパッチャータスクを開始する必要があります。

  • dt_task_schedule (オプション) - デフォルト: 1 MINUTE。ディスパッチャータスクの実行頻度。

  • dt_allow_overlapping_execution (オプション) - デフォルト: FALSE。DAG を同時に実行できるようにします。

  • dt_user_task_timeout_ms (オプション) - タスクがタイムアウトするまでの1回の実行の制限時間(ミリ秒単位)を指定します。

注釈

ワーカープロシージャの実行時間がワーカータスク(USER_TASK_TIMEOUT_MS)に設定されたタイムアウト時間よりも長くなると、プロシージャはタイムアウトエラーで中止されます。Snowflakeタスクのタイムアウトを超えないようにタスクをスケジュールすることが重要です。

必要なパラメーターの最小数を指定すると、 Task Reactor は指定された構成で初期化され、 TASK_REACTOR.DISPATCHER プロシージャを使用してワーカーをディスパッチします。

ワーカー数を設定する

ワーカーの数は、次のパラメーターを使用して、 TASK_REACTOR.SET_WORKERS_NUMBER プロシージャを呼び出すことによって手動で変更できます。

  • WORKERS_NUMBER - 新しいワーカー数。

  • TR_INSTANCE_SCHEMA_NAME - インスタンススキーマ名

メトリック

タスクリアクターにはメトリクスメカニズムがあります。これは、 Snowflakeトレースイベント に基づいています。メトリクスはイベントテーブルにログ記録されるため、メトリクスを動作させるにはイベントテーブルを有効にする必要があります。

現在、次のメトリクスが導入されています。

  • worker working timeTASK_REACTOR_WORKER_WORKING_TIME) - ワーカーが実際にリソースを処理していた時間を示します。タイマーはワーカータスクの開始時に開始され、ワーカータスクの終了時に終了します。

  • worker idle timeTASK_REACTOR_WORKER_IDLE_TIME) - worker working time の反対です。ワーカーがスリープ状態だった時間(新しい作業を待機中か、タスクの次のスケジュールを待機中)を示します。タイマーは、ワーカーがタスクを完了すると開始され、ワーカーのタスクが再開されると終了します。

ログに記録されたすべてのメトリクスイベントを表示するには、次のクエリを使用します。

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
    ORDER BY event.timestamp DESC;
Copy

1種類のメトリクスのみを選択するには、 event.record:name = <メトリクス名> をクエリの where 句に追加します。

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
        AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
    ORDER BY event.timestamp DESC;
Copy