Task graph examples
This topic provides practical examples of how to run a sequence of tasks, also known as a task graph.
Example: Start multiple tasks and report status
In the following example, the root task starts tasks to update three different tables. After those three tables are updated, another task starts to update an aggregate sales table. Last, a finalizer task builds a status report and sends it to an administrator by email.
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- task_a: Root task. Starts the task graph and sets basic configurations.
-- Configurations:
-- * The root task is scheduled to run every minute.
-- * Each task in the task graph retries twice before the task is considered
--   to have failed.
-- * If any individual task fails three times, the task graph is suspended.
-- * USER_TASK_TIMEOUT_MS is expressed in milliseconds: 60000 ms = 60 seconds.
-- * CONFIG holds key/value pairs attached to the task graph definition.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG = '{"environment": "production", "path": "/prod_directory/"}'
AS
  BEGIN
    -- Publish a return value for this run of the root task.
    CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
  END;
;
-- task_customer_table: Updates the customer table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  AFTER task_a
AS
  BEGIN
    -- String literals take single quotes; double quotes denote identifiers.
    -- The statement terminator goes after the closing parenthesis.
    -- NOTE(review): assumes the lookup returns exactly one row -- confirm.
    LET value := (
      SELECT customer_id
      FROM ref_cust_table
      WHERE cust_name = 'Jane Doe'
    );
    -- NOTE(review): positional insert; assumes customer_table has exactly
    -- two columns in (label, value) order -- confirm against the table DDL.
    INSERT INTO customer_table VALUES ('customer_id', :value);
  END;
;
-- task_product_table: Updates the product table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  AFTER task_a
AS
  BEGIN
    -- String literals take single quotes; double quotes denote identifiers.
    -- The statement terminator goes after the closing parenthesis.
    -- NOTE(review): assumes the lookup returns exactly one row -- confirm.
    LET value := (
      SELECT product_id
      FROM ref_item_table
      WHERE product_name = 'widget'
    );
    -- NOTE(review): positional insert; assumes product_table has exactly
    -- two columns in (label, value) order -- confirm against the table DDL.
    INSERT INTO product_table VALUES ('product_id', :value);
  END;
;
-- task_date_time_table: Updates the date/time table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  AFTER task_a
AS
  BEGIN
    -- Read the originally scheduled timestamp of the current task graph run.
    LET value := (
      SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')
    );
    -- NOTE(review): positional insert; assumes date_time_table has exactly
    -- two columns in (label, value) order -- confirm against the table DDL.
    INSERT INTO date_time_table VALUES ('order_date', :value);
  END;
;
-- task_sales_table: Aggregates changes from other tables.
-- Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  AFTER task_customer_table, task_product_table, task_date_time_table
AS
  BEGIN
    -- The join is part of the lookup query, so it lives inside the
    -- parenthesized SELECT rather than in a statement of its own.
    -- NOTE(review): assumes the query returns exactly one row -- confirm.
    LET value := (
      SELECT orders.sales_order_id
      FROM orders
      INNER JOIN customer_table
        ON orders.customer_id = customer_table.customer_id
    );
    -- NOTE(review): positional insert; assumes sales_table has exactly
    -- two columns in (label, value) order -- confirm against the table DDL.
    INSERT INTO sales_table VALUES ('sales_order_id', :value);
  END;
;
-- notify_finalizer: Finalizer task for task_a. Sends a notification on the
-- task graph's status.
-- Runs after all other tasks either complete successfully,
-- fail, or time out.
-- Emails the admin with SYSTEM$SEND_EMAIL through the 'email_notification'
-- notification integration.
-- Calls the functions get_task_graph_run_summary and html_from_json_task_runs,
-- which must exist before this task runs.
CREATE OR REPLACE TASK notify_finalizer
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  FINALIZE = task_a
AS
  DECLARE
    my_root_task_id STRING;
    my_start_time TIMESTAMP_LTZ;
    summary_json STRING;
    summary_html STRING;
  BEGIN
    -- Get root task ID
    my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
    -- Get root task scheduled time (returned as text, so cast it explicitly)
    my_start_time := (
      SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::TIMESTAMP_LTZ
    );
    -- Combine all task run infos into one JSON string
    summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
    -- Convert JSON into HTML table
    summary_html := (SELECT html_from_json_task_runs(:summary_json));
    -- Send HTML to email
    CALL SYSTEM$SEND_EMAIL(
      'email_notification',
      'admin@snowflake.com',
      'notification task run summary',
      :summary_html,
      'text/html'
    );
    -- Set return value for finalizer
    CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END;
;
-- get_task_graph_run_summary: Returns a JSON array (as a string) with one
-- object per task run of a task graph run.
-- Arguments:
--   my_root_task_id: ID of the root task whose graph run is summarized.
--   my_start_time:   Start of the scheduled-time range to search; the range
--                    ends at the current timestamp.
-- Each object carries the keys task_name, run_status, return_value, started,
-- duration, and error_message. OBJECT_CONSTRUCT omits keys whose value is NULL.
-- NOTE(review): information_schema is not qualified with a database name, so
-- it is expected to resolve against the database that contains this function;
-- qualify it if the task graph lives in a different database -- confirm.
CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
  RETURNS STRING
AS
$$
  (
    SELECT
      -- WITHIN GROUP makes the array order explicit; an ORDER BY in the
      -- subquery alone does not guarantee the aggregation order.
      ARRAY_AGG(
        OBJECT_CONSTRUCT(
          'task_name', name,
          'run_status', state,
          'return_value', return_value,
          'started', query_start_time,
          'duration', duration,
          'error_message', error_message
        )
      ) WITHIN GROUP (ORDER BY scheduled_time) AS graph_run_summary
    FROM (
      SELECT
        name,
        CASE
          WHEN state IN ('SUCCEEDED', 'SUCCEED') THEN '🟢 Succeeded'
          WHEN state = 'FAILED' THEN '🔴 Failed'
          WHEN state = 'SKIPPED' THEN '🔵 Skipped'
          WHEN state = 'CANCELLED' THEN '🔘 Cancelled'
          -- Report any other state as-is instead of dropping it as NULL.
          ELSE state
        END AS state,
        return_value,
        TO_VARCHAR(query_start_time, 'YYYY-MM-DD HH24:MI:SS') AS query_start_time,
        CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time), ' s') AS duration,
        error_message,
        scheduled_time
      FROM TABLE(information_schema.task_history(
        ROOT_TASK_ID => my_root_task_id::STRING,
        SCHEDULED_TIME_RANGE_START => my_start_time,
        SCHEDULED_TIME_RANGE_END => CURRENT_TIMESTAMP()
      ))
    )
  )::STRING
$$
;
-- HTML_FROM_JSON_TASK_RUNS: Renders a JSON array of task-run objects as an
-- HTML summary table.
-- Arguments:
--   JSON_DATA: JSON array string whose objects use the lowercase keys
--              task_name, run_status, return_value, started, duration,
--              and error_message.
-- Returns: an HTML fragment (logo, heading, and one table row per task run).
-- NOTE(review): confirm that this Python runtime version is still supported
-- in your account.
CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
import html
import json

# (header text, column width) for each column, in display order.
_COLUMNS = [
    ("Task name", "320px"),
    ("Run status", "120px"),
    ("Return value", "400px"),
    ("Started", "160px"),
    ("Duration", "80px"),
    ("Error message", "480px"),
]


def GENERATE_HTML_TABLE(JSON_DATA):
    """Build the HTML summary table from a JSON array of task-run objects.

    A NULL, empty, or JSON-null input produces a table with headers only.
    """
    task_runs = (json.loads(JSON_DATA) if JSON_DATA else None) or []

    page = """
<img src="https://example.com/logo.jpg"
alt="Company logo" height="72">
<p><strong>Task Graph Run Summary</strong>
<br>Sign in to Snowsight to see more details.</p>
<table border="1" style="border-color:#DEE3EA"
cellpadding="5" cellspacing="0">
<thead>
<tr>
"""
    for header, width in _COLUMNS:
        page += f'<th scope="col" style="text-align:left; width: {width}">{header.capitalize()}</th>'
    page += """
</tr>
</thead>
<tbody>
"""
    for run in task_runs:
        page += "<tr>"
        for header, width in _COLUMNS:
            # The JSON keys are lowercase ("task_name", ...); the uppercase
            # form is accepted as a fallback.
            key = header.replace(" ", "_").lower()
            cell = run.get(key, run.get(key.upper(), ""))
            # Escape the value so that characters such as "<" or "&" in a
            # return value or error message cannot break the markup.
            page += f'<td style="text-align:left; width: {width}">{html.escape(str(cell))}</td>'
        page += "</tr>"
    page += """
</tbody>
</table>
"""
    return page
$$
;
Example: Use a finalizer task to correct for errors
This example demonstrates how a finalizer task can correct for errors.
For demonstration purposes, the tasks are designed to fail during their first run. The finalizer task corrects the issue and restarts the tasks, which succeed on subsequent runs:
-- Configuration
-- By default, the notebook creates the objects in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- 1. Set the default configurations
-- Creates a root task ("task_a"), and sets the default configurations
-- used throughout the task graph.
-- Configurations include:
-- * The root task is scheduled to run every minute, with a 60-second timeout
--   (USER_TASK_TIMEOUT_MS is expressed in milliseconds: 60000 ms = 60 seconds).
-- * If a task fails, retry it twice. If it fails twice,
--   the entire task graph is considered as failed.
-- * If the task graph fails consecutively 3 times, suspend the task.
-- * Other environment values are set.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  CONFIG = '{"environment": "production", "path": "/prod_directory/"}'
AS
  BEGIN
    -- Publish a return value for this run of the root task.
    CALL SYSTEM$SET_RETURN_VALUE('task a successful');
  END;
;
-- 2. Use a runtime reflection variable
-- Creates a child task ("task_b").
-- By design, this example fails the first time it runs, because
-- it writes to a table ("demo_table") that doesn't exist.
CREATE OR REPLACE TASK task_b
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  AFTER task_a
AS
  BEGIN
    -- Read the name of the currently running task.
    LET value := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_NAME'));
    INSERT INTO demo_table VALUES ('task b name', :value);
  END;
;
-- 3. Get a task graph configuration value
-- Creates the child task ("task_c").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  AFTER task_b
AS
  BEGIN
    -- Read the "path" entry from the task graph's CONFIG once and store it.
    LET value := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
    INSERT INTO demo_table VALUES ('task c path', :value);
  END;
;
-- 4. Get a value from a predecessor.
-- Creates the child task ("task_d").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  AFTER task_c
AS
  BEGIN
    -- Read the return value that task_a set with SYSTEM$SET_RETURN_VALUE.
    LET value := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_a'));
    INSERT INTO demo_table VALUES ('task d: predecessor return value', :value);
  END;
;
-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
-- After the finalizer completes, the task should automatically retry
-- (see task_a: TASK_AUTO_RETRY_ATTEMPTS).
-- On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 seconds)
  FINALIZE = task_a
AS
  BEGIN
    -- IF NOT EXISTS keeps later runs from failing once the table is in place.
    CREATE TABLE IF NOT EXISTS demo_table (name VARCHAR, value VARCHAR);
  END;
;
-- 6. Resume the task graph. Upon creation, tasks start in a suspended state.
-- First resume the finalizer ("task_f"), then call SYSTEM$TASK_DEPENDENTS_ENABLE
-- to resume the root task ("task_a") and all of its dependent tasks.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');
-- 7. Query the task history
-- Shows the five most recently scheduled runs of task_b. The explicit
-- ORDER BY makes it deterministic which rows LIMIT keeps.
SELECT
  name,
  state,
  attempt_number,
  scheduled_from
FROM TABLE(information_schema.task_history(task_name => 'task_b'))
ORDER BY scheduled_time DESC
LIMIT 5;
-- 8. Suspend the task graph to stop incurring costs
-- Note: To stop the task graph, you only need to suspend the root task
-- (task_a). Child tasks don't run unless the root task is run.
-- If any child tasks are running, they have a limited duration
-- and will end soon.
-- The demo table is intentionally left in place here so that it can still be
-- queried in step 9; step 10 removes it.
ALTER TASK task_a SUSPEND;
-- 9. Check tasks during execution (optional)
-- Run this command to query the demo table during execution
-- to check which tasks have run.
SELECT
  name,
  value
FROM demo_table;
-- 10. Demo reset (optional)
-- Run this command to remove the demo table.
-- This causes task_b to fail during its first run.
-- After the task graph retries, task_b will succeed.
-- IF EXISTS keeps the reset from failing when the table is already gone.
DROP TABLE IF EXISTS demo_table;