작업 그래프 예제

이 항목에서는 작업 그래프라고도 하는 작업 시퀀스를 실행하는 방법에 대한 실용적인 예를 제공합니다.

이 항목의 내용:

예제: 여러 작업 시작 및 상태 보고하기

다음 예제에서는 루트 작업이 세 개의 서로 다른 테이블을 업데이트하는 작업을 시작합니다. 이 세 개의 테이블이 모두 업데이트되면 다른 작업이 집계 판매 테이블을 업데이트하기 시작합니다. 마지막으로 파이널라이저 작업은 SYSTEM$SEND_EMAIL을 호출하여 작업 그래프 실행 상태 보고서를 이메일로 보냅니다.

순서도는 루트 작업이 세 개의 하위 작업을 시작하고, 이 세 작업이 모두 완료되면 또 다른 하위 작업이 시작되는 과정을 보여 줍니다. 다른 모든 작업이 완료되면 파이널라이저 작업이 시작됩니다.
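-- Objects are created in the current database and schema (public by default).
-- To use another location, uncomment and edit the lines below.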
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph every minute and sets the
-- graph-wide configuration.
-- Configurations:
-- * If a task graph run fails, it is retried automatically up to twice
--   (TASK_AUTO_RETRY_ATTEMPTS = 2).
-- * After three consecutive failures, the root task is suspended, which
--   stops the whole task graph (SUSPEND_TASK_AFTER_NUM_FAILURES = 3).
-- * Each run of this task times out after 60 seconds
--   (USER_TASK_TIMEOUT_MS is expressed in milliseconds).
-- * CONFIG holds graph-wide values that tasks can read with
--   SYSTEM$GET_TASK_GRAPH_CONFIG.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
USER_TASK_TIMEOUT_MS = 60000
CONFIG = '{"environment": "production", "path": "/prod_directory/"}'
AS
  BEGIN
    CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
  END;
;

-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
  BEGIN
    -- String literals use single quotes; a double-quoted "Jane Doe" would be
    -- parsed as a column identifier.
    -- NOTE(review): the scalar subquery assumes at most one matching row.
    LET value := (SELECT customer_id
                  FROM ref_cust_table
                  WHERE cust_name = 'Jane Doe');
    INSERT INTO customer_table VALUES('customer_id', :value);
  END;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
  BEGIN
    -- String literals use single quotes; a double-quoted "widget" would be
    -- parsed as a column identifier.
    -- NOTE(review): the scalar subquery assumes at most one matching row.
    LET value := (SELECT product_id
                  FROM ref_item_table
                  WHERE product_name = 'widget');
    INSERT INTO product_table VALUES('product_id', :value);
  END;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes. Records the originally scheduled
--   timestamp of the current task graph run as the order date.
CREATE OR REPLACE TASK task_date_time_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
  BEGIN
    LET value := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
    INSERT INTO date_time_table VALUES('order_date', :value);
  END;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after all three predecessor tasks (customer, product, and
--   date/time) have completed.
CREATE OR REPLACE TASK task_sales_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_customer_table, task_product_table, task_date_time_table
AS
  BEGIN
    -- Set-based insert: one row per order that has a matching customer.
    -- The JOIN must be part of the same SELECT as its FROM clause, and a
    -- scalar LET variable could not hold more than one order ID.
    INSERT INTO sales_table
      SELECT
          'sales_order_id',
          orders.sales_order_id
      FROM orders
      INNER JOIN customer_table
          ON orders.customer_id = customer_table.customer_id;
  END;
;

-- notify_finalizer: Finalizer task for the task_a graph; emails a run summary.
--   Runs after all other tasks either complete successfully,
--   fail, or time out.
--   Builds a JSON summary of the graph run (get_task_graph_run_summary),
--   renders it as an HTML table (HTML_FROM_JSON_TASK_RUNS), and emails it
--   to the admin with SYSTEM$SEND_EMAIL.
--   NOTE(review): 'email_notification' must be an existing email
--   notification integration, and the recipient must be allowed by it.
CREATE OR REPLACE TASK notify_finalizer
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_a
AS
  DECLARE
      my_root_task_id STRING;
      my_start_time TIMESTAMP_LTZ;
      summary_json STRING;
      summary_html STRING;
  BEGIN
      -- Get the root task ID. SYSTEM$TASK_RUNTIME_INFO is a function, so it
      -- is evaluated with SELECT rather than CALL.
      my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));

      -- Get the root task's originally scheduled time for this graph run.
      my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));

      -- Combine all task run infos into one JSON string.
      summary_json := (SELECT GET_TASK_GRAPH_RUN_SUMMARY(:my_root_task_id, :my_start_time));

      -- Convert the JSON into an HTML table.
      summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));

      -- Send the HTML summary by email.
      CALL SYSTEM$SEND_EMAIL(
          'email_notification',
          'admin@snowflake.com',
          'notification task run summary',
          :summary_html,
          'text/html');

      -- Set the return value for the finalizer.
      CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END;
;

-- get_task_graph_run_summary: Returns, as a string, a JSON array with one
-- object per task run in the graph run identified by the root task ID and
-- the scheduled start time. Objects are ordered by scheduled time.
--   * STATE is mapped to a labelled value. TASK_HISTORY reports successful
--     runs as 'SUCCEEDED'. Any unmapped state (for example, a run still
--     executing) is passed through unchanged.
--   * Ordering is applied inside ARRAY_AGG with WITHIN GROUP, because an
--     ORDER BY in the subquery does not determine aggregation order.
--   * INFORMATION_SCHEMA is referenced without a database name.
--     NOTE(review): confirm that it resolves to the database that holds
--     the task graph.
CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
RETURNS STRING
AS
$$
    (SELECT
        ARRAY_AGG(OBJECT_CONSTRUCT(
            'task_name', name,
            'run_status', state,
            'return_value', return_value,
            'started', query_start_time,
            'duration', duration,
            'error_message', error_message
            )
        ) WITHIN GROUP (ORDER BY scheduled_time) AS graph_run_summary
    FROM
        (SELECT
            name,
            CASE
                WHEN state = 'SUCCEEDED' THEN '🟢 Succeeded'
                WHEN state = 'FAILED' THEN '🔴 Failed'
                WHEN state = 'SKIPPED' THEN '🔵 Skipped'
                WHEN state = 'CANCELLED' THEN '🔘 Cancelled'
                ELSE state
            END AS state,
            return_value,
            TO_VARCHAR(query_start_time, 'YYYY-MM-DD HH24:MI:SS') AS query_start_time,
            CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
              ' s') AS duration,
            error_message,
            scheduled_time
        FROM
            TABLE(information_schema.task_history(
                ROOT_TASK_ID => my_root_task_id::STRING,
                SCHEDULED_TIME_RANGE_START => my_start_time,
                SCHEDULED_TIME_RANGE_END => CURRENT_TIMESTAMP()
                ))
        )
    )::STRING
$$
;


-- HTML_FROM_JSON_TASK_RUNS: Renders the JSON array produced by
-- get_task_graph_run_summary as an HTML table for the summary email.
-- Each table header maps to a JSON key by lower-casing it and replacing
-- spaces with underscores ("Run status" -> "run_status"), which matches the
-- lowercase keys built with OBJECT_CONSTRUCT. Cell values are HTML-escaped
-- because return values and error messages are free-form text.
CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
import html
import json

# Table columns, in display order, and the fixed width of each column.
HEADERS = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
COLUMN_WIDTHS = ["320px", "120px", "400px", "160px", "80px", "480px"]


def _cell_style(width):
    # Shared inline style for header and data cells.
    return f"text-align:left; width: {width}"


def GENERATE_HTML_TABLE(JSON_DATA):
    """Return an HTML fragment summarizing a task graph run.

    JSON_DATA is a JSON array of objects, one per task run. A NULL or empty
    input renders a table with headers only. Keys missing from a row (or
    holding null) render as empty cells.
    """
    rows = json.loads(JSON_DATA) if JSON_DATA else []

    parts = ["""
        <img src="https://example.com/logo.jpg"
        alt="Company logo" height="72">
        <p><strong>Task Graph Run Summary</strong>
        <br>Sign in to Snowsight to see more details.</p>
        <table border="1" style="border-color:#DEE3EA"
        cellpadding="5" cellspacing="0">
          <thead>
            <tr>
    """]
    for header, width in zip(HEADERS, COLUMN_WIDTHS):
        parts.append(f'<th scope="col" style="{_cell_style(width)}">{html.escape(header)}</th>')

    parts.append("""
            </tr>
    </thead>
    <tbody>
    """)

    for row in rows:
        parts.append("<tr>")
        for header, width in zip(HEADERS, COLUMN_WIDTHS):
            key = header.replace(" ", "_").lower()
            value = row.get(key)
            text = "" if value is None else html.escape(str(value))
            parts.append(f'<td style="{_cell_style(width)}">{text}</td>')
        parts.append("</tr>")

    parts.append("""
        </tbody>
    </table>
    """)

    return "".join(parts)
$$
;
Copy

예제: 파이널라이저 작업을 사용하여 오류 수정하기

이 예제는 파이널라이저 작업으로 오류를 수정하는 방법을 보여줍니다.

데모 목적으로 작업은 처음 실행할 때 실패하도록 설계되었습니다. 파이널라이저 작업이 문제를 수정하고 나면 작업 그래프가 다시 시도되어 다음 실행에서 성공합니다.

작업 시리즈를 보여주는 다이어그램. 작업 A는 왼쪽 상단에 표시됩니다. 화살표가 작업 A에서 오른쪽의 작업 B로 이어지고, 작업 B에서 작업 C로, 작업 C에서 작업 D로 이어집니다. 작업 A 아래에서는 화살표가 파이널라이저 작업인 작업 F를 가리킵니다.
-- Configuration
-- By default, the notebook creates the objects in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations
--    Creates the root task ("task_a") and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * The task graph runs every minute; each run of task_a times out
--      after 60 seconds (USER_TASK_TIMEOUT_MS is in milliseconds).
--    * If a task graph run fails, it is retried automatically up to twice.
--    * If the task graph fails 3 times in a row, the root task is suspended.
--    * Other environment values are set in CONFIG.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
CONFIG = '{"environment": "production", "path": "/prod_directory/"}'
AS
  BEGIN
    CALL SYSTEM$SET_RETURN_VALUE('task a successful');
  END;
;

-- 2. Use a runtime reflection variable
--    Creates a child task ("task_b") that records its own name
--    (read with SYSTEM$TASK_RUNTIME_INFO) in demo_table.
--    By design, this task fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn't exist yet.
CREATE OR REPLACE TASK task_b
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
  BEGIN
    LET value := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
    INSERT INTO demo_table VALUES('task b name', :value);
  END;
;

-- 3. Get a task graph configuration value
--    Creates the child task ("task_c"), which reads the "path" value from
--    the task graph CONFIG and records it in demo_table.
--    By design, this task doesn't complete on the first run, because
--    its predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
USER_TASK_TIMEOUT_MS = 60000
AFTER task_b
AS
  BEGIN
    -- SYSTEM$GET_TASK_GRAPH_CONFIG is a function: evaluate it once in a
    -- SELECT (a separate CALL whose result is discarded is not needed).
    LET value := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
    INSERT INTO demo_table VALUES('task c path', :value);
  END;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d"), which records the return value
--    set by task_a in demo_table.
--    By design, this task doesn't complete on the first run, because
--    its predecessor task ("task_c") does not complete.
--    NOTE(review): task_a is an ancestor, not the direct predecessor
--    (task_c). Confirm that SYSTEM$GET_PREDECESSOR_RETURN_VALUE accepts a
--    non-direct predecessor.
CREATE OR REPLACE TASK task_d
USER_TASK_TIMEOUT_MS = 60000
AFTER task_c
AS
  BEGIN
    LET value := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_a'));
    INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
  END;
;

-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task graph should be retried
--    automatically (see TASK_AUTO_RETRY_ATTEMPTS on task_a).
--    On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_a
AS
  BEGIN
    CREATE TABLE IF NOT EXISTS demo_table(name VARCHAR, value VARCHAR);
  END;
;

-- 6. Resume the task graph. Upon creation, tasks start in a suspended state.
--    ALTER TASK resumes the finalizer task ("task_f").
--    SYSTEM$TASK_DEPENDENTS_ENABLE then resumes the root task ("task_a")
--    together with its dependent tasks.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');

-- 7. Query the task history
--    Shows the five most recently scheduled runs of task_b, with the
--    attempt number of each run. ORDER BY makes the LIMIT deterministic.
SELECT
    name,
    state,
    attempt_number,
    scheduled_from
FROM
    TABLE(information_schema.task_history(task_name => 'task_b'))
ORDER BY scheduled_time DESC
LIMIT 5;

-- 8. Suspend the task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    (task_a). Child tasks don't run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
--    demo_table is kept so it can still be inspected (step 9); step 10
--    removes it.
ALTER TASK task_a SUSPEND;

-- 9. Check tasks during execution (optional)
--    Run this command to query the demo table during execution
--    to check which tasks have run.
SELECT
    name,
    value
FROM demo_table;
-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     After the task graph retries, task_b will succeed.
--     IF EXISTS lets the reset be run repeatedly without an error.
DROP TABLE IF EXISTS demo_table;
Copy