トリガーされるタスク¶
トリガータスクを使用して、 ストリーム に変更があるたびにタスクを実行します。これにより、新しいデータの可用性が予測できない場合に、ソースを頻繁にポーリングする必要がなくなります。また、データが即座に処理されるため、待ち時間も短縮されます。
トリガーされたタスクは、イベントがトリガーされるまでコンピュート・リソースを使用しません。
考慮事項¶
トリガータスクは以下の項目でサポートされています。
テーブル
ビュー
動的テーブル
Apache Iceberg™ テーブル(管理対象と非管理対象)
データ共有
ディレクトリテーブル。ディレクトリテーブルは、トリガータスクが変更を検出できるようにする前に、リフレッシュする必要があります。変更を検出するには、次のいずれかのタスクを実行できます。
ディレクトリテーブルを自動リフレッシュする ようにセットします。
ALTER STAGE name REFRESH コマンドを使用して、ディレクトリテーブルを手動で更新します。
トリガータスクは以下の項目ではサポートされていません。
ハイブリッドテーブル
外部テーブルのストリーム
コンシューマーが共有テーブルやセキュアビューに対してストリームを作成できるようにするには、データプロバイダーが自分のアカウント内で共有対象となるテーブルやビューに対して、変更追跡を有効にする必要があります。つまり ALTERVIEW<view_name>SETCHANGE_TRACKING = TRUE;`です。変更追跡が有効になっていない場合、コンシューマーは共有データに対してストリームを作成できません。詳細については、 :ref:`label-data_sharing_streams
をご参照ください。
トリガーされたタスクの作成¶
CREATE TASK を使用し、以下のパラメーターをセットします:
WHEN
句を使用してターゲット・ストリームを定義します。(SCHEDULE
パラメーターは含めないでください)。-
ユーザーが管理するウェアハウスで実行するタスクを作成するには、
WAREHOUSE
パラメーターを含め、ウェアハウスを定義します。サーバーレスタスクを作成するには、
TARGET_COMPLETION_INTERVAL
パラメーターを含める必要があります。WAREHOUSE
パラメーターは含めないでください。Snowflakeは、ターゲット完了間隔を使用して必要なリソースを推定し、この時間内にタスクを完了するように調整します。
次の例では、ストリーム内のデータが変更されるたびに実行される、サーバーレストリガータスクを作成します。
CREATE TASK my_triggered_task
TARGET_COMPLETION_INTERVAL='15 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
既存のタスクをスケジュールタスクからトリガータスクに移行する¶
タスクを中断します。
タスクを更新するには、ALTER TASK を使用してください。
SCHEDULE
パラメータの設定を解除し、WHEN
句を追加してターゲットストリームを定義してください。タスクを再開します。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
既存のユーザー管理のトリガータスクをサーバーレスのトリガータスクに移行する¶
タスクを中断します。
タスクを更新するには、ALTER TASK、 を使用してください。
WAREHOUSE
パラメーターを削除してから、TARGET_COMPLETION_INTERVAL
パラメーターを設定してください。タスクを再開します。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
詳細情報については、 サーバーレスタスク をご参照ください。
トリガータスクの実行を許可する¶
トリガータスクを作成すると、タスクはサスペンド状態で開始します。
ストリームの監視を開始します。
ALTER TASK ... RESUME を使ってタスクを再開。
タスクは以下の条件で実行されます。
トリガータスクを初めて再開すると、タスクは最後のタスクの実行後にストリームに変更がないか確認します。変更がある場合、タスクが実行されます。そうでない場合は、コンピューティングリソースを使用せずにタスクをスキップします。
タスクが実行中で、ストリームに新しいデータがある場合、タスクは現在のタスクが完了するまで一時停止します。Snowflakeはタスクのうち1つのインスタンスのみが一度に実行されるようにします。
タスクが完了すると、Snowflakeはストリームに変更がないか再度チェックします。変更があればタスクは再度実行され、変更がなければタスクはスキップされます。
このタスクは、ストリームに新しいデータが検出されるたびに実行されます。
ストリームデータがディレクトリテーブルでホストされている場合は、次のタスクのいずれかを実行して変更を検出します。
12時間ごとにタスクはヘルスチェックを実行し、ストリームの陳腐化を防ぎます。変更がない場合、Snowflakeはコンピュートリソースを使用せずにタスクをスキップします。ストリームの場合、タスク命令はデータ保持期限が切れる前にストリーム内のデータを消費する必要があります。詳細については、 ストリームが古くなることの回避 をご参照ください。
デフォルトでは、トリガータスクは最大で30秒ごとに実行されます。USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS パラメーターを変更することで、より頻繁に、最大10秒ごとに実行することができます。
Streams on Views によってタスクがトリガーされると、クエリ内の結合、集約、フィルターに関係なく、Streams on Views クエリによって参照されるテーブルへの変更もタスクのトリガーとなります。
トリガータスクの監視¶
SHOW TASKS
とDESC TASK
出力では、SCHEDULE
プロパティはトリガータスクのためにNULL
を表示します。information_schemaスキーマとaccount_usageスキーマのtask_historyビューの出力では、SCHEDULED_FROM列がTRIGGERと表示されます。
例¶
例1: 2つのストリームのいずれかでデータが変更されるたびに実行されるユーザー管理タスクを作成します。
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
例2:2つの異なるデータストリームでデータの変更が検出されるたびに実行されるユーザー管理タスクを作成します。このタスクは AND 条件を使用するため、2つのストリームの一方にしか新しいデータがない場合、タスクはスキップされます。
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
例3:ディレクトリテーブル内のデータが変更されるたびに実行されるユーザー管理タスクを作成します。この例では、ストリーム --- my_directory_table_stream --- が、ステージmy_test_stage 上の :doc:` ディレクトリテーブル </user-guide/data-load-dirtables-manage>` でホストされています。
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
トリガーされたタスクを検証するために、ステージにデータが追加されます。
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
その後、ディレクトリテーブルが手動で更新され、タスクがトリガーされます。
ALTER STAGE my_test_stage REFRESH