Triggered tasks¶
Use triggered tasks to run tasks whenever there’s a change in a stream. This eliminates the need to poll a source frequently when the availability of new data is unpredictable. It also reduces latency because data is processed immediately.
Triggered tasks don’t use compute resources until the event is triggered.
Considerations¶
For streams on directory tables, the directory table must be refreshed before a triggered task can detect changes. You can do either of the following:
Set the directory table to auto-refresh.
Refresh the directory table manually using the ALTER STAGE <name> REFRESH command.
Streams on external tables and hybrid tables are not supported.
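For example, the directory table refresh options above might look like the following (a sketch with a hypothetical stage name; enabling auto-refresh on an external stage also requires the cloud-messaging setup for your cloud provider):

```sql
-- Enable auto-refresh on the stage's directory table
-- (my_ext_stage is a hypothetical external stage).
ALTER STAGE my_ext_stage SET DIRECTORY = (AUTO_REFRESH = TRUE);

-- Alternatively, refresh the directory table manually:
ALTER STAGE my_ext_stage REFRESH;
```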
Create a triggered task¶
Use CREATE TASK, and set the following parameters:
Define the target stream using the WHEN clause. (Do not include the SCHEDULE parameter.)
When working with multiple data streams, you can use conditional expressions: WHEN ... AND and WHEN ... OR.
Additional requirements based on compute resources:
To create a serverless task, include the TARGET_COMPLETION_INTERVAL parameter and omit the WAREHOUSE parameter. Snowflake uses the target completion interval to estimate the resources needed, and adjusts them to complete the task in this time.
To create a task that runs on a user-managed warehouse, include the WAREHOUSE parameter and specify the warehouse.
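A minimal sketch of the two forms (hypothetical task, stream, warehouse, and table names; complete examples appear in the Examples section below):

```sql
-- Serverless: TARGET_COMPLETION_INTERVAL instead of WAREHOUSE.
CREATE TASK my_serverless_task
  TARGET_COMPLETION_INTERVAL = '60 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_stream')
  AS INSERT INTO my_target SELECT * FROM my_stream;

-- User-managed: WAREHOUSE instead of TARGET_COMPLETION_INTERVAL.
CREATE TASK my_warehouse_task
  WAREHOUSE = my_wh
  WHEN SYSTEM$STREAM_HAS_DATA('my_stream')
  AS INSERT INTO my_target SELECT * FROM my_stream;
```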
Migrate an existing task from a scheduled task to a triggered task¶
Suspend the task.
Unset the SCHEDULE parameter, and add the WHEN clause to define the target stream.
Resume the task.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Migrate an existing user-managed triggered task to a serverless triggered task¶
Suspend the task.
Remove the WAREHOUSE parameter, and set the TARGET_COMPLETION_INTERVAL parameter.
Resume the task.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task SET TARGET_COMPLETION_INTERVAL='120 MINUTES';
ALTER TASK task RESUME;
For more information, see serverless tasks.
Allowing the triggered task to run¶
When you create a triggered task, it starts in the suspended state.
To begin monitoring the stream, resume the task using ALTER TASK … RESUME.
The task runs under the following conditions:
When you first resume a triggered task, it checks the stream for changes since the task last ran. If the stream has changed, the task runs; otherwise, the run is skipped without using compute resources.
If a task is already running when the stream receives new data, the new run waits until the current run completes. Snowflake ensures that only one instance of a task runs at a time.
After a task run completes, Snowflake checks the stream for changes again. If there are changes, the task runs again; if not, the run is skipped.
The task runs whenever new data is detected in the stream.
If the stream is on a directory table, the directory table must be refreshed (automatically or manually) before the task can detect changes.
Every 12 hours, the task runs a health check to prevent streams from becoming stale. If there are no changes, Snowflake skips the task without using compute resources. The task must consume the data in the stream before its data retention period expires; otherwise, the stream becomes stale. For more information, see Avoiding Stream Staleness.
By default, triggered tasks run at most every 30 seconds. You can modify the USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS parameter to run more frequently, up to every 10 seconds.
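For instance, the minimum trigger interval can be lowered on an existing task (a sketch; my_task is a hypothetical name, and the parameter is set while the task is suspended):

```sql
-- Allow the triggered task to fire as often as every 10 seconds.
ALTER TASK my_task SUSPEND;
ALTER TASK my_task SET USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 10;
ALTER TASK my_task RESUME;
```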
When a task is triggered by a stream on a view, any change to the tables referenced by the view's query also triggers the task, regardless of any joins, aggregations, or filters in the query.
Monitoring triggered tasks¶
In the SHOW TASKS and DESC TASK output, the SCHEDULE property displays NULL for triggered tasks.
In the output of the TASK_HISTORY view in the INFORMATION_SCHEMA and ACCOUNT_USAGE schemas, the SCHEDULED_FROM column displays TRIGGER.
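For example, recent triggered runs can be listed with the INFORMATION_SCHEMA.TASK_HISTORY table function (a sketch; the filter assumes the SCHEDULED_FROM column described above):

```sql
SELECT name, state, scheduled_time, scheduled_from
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE scheduled_from = 'TRIGGER'
ORDER BY scheduled_time DESC;
```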
Examples¶
Example 1: Create a serverless task that runs whenever data changes in a stream.
Because the task is serverless, the TARGET_COMPLETION_INTERVAL parameter is required so that Snowflake can estimate the compute resources needed.
CREATE TASK my_task
TARGET_COMPLETION_INTERVAL='120 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS SELECT 1;
Example 2: Create a user-managed task that runs whenever data changes in either of two streams.
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
Example 3: Create a user-managed task that runs whenever data changes are detected in two different streams. Because the WHEN clause uses the AND conditional, the task is skipped if only one of the two streams has new data.
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
Example 4: Create a task that runs whenever data changes in a directory table. In the example, a stream (my_directory_table_stream) is hosted on a directory table on a stage (my_test_stage).
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
To validate the triggered task, data is added to the stage.
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE;
The directory table is then refreshed manually, which triggers the task.
ALTER STAGE my_test_stage REFRESH;