タスクグラフの例¶
このトピックでは、タスクグラフとも呼ばれる一連のタスクの実行方法について、実践的な例を示します。
このトピックの内容:
例:複数のタスクの開始とステータス報告¶
次の例では、ルートタスクが3つの異なるテーブルを更新するタスクを開始します。これら3つのテーブルが更新された後、別のタスクが売上集計テーブルの更新を開始します。最後に、ファイナライザータスクがメール通知を使用してステータスレポートを送信します。
-- Setup: by default the objects are created in the current (public) schema.
-- Uncomment and fill in the lines below to target a specific location.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
-- Configurations:
--   * SCHEDULE: the root task is scheduled at a one-minute interval.
--   * TASK_AUTO_RETRY_ATTEMPTS = 2: a failed run is retried automatically
--     up to two times.
--   * SUSPEND_TASK_AFTER_NUM_FAILURES = 3: the task is suspended after three
--     consecutive failed runs.
--   * USER_TASK_TIMEOUT_MS is expressed in milliseconds: 60000 ms = 60 s
--     (a value of 60 would time out after only 60 ms).
--   * CONFIG: JSON key/value pairs that tasks in the graph can read with
--     SYSTEM$GET_TASK_GRAPH_CONFIG.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG = '{"environment": "production", "path": "/prod_directory/"}'
AS
  BEGIN
    -- Publish a return value for the rest of the task graph.
    CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
  END;
;
-- task_customer_table: Updates the customer table.
-- Runs after the root task (task_a) completes.
CREATE OR REPLACE TASK task_customer_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s)
  AFTER task_a
AS
  BEGIN
    -- The name is a string literal, so it takes single quotes; double quotes
    -- would be parsed as a column identifier. The scalar subquery must not
    -- contain a statement terminator and must yield a single value.
    LET value := (
      SELECT customer_id
      FROM ref_cust_table
      WHERE cust_name = 'Jane Doe'
    );
    -- Stores a ('customer_id', <looked-up id>) label/value pair.
    INSERT INTO customer_table VALUES ('customer_id', :value);
  END;
;
-- task_product_table: Updates the product table.
-- Runs after the root task (task_a) completes.
CREATE OR REPLACE TASK task_product_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s)
  AFTER task_a
AS
  BEGIN
    -- The product name is a string literal, so it takes single quotes; double
    -- quotes would be parsed as a column identifier. The scalar subquery must
    -- not contain a statement terminator and must yield a single value.
    LET value := (
      SELECT product_id
      FROM ref_item_table
      WHERE product_name = 'widget'
    );
    -- Stores a ('product_id', <looked-up id>) label/value pair.
    INSERT INTO product_table VALUES ('product_id', :value);
  END;
;
-- task_date_time_table: Updates the date/time table.
-- Runs after the root task (task_a) completes.
CREATE OR REPLACE TASK task_date_time_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s); 60 would be only 60 ms
  AFTER task_a
AS
  BEGIN
    -- Original scheduled timestamp of the current task graph run, as
    -- reported by the task runtime.
    LET value := (
      SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')
    );
    -- Stores an ('order_date', <scheduled timestamp>) label/value pair.
    INSERT INTO date_time_table VALUES ('order_date', :value);
  END;
;
-- task_sales_table: Aggregates changes from other tables.
-- Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s)
  AFTER task_customer_table, task_product_table, task_date_time_table
AS
  BEGIN
    -- The join belongs inside the scalar subquery; as a separate statement it
    -- is not valid SQL. The subquery must yield a single value.
    LET value := (
      SELECT orders.sales_order_id
      FROM orders
      INNER JOIN customer_table
        ON orders.customer_id = customer_table.customer_id
    );
    -- Stores a ('sales_order_id', <order id>) label/value pair.
    INSERT INTO sales_table VALUES ('sales_order_id', :value);
  END;
;
-- notify_finalizer: Sends a notification on the task graph's status.
-- Declared as the finalizer of task_a, so it runs after all other tasks
-- either complete successfully, fail, or time out.
-- Builds an HTML run summary with GET_TASK_GRAPH_RUN_SUMMARY and
-- HTML_FROM_JSON_TASK_RUNS, then emails it through SYSTEM$SEND_EMAIL using
-- the 'email_notification' notification integration.
CREATE OR REPLACE TASK notify_finalizer
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s); 60 would be only 60 ms
  FINALIZE = task_a
AS
  DECLARE
    my_root_task_id STRING;
    my_start_time TIMESTAMP_LTZ;
    summary_json STRING;
    summary_html STRING;
  BEGIN
    -- Get the root task ID. Read with SELECT, matching how the other tasks
    -- in this example read SYSTEM$TASK_RUNTIME_INFO.
    my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));

    -- Get the scheduled time of this task graph run.
    my_start_time := (
      SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')
    );

    -- Combine all task run infos into one JSON string.
    summary_json := (SELECT GET_TASK_GRAPH_RUN_SUMMARY(:my_root_task_id, :my_start_time));

    -- Convert the JSON into an HTML table.
    summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));

    -- Send the HTML by email.
    CALL SYSTEM$SEND_EMAIL(
      'email_notification',
      'admin@snowflake.com',
      'notification task run summary',
      :summary_html,
      'text/html');

    -- Set the return value for the finalizer.
    CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END;
;
-- get_task_graph_run_summary: Returns, as a JSON string, an array with one
-- object per task run that belongs to the given task graph run.
--
-- Parameters:
--   my_root_task_id  ID of the root task, passed to TASK_HISTORY as
--                    ROOT_TASK_ID.
--   my_start_time    Start of the scheduled-time window; the window ends at
--                    the current timestamp.
--
-- Each object carries the keys task_name, run_status, return_value, started,
-- duration and error_message. Keys whose value is NULL are omitted by
-- OBJECT_CONSTRUCT.
CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
RETURNS STRING
AS
$$
  (
    SELECT
      ARRAY_AGG(
        OBJECT_CONSTRUCT(
          'task_name', name,
          'run_status', run_status,
          'return_value', return_value,
          'started', started,
          'duration', duration,
          'error_message', error_message
        )
      )
      -- Order inside the aggregate: an ORDER BY in the subquery does not
      -- guarantee the element order of the resulting array.
      WITHIN GROUP (ORDER BY scheduled_time) AS graph_run_summary
    FROM (
      SELECT
        name,
        CASE state
          WHEN 'SUCCEEDED' THEN '🟢 Succeeded'
          WHEN 'FAILED'    THEN '🔴 Failed'
          WHEN 'SKIPPED'   THEN '🔵 Skipped'
          WHEN 'CANCELLED' THEN '🔘 Cancelled'
          -- Keep any other state (for example a run that is still
          -- executing) visible instead of turning it into NULL.
          ELSE state
        END AS run_status,
        return_value,
        -- Aliases differ from the source column names so that the duration
        -- expression below unambiguously reads the timestamp column.
        TO_VARCHAR(query_start_time, 'YYYY-MM-DD HH24:MI:SS') AS started,
        CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time), ' s') AS duration,
        error_message,
        scheduled_time
      FROM
        -- Unqualified INFORMATION_SCHEMA so the function does not depend on a
        -- specific, hard-coded database name.
        TABLE(information_schema.task_history(
          ROOT_TASK_ID => my_root_task_id::STRING,
          SCHEDULED_TIME_RANGE_START => my_start_time,
          SCHEDULED_TIME_RANGE_END => CURRENT_TIMESTAMP()
        ))
    )
  )::STRING
$$
;
-- HTML_FROM_JSON_TASK_RUNS: Python UDF that renders a JSON array of task-run
-- objects as an HTML fragment (logo, heading and a six-column table).
-- NOTE(review): confirm that Python runtime 3.8 is still available in the
-- target account; a newer runtime may be required.
CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
import html
import json


def GENERATE_HTML_TABLE(JSON_DATA):
    """Render a JSON array of task-run objects as an HTML summary table.

    Args:
        JSON_DATA: JSON text holding a list of objects, one per task run.
            Keys are matched case-insensitively against the column headers
            with spaces replaced by underscores (for example "task_name" or
            "TASK_NAME"). NULL or empty input yields a table without rows.

    Returns:
        An HTML fragment containing the logo, a heading and the table.
    """
    headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]

    # A NULL argument arrives as None; treat it like an empty run list.
    task_runs = json.loads(JSON_DATA) if JSON_DATA else []

    parts = ["""
<img src="https://example.com/logo.jpg"
alt="Company logo" height="72">
<p><strong>Task Graph Run Summary</strong>
<br>Sign in to Snowsight to see more details.</p>
<table border="1" style="border-color:#DEE3EA"
cellpadding="5" cellspacing="0">
<thead>
<tr>
"""]

    for header, width in zip(headers, column_widths):
        parts.append(
            f'<th scope="col" style="text-align:left; width: {width}">{header.capitalize()}</th>'
        )

    parts.append("""
</tr>
</thead>
<tbody>
""")

    for run in task_runs:
        # Normalize key case so lower-case and upper-case JSON keys both match.
        cells = {str(key).upper(): value for key, value in run.items()}
        parts.append("<tr>")
        for header, width in zip(headers, column_widths):
            value = cells.get(header.replace(" ", "_").upper())
            # Escape cell text: error messages may contain <, > or &.
            text = "" if value is None else html.escape(str(value))
            parts.append(f'<td style="text-align:left; width: {width}">{text}</td>')
        parts.append("</tr>")

    parts.append("""
</tbody>
</table>
""")
    return "".join(parts)
$$
;
例:ファイナライザータスクを使用したエラー修正¶
この例は、ファイナライザータスクがエラーを修正する方法を示しています。
デモンストレーションのため、タスクは最初の実行時に失敗するように設計されています。ファイナライザータスクは問題を修正し、タスクを再起動します。
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations
-- Creates a root task ("task_a"), and sets the default configurations
-- used throughout the task graph.
-- Configurations include:
--   * The root task is scheduled at a one-minute interval, with a 60-second
--     timeout. USER_TASK_TIMEOUT_MS is expressed in milliseconds, so
--     60 seconds is 60000.
--   * TASK_AUTO_RETRY_ATTEMPTS = 2: a failed run is retried automatically
--     up to two times.
--   * SUSPEND_TASK_AFTER_NUM_FAILURES = 3: the task is suspended after three
--     consecutive failed runs.
--   * CONFIG: other environment values, readable from tasks in the graph
--     with SYSTEM$GET_TASK_GRAPH_CONFIG.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  CONFIG = '{"environment": "production", "path": "/prod_directory/"}'
AS
  BEGIN
    -- Publish a return value for the rest of the task graph.
    CALL SYSTEM$SET_RETURN_VALUE('task a successful');
  END;
;
-- 2. Use a runtime reflection variable
-- Creates a child task ("task_b") that runs after task_a.
-- By design, this example fails the first time it runs, because
-- it writes to a table ("demo_table") that doesn't exist yet.
CREATE OR REPLACE TASK task_b
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s); 60 would be only 60 ms
  AFTER task_a
AS
  BEGIN
    -- Ask the task runtime for the name of the task that is executing.
    -- Argument written in upper case, as in the other runtime-info calls.
    LET value := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_NAME'));
    INSERT INTO demo_table VALUES ('task b name', :value);
  END;
;
-- 3. Get a task graph configuration value
-- Creates the child task ("task_c") that runs after task_b.
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s); 60 would be only 60 ms
  AFTER task_b
AS
  BEGIN
    -- Read the 'path' entry of the CONFIG JSON once, straight into the
    -- variable; a separate CALL whose result is discarded is not needed.
    LET value := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
    INSERT INTO demo_table VALUES ('task c path', :value);
  END;
;
-- 4. Get a value from a predecessor.
-- Creates the child task ("task_d") that runs after task_c.
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s); 60 would be only 60 ms
  AFTER task_c
AS
  BEGIN
    -- Read the return value published by task_a.
    -- NOTE(review): task_a is an ancestor, not the direct predecessor
    -- (task_c) -- confirm the function accepts ancestor task names.
    LET value := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_a'));
    INSERT INTO demo_table VALUES ('task d: predecessor return value', :value);
  END;
;
-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
-- After the finalizer completes, the task graph should automatically retry
-- (see TASK_AUTO_RETRY_ATTEMPTS on task_a).
-- On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
  USER_TASK_TIMEOUT_MS = 60000  -- milliseconds (60 s); 60 would be only 60 ms
  FINALIZE = task_a
AS
  BEGIN
    -- IF NOT EXISTS keeps the finalizer safe to run on every graph run.
    CREATE TABLE IF NOT EXISTS demo_table (name VARCHAR, value VARCHAR);
  END;
;
-- 6. Resume the task graph. Newly created tasks start in a suspended state.
-- First resume the finalizer (task_f), then enable the root task (task_a)
-- together with the tasks that depend on it.
ALTER TASK task_f RESUME;

SELECT
  SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');
-- 7. Query the task history
-- Shows the five most recently scheduled runs of task_b. Without an
-- ORDER BY, LIMIT would return an arbitrary five rows.
SELECT
  name,
  state,
  attempt_number,
  scheduled_from
FROM
  TABLE(information_schema.task_history(task_name => 'task_b'))
ORDER BY
  scheduled_time DESC
LIMIT 5;
-- 8. Suspend the task graph to stop incurring costs
-- Note: To stop the task graph, you only need to suspend the root task
-- (task_a). Child tasks don't run unless the root task is run.
-- If any child tasks are running, they have a limited duration
-- and will end soon.
-- The demo table is not dropped here, so that step 9 can still query it;
-- step 10 removes it.
ALTER TASK task_a SUSPEND;

-- 9. Check tasks during execution (optional)
-- Run this command to query the demo table during execution
-- to check which tasks have run.
SELECT
  name,
  value
FROM demo_table;

-- 10. Demo reset (optional)
-- Run this command to remove the demo table.
-- This causes task_b to fail during its first run.
-- After the task graph retries, task_b will succeed.
-- IF EXISTS keeps the reset safe to repeat when the table is already gone.
DROP TABLE IF EXISTS demo_table;