トリガーされるタスク¶
トリガータスクを使用して、 ストリーム に変更があるたびにタスクを実行します。これにより、新しいデータの可用性が予測できない場合に、ソースを頻繁にポーリングする必要がなくなります。また、データが即座に処理されるため、待ち時間も短縮されます。
トリガーされたタスクは、イベントがトリガーされるまでコンピュート・リソースを使用しません。
考慮事項¶
ディレクトリ・テーブル上でホストされているストリームでは、トリガー・タスクが変更を検出する前に、ディレクトリ・テーブルを更新する必要があります。変更を検出するには、次のいずれかを行います。
ディレクトリテーブルを自動リフレッシュする ようにセットします。
ALTER STAGE name REFRESH コマンドを使用して、ディレクトリ・テーブルを手動で更新します。
外部テーブルとハイブリッド・テーブルのストリームはサポートされていません。
トリガーされたタスクの作成¶
CREATE TASK を使用し、以下のパラメーターをセットします:
WHEN
句を使用してターゲット・ストリームを定義します。(SCHEDULE
パラメーターは含めないでください)。複数のデータストリームを扱う場合、条件パラメーター
WHEN ... AND
およびWHEN ... OR
を使用できます。-
サーバーレスタスクを作成するには、
TARGET_COMPLETION_INTERVAL
パラメーターを含める必要があります。WAREHOUSE
パラメーターは含めないでください。Snowflakeは、ターゲット完了間隔を使用して必要なリソースを推定し、この時間内にタスクを完了するように調整します。
ユーザーが管理するウェアハウスで実行するタスクを作成するには、
WAREHOUSE
パラメーターを含め、ウェアハウスを定義します。
既存のタスクをスケジュールタスクからトリガータスクに移行する¶
タスクを中断します。
SCHEDULE
パラメーターの設定を解除し、WHEN
句を追加してターゲット・ストリームを定義します。タスクを再開します。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
既存のユーザー管理のトリガータスクをサーバーレスのトリガータスクに移行する¶
タスクを中断します。
WAREHOUSE
パラメーターを削除し、TARGET_COMPLETION_INTERVAL
パラメーターをセットします。タスクを再開します。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
詳細情報については、 サーバーレスタスク をご参照ください。
トリガータスクの実行許可¶
トリガータスクを作成すると、タスクはサスペンド状態で開始します。
ストリームの監視を開始します。
ALTER TASK ... RESUME を使ってタスクを再開。
タスクは以下の条件で実行されます。
トリガーされたタスクを最初にレジュームするとき、タスクは最後にタスクが実行されてからのストリームの変更をチェックします。変更があった場合はタスクが実行され、そうでない場合は、コンピュート・リソースを使わずにタスクをスキップします。
タスクが実行中で、ストリームに新しいデータがある場合、タスクは現在のタスクが完了するまで待機します。Snowflakeは、タスクのインスタンスが一度に1つだけ実行されるようにします。
タスクが完了すると、Snowflakeはストリームに変更がないか再度チェックします。変更があればタスクは再度実行され、変更がなければタスクはスキップされます。
このタスクは、ストリームに新しいデータが検出されるたびに実行されます。
ストリーム・データがディレクトリ・テーブル上でホストされている場合、変更を検出するには、以下のいずれかを実行します。
12時間ごとにタスクはヘルスチェックを実行し、ストリームの陳腐化を防ぎます。変更がない場合、Snowflakeはコンピュートリソースを使用せずにタスクをスキップします。ストリームの場合、タスク命令はデータ保持期限が切れる前にストリーム内のデータを消費する必要があります。詳細については、 ストリームが古くなることの回避 をご参照ください。
デフォルトでは、トリガータスクは最大で30秒ごとに実行されます。USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS パラメーターを変更することで、より頻繁に、最大10秒ごとに実行することができます。
Streams on Views によってタスクがトリガーされると、クエリ内の結合、集約、フィルターに関係なく、Streams on Views クエリによって参照されるテーブルへの変更もタスクのトリガーとなります。
トリガータスクの監視¶
SHOW TASKS
とDESC TASK
出力では、SCHEDULE
プロパティはトリガータスクのためにNULL
を表示します。information_schemaスキーマとaccount_usageスキーマのtask_historyビューの出力では、SCHEDULED_FROM列がTRIGGERと表示されます。
例¶
例1: ストリームのデータが変更されるたびに実行されるサーバーレスタスクを作成します。
タスクはサーバーレスであるため、 TARGET_COMPLETION_INTERVAL
パラメーターは Snowflake が必要なコンピュートリソースを見積もるために必要です。
CREATE TASK my_task
TARGET_COMPLETION_INTERVAL='120 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS SELECT 1;
例2:2つのストリームのいずれかでデータが変更されるときに実行されるユーザー管理タスクを作成します。
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
例3: 2つの異なるデータストリームでデータ変更が検出されたときに実行するユーザー管理タスクを作成します。このタスクは AND 条件を使用するため、2つのストリームの一方にしか新しいデータがない場合、タスクはスキップされます。
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
例4: ディレクトリ・テーブルのデータが変更されるたびに実行されるユーザー管理タスクを作成します。こ の例では、 ストリーム (my_directory_table_stream) は、 ステージ (my_test_stage) 上の ディレクトリテーブル でホストされています。
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
トリガーされたタスクを検証するために、ステージにデータが追加されます。
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
その後、ディレクトリテーブルが手動で更新され、タスクがトリガーされます。
ALTER STAGE my_test_stage REFRESH