トリガーされるタスク

トリガータスクを使用して、 ストリーム に変更があるたびにタスクを実行します。これにより、新しいデータの可用性が予測できない場合に、ソースを頻繁にポーリングする必要がなくなります。また、データが即座に処理されるため、待ち時間も短縮されます。

トリガーされたタスクは、イベントがトリガーされるまでコンピュート・リソースを使用しません。

考慮事項

トリガータスクは以下の項目でサポートされています。

  • テーブル

  • ビュー

  • 動的テーブル

  • Apache Iceberg™ テーブル(管理対象と非管理対象)

  • データ共有

  • ディレクトリテーブル。ディレクトリテーブルは、トリガータスクが変更を検出できるようにする前に、リフレッシュする必要があります。変更を検出するには、次のいずれかのタスクを実行できます。

トリガータスクは以下の項目ではサポートされていません。

  • ハイブリッドテーブル

  • 外部テーブルのストリーム

コンシューマーが共有テーブルやセキュアビューに対してストリームを作成できるようにするには、データプロバイダーが自分のアカウント内で共有対象となるテーブルやビューに対して、変更追跡を有効にする必要があります。つまり ALTERVIEW<view_name>SETCHANGE_TRACKING = TRUE;`です。変更追跡が有効になっていない場合、コンシューマーは共有データに対してストリームを作成できません。詳細については、 :ref:`label-data_sharing_streams をご参照ください。

トリガーされたタスクの作成

CREATE TASK を使用し、以下のパラメーターをセットします:

  • WHEN 句を使用してターゲット・ストリームを定義します。(SCHEDULE パラメーターは含めないでください)。

  • コンピュートリソースに基づく追加要件:

    • ユーザーが管理するウェアハウスで実行するタスクを作成するには、 WAREHOUSE パラメーターを含め、ウェアハウスを定義します。

    • サーバーレスタスクを作成するには、 TARGET_COMPLETION_INTERVAL パラメーターを含める必要があります。WAREHOUSE パラメーターは含めないでください。Snowflakeは、ターゲット完了間隔を使用して必要なリソースを推定し、この時間内にタスクを完了するように調整します。

次の例では、ストリーム内のデータが変更されるたびに実行される、サーバーレストリガータスクを作成します。

サーバーレストリガータスクを示す図
CREATE TASK my_triggered_task
  TARGET_COMPLETION_INTERVAL='15 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

既存のタスクをスケジュールタスクからトリガータスクに移行する

  1. タスクを中断します。

  2. タスクを更新するには、ALTER TASK を使用してください。SCHEDULE パラメータの設定を解除し、WHEN 句を追加してターゲットストリームを定義してください。

  3. タスクを再開します。

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

既存のユーザー管理のトリガータスクをサーバーレスのトリガータスクに移行する

  1. タスクを中断します。

  2. タスクを更新するには、ALTER TASK、 を使用してください。WAREHOUSE パラメーターを削除してから、TARGET_COMPLETION_INTERVAL パラメーターを設定してください。

  3. タスクを再開します。

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

詳細情報については、 サーバーレスタスク をご参照ください。

トリガータスクの実行を許可する

トリガータスクを作成すると、タスクはサスペンド状態で開始します。

ストリームの監視を開始します。

タスクは以下の条件で実行されます。

  • トリガータスクを初めて再開すると、タスクは最後のタスクの実行後にストリームに変更がないか確認します。変更がある場合、タスクが実行されます。そうでない場合は、コンピューティングリソースを使用せずにタスクをスキップします。

  • タスクが実行中で、ストリームに新しいデータがある場合、タスクは現在のタスクが完了するまで一時停止します。Snowflakeはタスクのうち1つのインスタンスのみが一度に実行されるようにします。

  • タスクが完了すると、Snowflakeはストリームに変更がないか再度チェックします。変更があればタスクは再度実行され、変更がなければタスクはスキップされます。

  • このタスクは、ストリームに新しいデータが検出されるたびに実行されます。

  • ストリームデータがディレクトリテーブルでホストされている場合は、次のタスクのいずれかを実行して変更を検出します。

  • 12時間ごとにタスクはヘルスチェックを実行し、ストリームの陳腐化を防ぎます。変更がない場合、Snowflakeはコンピュートリソースを使用せずにタスクをスキップします。ストリームの場合、タスク命令はデータ保持期限が切れる前にストリーム内のデータを消費する必要があります。詳細については、 ストリームが古くなることの回避 をご参照ください。

  • デフォルトでは、トリガータスクは最大で30秒ごとに実行されます。USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS パラメーターを変更することで、より頻繁に、最大10秒ごとに実行することができます。

  • Streams on Views によってタスクがトリガーされると、クエリ内の結合、集約、フィルターに関係なく、Streams on Views クエリによって参照されるテーブルへの変更もタスクのトリガーとなります。

トリガータスクがどのように新しいデータを管理し、12時間ごとに変更をチェックするかを図に示します。

トリガータスクの監視

  • SHOW TASKSDESC TASK 出力では、 SCHEDULE プロパティはトリガータスクのために NULL を表示します。

  • information_schemaスキーマとaccount_usageスキーマのtask_historyビューの出力では、SCHEDULED_FROM列がTRIGGERと表示されます。

例1: 2つのストリームのいずれかでデータが変更されるたびに実行されるユーザー管理タスクを作成します。

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

例2:2つの異なるデータストリームでデータの変更が検出されるたびに実行されるユーザー管理タスクを作成します。このタスクは AND 条件を使用するため、2つのストリームの一方にしか新しいデータがない場合、タスクはスキップされます。

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

例3:ディレクトリテーブル内のデータが変更されるたびに実行されるユーザー管理タスクを作成します。この例では、ストリーム --- my_directory_table_stream --- が、ステージmy_test_stage 上の :doc:` ディレクトリテーブル </user-guide/data-load-dirtables-manage>` でホストされています。

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

トリガーされたタスクを検証するために、ステージにデータが追加されます。

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

その後、ディレクトリテーブルが手動で更新され、タスクがトリガーされます。

ALTER STAGE my_test_stage REFRESH
Copy