Triggered tasks

Use triggered tasks to run tasks whenever there’s a change in a stream. This eliminates the need to poll a source frequently when the availability of new data is unpredictable. It also reduces latency because data is processed immediately.

Triggered tasks don’t use compute resources until the event is triggered.

Considerations

  • For streams hosted on directory tables, the directory table must be refreshed before a triggered task can detect the changes. You can either configure the directory table to refresh automatically or refresh it manually with ALTER STAGE ... REFRESH.

  • Streams on external tables and hybrid tables are not supported.

Create a triggered task

Use CREATE TASK, and set the following parameters:

  • Define the target stream using the WHEN clause. (Do not include the SCHEDULE parameter.)

    When working with multiple data streams, you can use conditional parameters: WHEN ... AND and WHEN ... OR.

  • Additional requirements based on compute resources:

    • To create a serverless task, you must include the TARGET_COMPLETION_INTERVAL parameter. Do not include the WAREHOUSE parameter. Snowflake uses the target completion interval to estimate the compute resources needed, and adjusts them so that the task completes within that time.

    Diagram showing how serverless triggered tasks work in Snowflake.
    • To create a task that runs on a user-managed warehouse, include the WAREHOUSE parameter and define the warehouse.

Migrate an existing task from a scheduled task to a triggered task

  1. Suspend the task.

  2. Unset the SCHEDULE parameter, and add the WHEN clause to define the target stream.

  3. Resume the task.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task MODIFY WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;

Migrate an existing user-managed triggered task to a serverless triggered task

  1. Suspend the task.

  2. Remove the WAREHOUSE parameter, and set the TARGET_COMPLETION_INTERVAL parameter.

  3. Resume the task.

ALTER TASK task SUSPEND;
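ALTER TASK task UNSET WAREHOUSE;
-- '120 MINUTES' is an example value; choose an interval that fits your workload.
ALTER TASK task SET TARGET_COMPLETION_INTERVAL = '120 MINUTES';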
ALTER TASK task RESUME;

For more information, see serverless tasks.

Allowing the triggered task to run

When you create a triggered task, it starts in the suspended state.

To begin monitoring the stream, resume the task using ALTER TASK ... RESUME.
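
As a minimal sketch, resuming a task and confirming its state might look like the following. The task name my_task is borrowed from Example 1 and stands in for your own task.

```sql
-- Resume the task so that it starts monitoring its stream.
ALTER TASK my_task RESUME;

-- Check the task's state; the "state" column should show "started".
SHOW TASKS LIKE 'my_task';
```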

The task runs in the following conditions:

  • When you first resume a triggered task, the task checks the stream for changes since the task last ran. If there’s been a change, the task runs; otherwise, the run is skipped and no compute resources are used.

  • If a task is running and the stream receives new data, the next run waits until the current run is complete. Snowflake ensures that only one instance of a task executes at a time.

  • After a run is complete, Snowflake checks the stream for changes again. If there are changes, the task runs again; if not, the run is skipped.

  • The task runs whenever new data is detected in the stream.

  • If the stream is hosted on a directory table, the task detects changes only after the directory table is refreshed, either automatically or manually with ALTER STAGE ... REFRESH.

  • Every 12 hours, the task runs a health check to prevent streams from becoming stale. If there are no changes, Snowflake skips the task without using compute resources. The task’s SQL must consume the data in the stream before the data retention period expires; otherwise, the stream becomes stale. For more information, see Avoiding Stream Staleness.

  • By default, triggered tasks run at most once every 30 seconds. You can lower the USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS parameter to let tasks run more frequently, down to a minimum interval of 10 seconds.

  • When a task is triggered by a stream on a view, any change to a table referenced by the view’s query also triggers the task, regardless of any joins, aggregations, or filters in the query.
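
The minimum trigger interval mentioned above could be lowered along these lines. This is a sketch that assumes the parameter is set on the task itself and reuses the my_task name from Example 1. The task is suspended first as a precaution before it is modified.

```sql
-- Suspend the task before changing its parameters.
ALTER TASK my_task SUSPEND;

-- Allow the task to be triggered as often as every 10 seconds
-- (the default is 30 seconds).
ALTER TASK my_task SET USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 10;

-- Resume the task so that it monitors the stream again.
ALTER TASK my_task RESUME;
```

A shorter interval reduces latency but can result in more frequent runs, so it is worth weighing against compute cost.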

A diagram shows how triggered tasks manage new data as it comes in, and also check for changes every 12 hours.

Monitoring triggered tasks

  • In the SHOW TASKS and DESC TASK output, the SCHEDULE property displays NULL for triggered tasks.

  • In the TASK_HISTORY output of the INFORMATION_SCHEMA and ACCOUNT_USAGE schemas, the SCHEDULED_FROM column displays TRIGGER.
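
For instance, recent triggered runs of a task could be listed with a query like the one below. It is a sketch that assumes a task named MY_TASK in the current database and schema, and it selects only a few of the available columns.

```sql
-- List the most recent runs of MY_TASK that were started by a trigger.
SELECT name, state, scheduled_from, scheduled_time, completed_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'MY_TASK'))
WHERE scheduled_from = 'TRIGGER'
ORDER BY scheduled_time DESC
LIMIT 10;
```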

Examples

Example 1: Create a serverless task that runs whenever data changes in a stream.

Because the task is serverless, the TARGET_COMPLETION_INTERVAL parameter is required to allow Snowflake to estimate the compute resources needed.

CREATE TASK my_task
  TARGET_COMPLETION_INTERVAL='120 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS SELECT 1;

Example 2: Create a user-managed task that runs whenever data changes in either of two streams.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;

Example 3: Create a user-managed task that runs whenever data changes are detected in two different data streams. Because the task uses the AND conditional, the task is skipped if only one of the two streams has new data.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;

Example 4: Create a task that runs whenever data changes in a directory table. In the example, a stream (my_directory_table_stream) is hosted on a directory table on a stage (my_test_stage).

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
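
The task above assumes that the stage, its directory table, and the stream already exist, and that the task has been resumed. A minimal setup sketch follows. The stage and stream definitions are assumptions based on the names used in this example and would need to run before CREATE TASK. The tasks_runs target table is also assumed to exist.

```sql
-- Stage with a directory table enabled (assumed setup; run before CREATE TASK).
CREATE STAGE my_test_stage
  DIRECTORY = (ENABLE = TRUE);

-- Stream that tracks changes to the stage's directory table.
CREATE STREAM my_directory_table_stream ON STAGE my_test_stage;

-- After the task is created, resume it so that it begins monitoring the stream.
ALTER TASK triggered_task_directory_table RESUME;
```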

To validate the triggered task, data is added to the stage.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE = TRUE;

The directory table is then refreshed manually, which triggers the task.

ALTER STAGE my_test_stage REFRESH;
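
To confirm the result, checks along these lines could be run after the refresh. This is a sketch rather than part of the original example, and the timing depends on when the task is triggered.

```sql
-- Returns TRUE while the stream still holds changes that the task has not consumed.
SELECT SYSTEM$STREAM_HAS_DATA('my_directory_table_stream');

-- Once the task has run, the rows it inserted appear in the target table.
SELECT * FROM tasks_runs;
```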