トリガーされるタスク

トリガータスクを使用して、 ストリーム に変更があるたびにタスクを実行します。これにより、新しいデータの可用性が予測できない場合に、ソースを頻繁にポーリングする必要がなくなります。また、データが即座に処理されるため、待ち時間も短縮されます。

トリガーされたタスクは、イベントがトリガーされるまでコンピュート・リソースを使用しません。

考慮事項

  • ディレクトリ・テーブル上でホストされているストリームでは、トリガー・タスクが変更を検出する前に、ディレクトリ・テーブルを更新する必要があります。変更を検出するには、次のいずれかを行います。

  • 外部テーブルとハイブリッド・テーブルのストリームはサポートされていません。

トリガーされたタスクの作成

CREATE TASK を使用し、以下のパラメーターをセットします:

  • WHEN 句を使用してターゲット・ストリームを定義します。(SCHEDULE パラメーターは含めないでください)。

    複数のデータストリームを扱う場合、条件パラメーター WHEN ... AND および WHEN ... OR を使用できます。

  • コンピュートリソースに基づく追加要件:

    • サーバーレスタスクを作成するには、 TARGET_COMPLETION_INTERVAL パラメーターを含める必要があります。 WAREHOUSE パラメーターは含めないでください。Snowflakeは、ターゲット完了間隔を使用して必要なリソースを推定し、この時間内にタスクを完了するように調整します。

    サーバーレスのトリガータスクがSnowflakeでどのように動作するかを示す図。
    • ユーザーが管理するウェアハウスで実行するタスクを作成するには、 WAREHOUSE パラメーターを含め、ウェアハウスを定義します。

既存のタスクをスケジュールタスクからトリガータスクに移行する

  1. タスクを中断します。

  2. SCHEDULE パラメーターの設定を解除し、 WHEN 句を追加してターゲット・ストリームを定義します。

  3. タスクを再開します。

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

既存のユーザー管理のトリガータスクをサーバーレスのトリガータスクに移行する

  1. タスクを中断します。

  2. WAREHOUSE パラメーターを削除し、 TARGET_COMPLETION_INTERVAL パラメーターをセットします。

  3. タスクを再開します。

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

詳細情報については、 サーバーレスタスク をご参照ください。

トリガータスクの実行許可

トリガータスクを作成すると、タスクはサスペンド状態で開始します。

ストリームの監視を開始します。

タスクは以下の条件で実行されます。

  • トリガーされたタスクを最初にレジュームするとき、タスクは最後にタスクが実行されてからのストリームの変更をチェックします。変更があった場合はタスクが実行され、そうでない場合は、コンピュート・リソースを使わずにタスクをスキップします。

  • タスクが実行中で、ストリームに新しいデータがある場合、タスクは現在のタスクが完了するまで待機します。Snowflakeは、タスクのインスタンスが一度に1つだけ実行されるようにします。

  • タスクが完了すると、Snowflakeはストリームに変更がないか再度チェックします。変更があればタスクは再度実行され、変更がなければタスクはスキップされます。

  • このタスクは、ストリームに新しいデータが検出されるたびに実行されます。

  • ストリーム・データがディレクトリ・テーブル上でホストされている場合、変更を検出するには、以下のいずれかを実行します。

  • 12時間ごとにタスクはヘルスチェックを実行し、ストリームの陳腐化を防ぎます。変更がない場合、Snowflakeはコンピュートリソースを使用せずにタスクをスキップします。ストリームの場合、タスク命令はデータ保持期限が切れる前にストリーム内のデータを消費する必要があります。詳細については、 ストリームが古くなることの回避 をご参照ください。

  • デフォルトでは、トリガータスクは最大で30秒ごとに実行されます。USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS パラメーターを変更することで、より頻繁に、最大10秒ごとに実行することができます。

  • Streams on Views によってタスクがトリガーされると、クエリ内の結合、集約、フィルターに関係なく、Streams on Views クエリによって参照されるテーブルへの変更もタスクのトリガーとなります。

トリガータスクがどのように新しいデータを管理し、12時間ごとに変更をチェックするかを図に示します。

トリガータスクの監視

  • SHOW TASKSDESC TASK 出力では、 SCHEDULE プロパティはトリガータスクのために NULL を表示します。

  • information_schemaスキーマとaccount_usageスキーマのtask_historyビューの出力では、SCHEDULED_FROM列がTRIGGERと表示されます。

例1: ストリームのデータが変更されるたびに実行されるサーバーレスタスクを作成します。

タスクはサーバーレスであるため、 TARGET_COMPLETION_INTERVAL パラメーターは Snowflake が必要なコンピュートリソースを見積もるために必要です。

CREATE TASK my_task
  TARGET_COMPLETION_INTERVAL='120 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS SELECT 1;
Copy

例2:2つのストリームのいずれかでデータが変更されるときに実行されるユーザー管理タスクを作成します。

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

例3: 2つの異なるデータストリームでデータ変更が検出されたときに実行するユーザー管理タスクを作成します。このタスクは AND 条件を使用するため、2つのストリームの一方にしか新しいデータがない場合、タスクはスキップされます。

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

例4: ディレクトリ・テーブルのデータが変更されるたびに実行されるユーザー管理タスクを作成します。こ の例では、 ストリーム (my_directory_table_stream) は、 ステージ (my_test_stage) 上の ディレクトリテーブル でホストされています。

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

トリガーされたタスクを検証するために、ステージにデータが追加されます。

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

その後、ディレクトリテーブルが手動で更新され、タスクがトリガーされます。

ALTER STAGE my_test_stage REFRESH
Copy