タスクグラフの例

このトピックでは、タスクグラフとも呼ばれる一連のタスクの実行方法について、実践的な例を示します。

このトピックの内容:

例複数のタスクの開始とステータス報告

次の例では、ルートタスクが3つの異なるテーブルを更新するタスクを開始します。これら3つのテーブルが更新された後、別のタスクが売上集計テーブルの更新を開始します。最後に、ファイナライザ・タスクは外部関数を使用してステータス・レポートを送信します。

フローチャートでは、ルートタスクが3つの子タスクを起動し、その子タスクがまた別の子タスクを起動します。ファイナライザタスクは、他のタスクが完了した時点で開始されます。
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
-- Configurations:
-- * Each task in the task graph retries twice before the task is considered
--   to have failed.
-- * If any individual task fails three times, the task graph is suspended.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
USER_TASK_TIMEOUT_MS = 60
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
as
  begin
    call system$set_return_value('task_a successful');
  end;
;

-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
USER_TASK_TIMEOUT_MS = 60
AFTER TASK_A
AS
  BEGIN
    LET VALUE := (SELECT customer_id FROM ref_cust_table
    WHERE cust_name = "Jane Doe";);
    INSERT INTO customer_table VALUES('customer_id',:value);
  END;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
USER_TASK_TIMEOUT_MS = 60
AFTER task_a
AS
  BEGIN
    LET VALUE := (SELECT product_id FROM ref_item_table
    WHERE PRODUCT_NAME = "widget";);
    INSERT INTO product_table VALUES('product_id',:value);
  END;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
USER_TASK_TIMEOUT_MS = 60
AFTER task_a
AS
  BEGIN
    LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
    INSERT INTO date_time_table VALUES('order_date',:value);
  END;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after updates are complete to all three other tables.
CREATE OR REPLACE task task_sales_table
USER_TASK_TIMEOUT_MS = 60
AFTER task_customer_table, task_product_table, task_date_time_table
AS
  BEGIN
    LET VALUE := (SELECT sales_order_id FROM ORDERS);
    JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
    INSERT INTO sales_table VALUES('sales_order_id',:value);
  END;
;

-- task_a_finalizer: Sends a notification on the task’s status.
--   Runs after all other tasks either complete successfully,
--   fail, or time out.
--   Uses an external function to notify the admin through email.
CREATE OR REPLACE TASK notify_finalizer
USER_TASK_TIMEOUT_MS = 60
FINALIZE = task_a
AS
  DECLARE
      my_root_task_id STRING;
      my_start_time TIMESTAMP_LTZ;
      summary_json STRING;
      summary_html STRING;
  BEGIN
      --- Get root task ID
      my_root_task_id := (CALL SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));

      --- Get root task scheduled time
      my_start_time := (CALL SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));

      --- Combine all task run infos into one JSON string
      summary_json := (SELECT GET_TASK_GRAPH_RUN_SUMMARY(:my_root_task_id, :my_start_time));

      --- Convert JSON into HTML table
      summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));


      --- Send HTML to email
      CALL SYSTEM$SEND_EMAIL(
          'email_notification',
          'admin@snowflake.com',
          'notification task run summary',
          :summary_html,
          'text/html');

      --- Set return value for finalizer
      CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
end;
;

CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
RETURNS STRING
AS
$$
    (SELECT
        ARRAY_AGG(OBJECT_CONSTRUCT(
            'task_name', name,
            'run_status', state,
            'return_value', return_value,
            'started', query_start_time,
            'duration', duration,
            'error_message', error_message
            )
        ) AS GRAPH_RUN_SUMMARY
    FROM
        (SELECT
            NAME,
            CASE
                WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
                WHEN STATE = 'FAILED' then '🔴 Failed'
                WHEN STATE = 'SKIPPED' then '🔵 Skipped'
                WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
            END AS STATE,
            RETURN_VALUE,
            TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
            CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
              ' s') AS DURATION,
            ERROR_MESSAGE
        FROM
            TABLE(mweidb.information_schema.task_history(
                ROOT_TASK_ID => my_root_task_id ::STRING,
                SCHEDULED_TIME_RANGE_START => my_start_time,
                SCHEDULED_TIME_RANGE_END => current_timestamp()
                ))
        ORDER BY
            SCHEDULED_TIME)
    )::STRING
$$
;


CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
IMPORT JSON

def GENERATE_HTML_TABLE(JSON_DATA):

    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]

    DATA = json.loads(JSON_DATA)

    HTML = f"""
        <img src="https://example.com/logo.jpg"
        alt="Company logo" height="72">
        <p><strong>Task Graph Run Summary</strong>
        <br>Sign in to Snowsight to see more details.</p>
        <table border="1" style="border-color:#DEE3EA"
        cellpadding="5" cellspacing="0">
          <thead>
            <tr>
    """
    headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
    for i, header in enumerate(headers):
        HTML += f'<th scope="col" style="text-align:left;
        width: {column_widths[i]}">{header.capitalize()}</th>'

    HTML +="""
            </tr>
    </thead>
    <tbody>
    """

    for ROW_DATA in DATA:
        HTML += "<tr>"
        for header in headers:
            key = header.replace(" ", "_").upper()
            CELL_DATA = ROW_DATA.get(key, "")
            HTML += f'<td style="text-align:left;
            width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
        HTML += "</tr>"

    HTML +="""
        </tbody>
    </table>
    """

    return HTML
$$
;
Copy

例:ファイナライザタスクを使用したエラー修正

この例は、ファイナライザータスクがエラーを修正する方法を示しています。

デモンストレーションのため、タスクは最初の実行時に失敗するように設計されています。ファイナライザ・タスクは問題を修正し、タスクを再起動します。

タスクシリーズを示す図。左上がタスクA。矢印はタスクAからタスクBを指し、タスクBはタスクCを指し、タスクCはタスクDを指します。タスクAの下にある矢印は、ファイナライザタスクであるタスクFを指しています。
-- Configuration
-- By default, the notebook creates the objects in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations
--    Creates a root task ("task_a"), and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * Each task runs after one minute, with a 60-second timeout.
--    * If a task fails, retry it twice. if it fails twice,
--      the entire task graph is considered as failed.
--    * If the task graph fails consecutively 3 times, suspend the task.
--    * Other environment values are set.

SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_failures = 3
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
AS
  BEGIN
    CALL SYSTEM$SET_RETURN_VALUE('task a successful');
  END;
;

-- 2. Use a runtime reflection variable
--    Creates a child task ("task_b")
--    By design, this example fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
USER_TASK_TIMEOUT_MS = 60
AFTER TASK_A
AS
  BEGIN
    LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
    INSERT INTO demo_table VALUES('task b name',:VALUE);

  END;
;

-- 3. Get a task graph configuration value
--    Creates the child task ("task_c").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
USER_TASK_TIMEOUT_MS = 60
AFTER task_b
AS
  BEGIN
    CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
    LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
    INSERT INTO demo_table VALUES('task c path',:value);
  END;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
USER_TASK_TIMEOUT_MS = 60
AFTER task_c
as
  BEGIN
    LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_a'));
    INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
  END;
;

-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task should automatically retry
--    (see task_a: tasks_auto_retry_attempts).
--    On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
USER_TASK_TIMEOUT_MS = 60
FINALIZE = task_a
AS
  BEGIN
    CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
  END;
;

-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
--    Use this command to resume the finalizer and all of its child tasks.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');

-- 7. Query the task history
SELECT
    name, state, attempt_number, scheduled_from
FROM
    TABLE(information_schema.task_history(task_name=> 'task_b'))
LIMIT 5;

-- 8. Suspend the task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    (task_a). Child tasks don’t run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;

-- 9. Check tasks during execution (optional)
--    Run this command to query the demo table during execution
--    to check which tasks have run.
SELECT * FROM demo_table;
😳
-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     after the task graph retries, task_b will succeed.
DROP TABLE demo_table;
Copy