작업 그래프로 작업 시퀀스 생성하기

Snowflake에서는 방향성 비순환 그래프라고도 하는 작업 그래프(DAG)를 사용하여 여러 작업을 관리할 수 있습니다. 작업 그래프는 루트 작업과 종속된 하위 작업으로 구성됩니다. 종속성은 루프 없이 시작에서 끝나는 방향으로 실행되어야 합니다. 선택적 최종 작업(파이널라이저)은 다른 모든 작업이 완료된 후 정리 작업을 수행할 수 있습니다.

런타임 값, 그래프 수준 구성, 상위 작업의 반환 값을 사용하여 작업 본문에서 논리 기반 작업을 지정해 동적 동작을 갖는 작업 그래프를 구축하십시오.

지원 언어 및 도구, SQL, JavaScript, Python, Java, Scala 또는 Snowflake Scripting을 사용하여 작업 및 작업 그래프를 생성할 수 있습니다. 이 항목에서는 SQL 예제를 제공합니다. Python 예제는 Python을 사용하여 Snowflake 작업 및 작업 그래프 관리하기 섹션을 참조하십시오.

작업 그래프 만들기

CREATE TASK 를 사용하여 루트 작업을 생성한 다음 CREATE TASK 를 사용하여 하위 작업을 생성합니다. AFTER 를 클릭하여 상위 항목을 선택합니다.

루트 작업은 작업 그래프가 를 실행할 때 를 정의합니다. 하위 작업은 작업 그래프에 정의된 순서대로 실행됩니다.

여러 하위 작업에 동일한 상위 항목이 있는 경우 하위 작업은 병렬로 실행됩니다.

작업에 상위 항목이 여러 개 있는 경우 작업은 모든 선행 작업이 성공적으로 완료될 때까지 기다렸다가 시작합니다. (이 작업은 일부 상위 항목을 건너뛰어도 실행될 수 있습니다. 자세한 내용은 하위 작업 건너뛰기 또는 일시 중단하기 섹션을 참조하십시오).

다음 예제는 1분마다 실행되도록 예약된 루트 작업으로 시작하는 서버리스 작업 그래프를 생성합니다. 루트 작업에는 병렬로 실행되는 2개의 하위 작업이 있습니다. (다이어그램은 이러한 작업 중 하나가 다른 작업보다 오래 실행되는 예를 보여줍니다.) 두 작업이 모두 완료되면 세 번째 하위 작업이 실행됩니다. 파이널라이저 작업은 다른 모든 작업이 완료되거나 완료되지 않은 후에 실행됩니다.

작업 시퀀스의 다이어그램입니다.
CREATE TASK task_root
  SCHEDULE = '1 MINUTE'
  AS SELECT 1;

CREATE TASK task_a
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_b
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_c
  AFTER task_a, task_b
  AS SELECT 1;
Copy

고려 사항:

  • 작업 그래프는 최대 1,000개의 작업으로 제한됩니다.

  • 단일 작업에는 최대 100개의 상위 작업과 100개의 하위 작업이 있을 수 있습니다.

  • 동일한 사용자 관리 웨어하우스에서 작업이 병렬로 실행되는 경우, 동시 작업 실행을 처리할 수 있도록 컴퓨팅 리소스 의 크기를 조정해야 합니다.

종료자 작업

작업 그래프의 다른 모든 작업이 완료(또는 완료 실패)된 후에 실행되도록 선택적 마무리 작업을 추가할 수 있습니다. 이를 사용하여 다음을 수행합니다.

  • 더 이상 필요하지 않은 중간 데이터를 정리하는 등의 정리 작업을 수행합니다.

  • 작업 성공 또는 실패에 대한 알림을 보냅니다.

작업 시퀀스는 2개의 하위 작업을 가리키는 루트 작업을 보여주며, 이 작업은 차례로 다른 작업을 가리킵니다. 마무리 작업은 다른 모든 작업이 완료되거나 완료되지 않은 후에 실행되는 하단에 표시됩니다.

파이널라이저 작업을 생성하려면 루트 작업에서 CREATE TASK … FINALIZE … 를 사용합니다. 예:

CREATE TASK task_finalizer
  FINALIZE = task_root
  AS SELECT 1;
Copy

고려 사항:

  • 종료자 작업은 항상 루트 작업과 연결됩니다. 각 루트 작업에는 1개의 파이널라이저 작업만 있을 수 있으며, 파이널라이저 작업은 1개의 루트 작업에만 연결할 수 있습니다.

  • 작업 그래프의 루트 작업을 건너뛰면(예: 중첩되는 작업 그래프 실행으로 인해) 파이널라이저 작업이 시작되지 않습니다.

  • 종료자 작업에는 하위 작업이 있을 수 없습니다.

  • 파이널라이저 작업은 현재 작업 그래프 실행에서 실행 중이거나 큐에 대기 중인 다른 작업이 없을 때만 예약됩니다.

자세한 예는 파이널라이저 작업 예시: 이메일 알림 보내기파이널라이저 작업 예제: 오류 수정 섹션을 참조하십시오.

작업 그래프 소유권 관리

작업 그래프의 모든 작업은 동일한 작업 소유자를 가져야 하며 동일한 데이터베이스 및 스키마에 저장되어야 합니다.

다음 작업 중 하나를 사용하여 작업 그래프의 모든 작업 소유권을 이전할 수 있습니다.

  • DROP ROLE 을 사용하여 작업 그래프에서 모든 작업의 소유자를 삭제합니다. Snowflake는 DROP ROLE 명령을 실행하는 역할로 소유권을 이전합니다.

  • 스키마의 모든 작업에서 GRANT OWNERSHIP 을 사용하여 작업 그래프에 있는 모든 작업의 소유권을 이전합니다.

이러한 방법을 사용하여 작업 그래프에서 작업 소유권을 이전하면 작업 그래프에 있는 작업 간의 관계가 유지됩니다.

단일 작업의 소유권을 이전하면 작업과 상위 작업, 하위 작업 간의 종속성이 제거됩니다. 자세한 내용은 이 항목의 상위 작업과 하위 작업 연결 해제 섹션을 참조하십시오.

참고

그래프가 복제를 수행하는 역할과 다른 역할에 의해 소유된 경우에는 작업 그래프에 대한 데이터베이스 복제가 작동하지 않습니다.

작업 그래프에서 작업 실행 또는 예약하기

작업 그래프를 수동으로 실행하기

작업 그래프의 단일 인스턴스를 실행할 수 있습니다. 이는 프로덕션에서 작업 그래프를 활성화하기 전에 새 작업 그래프 또는 수정된 작업 그래프를 테스트하거나 필요에 따라 일회성으로 실행할 때 유용합니다.

작업 그래프를 시작하기 전에 실행에 포함할 각 하위 작업(선택 사항인 파이널라이저 작업 포함)에서 ALTER TASK … RESUME 을 사용합니다.

작업 그래프의 단일 인스턴스를 실행하려면 루트 작업에서 EXECUTE TASK 를 사용하십시오. 루트 작업을 실행하면 작업 그래프에서 재개된 모든 하위 작업이 작업 그래프에 정의된 순서대로 실행됩니다.

예약 또는 트리거된 작업으로 작업 실행하기

루트 작업에서 작업 그래프가 실행되는 시기를 정의합니다. 작업 그래프는 반복 예약에 따라 실행되거나 이벤트에 의해 트리거될 수 있습니다. 자세한 내용은 다음 항목을 참조하십시오.

작업 그래프를 시작하려면 다음 중 하나를 수행하면 됩니다.

  • 실행에 포함할 각 하위 작업(파이널라이저 포함)을 다시 시작한 다음 ALTER TASK … RESUME 을 사용하여 루트 작업을 다시 시작합니다.

  • 루트 작업에서 SYSTEM$TASK_DEPENDENTS_ENABLE (<root_task_name>)을 사용하여 작업 그래프의 모든 작업을 한 번에 다시 시작하십시오.

작업 그래프에서 종속 작업 보기

루트 작업에 대한 하위 작업을 보려면 TASK_DEPENDENTS 테이블 함수를 호출하십시오. 작업 그래프에서 모든 작업을 검색하려면 함수를 호출할 때 루트 작업을 입력합니다.

Snowsight 를 사용하여 작업 그래프를 관리하고 볼 수도 있습니다. 자세한 내용은 Snowsight 의 작업 및 작업 그래프 보기 섹션을 참조하십시오.

작업 수정, 일시 중단 또는 다시 시도하기

작업 그래프에서 작업 수정하기

예약된 작업 그래프에서 작업을 수정하려면 ALTER TASK … SUSPEND 을 사용하여 루트 작업을 일시 중단합니다. 작업 그래프 실행이 진행 중이면 현재 실행이 완료됩니다. 루트 작업의 향후 예약된 모든 실행이 취소됩니다.

루트 작업이 일시 중단되면 파이널라이저 작업을 포함한 하위 작업은 해당 상태(일시 중단됨, 실행 중 또는 완료됨)를 유지합니다. 하위 작업은 개별적으로 일시 중단할 필요가 없습니다.

루트 작업을 일시 중단한 후에는 작업 그래프에서 모든 작업을 수정할 수 있습니다.

작업 그래프를 다시 시작하려면 다음 중 하나를 수행하면 됩니다.

  • ALTER TASK … RESUME 을 사용하여 루트 작업을 다시 시작하십시오. 이전에 실행 중이던 개별 하위 작업은 다시 시작할 필요가 없습니다.

  • SYSTEM$TASK_DEPENDENTS_ENABLE 을 호출하고 루트 작업의 이름을 전달하여 작업 그래프의 모든 작업을 한 번에 다시 시작하십시오.

하위 작업 건너뛰기 또는 일시 중단하기

작업 그래프에서 하위 작업을 건너뛰려면 ALTER TASK … SUSPEND 을 사용하여 하위 작업을 일시 중단합니다.

하위 작업을 일시 중단하면 하위 작업이 성공한 것처럼 작업 그래프가 계속 실행됩니다. 여러 선행 작업이 있는 하위 작업은 선행 작업 중 하나 이상 이 재개된 상태에 있고 재개된 선행 작업이 전부 성공적으로 실행 완료되는 한 실행됩니다.

다이어그램은 일시 중단된 하위 작업을 포함하는 작업 그래프를 보여줍니다. 일시 중단된 하위 작업은 건너뛰고 작업 그래프가 완료됩니다.

실패한 작업 다시 시도하기

EXECUTE TASK … RETRY LAST 을 사용하여 마지막으로 실패한 작업의 작업 그래프를 실행해 보십시오. 작업이 성공하면 모든 하위 작업은 이전 작업이 완료됨에 따라 계속 실행됩니다.

자동 재시도

기본적으로 하위 작업이 실패하면 전체 작업 그래프가 실패한 것으로 간주됩니다.

다음 예약된 작업 그래프 실행까지 기다리지 않고 루트 작업에서 TASK_AUTO_RETRY_ATTEMPTS 매개 변수를 설정하여 작업 그래프가 즉시 다시 시도하도록 지침을 내릴 수 있습니다. 하위 작업이 실패하면 전체 작업 그래프가 지정된 횟수까지 즉시 재시도됩니다. 작업 그래프가 여전히 완료되지 않으면 작업 그래프가 실패한 것으로 간주됩니다.

작업 그래프 실행 실패 후 작업 그래프 일시 중단

기본적으로 작업 그래프는 10회 연속 실패하면 일시 중단됩니다. 루트 작업에서 SUSPEND_TASK_AFTER_NUM_FAILURES 를 설정하여 이 값을 변경할 수 있습니다.

다음 예제에서는 하위 작업이 실패할 때마다 작업 그래프가 즉시 두 번 재시도한 후 전체 작업 그래프가 실패한 것으로 간주됩니다. 작업 그래프가 연속으로 3회 실패하면 작업 그래프가 일시 중단됩니다.

CREATE OR REPLACE TASK task_root
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2   --  Failed task graph retries up to 2 times
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3   --  Task graph suspends after 3 consecutive failures
  AS SELECT 1;
Copy

겹치는 작업 그래프 실행

기본적으로 Snowflake는 특정 작업 그래프의 인스턴스가 한 번에 1개만 실행하는 것을 허용합니다. 작업 그래프에 있는 모든 작업의 실행이 완료된 후에만 루트 작업의 다음 실행이 예약됩니다. 이는 작업 그래프의 모든 작업을 실행하는 데 필요한 누적 시간이 루트 작업의 정의에 설정된 명시적인 예약 시간을 초과할 경우 작업 그래프의 실행을 한 번 이상 건너뛴다는 의미입니다.

하위 작업이 겹치도록 허용하려면 루트 작업에서 CREATE TASK 또는 ALTER TASK 를 사용하고 ALLOW_OVERLAPPING_EXECUTION 을 TRUE 로 설정합니다. (루트 작업은 절대 중첩되지 않습니다.)

겹치는 작업 그래프 실행

작업 그래프의 중복 실행으로 인해 실행되는 SQL 읽기/쓰기 작업이 잘못된 데이터 또는 복제된 데이터를 생성하지 않으면 중복 실행이 허용되거나 권장될 수도 있습니다. 하지만 다른 작업 그래프의 경우, 작업 소유자(작업 그래프의 모든 작업에 대한 OWNERSHIP 권한을 가진 역할)가 루트 작업 실행 예약을 올바르게 설정하고 적절한 웨어하우스 크기를 선택(또는 서버리스 컴퓨팅 리소스를 사용)하여 루트 작업의 다음 실행이 예약되기 전에 작업 그래프의 인스턴스가 완료되도록 해야 합니다.

루트 작업에 정의된 일정과 작업 그래프를 더 잘 정렬하는 방법은 다음과 같습니다.

  1. 가능한 경우, 루트 작업 실행 간격의 예약 시간을 늘립니다.

  2. 서버리스 컴퓨팅 리소스를 사용하도록 컴퓨팅 집약적 작업을 수정해 보십시오. 작업이 사용자 관리 컴퓨팅 리소스에 의존하는 경우 작업 그래프에 있는 복잡하거나 대규모 SQL 문 또는 저장 프로시저를 실행하는 웨어하우스 크기를 늘리십시오.

  3. 각 작업이 실행하는 SQL 문 또는 저장 프로시저를 분석합니다. 병렬 처리를 활용하기 위해 코드를 다시 작성할 수 있는지 결정합니다.

위에 설명한 방법으로도 해결되지 않으면, 루트 작업에서 ALLOW_OVERLAPPING_EXECUTION = TRUE로 설정하여 작업 그래프의 동시 실행을 허용할 수도 있습니다. 이 매개 변수는 작업(CREATE TASK 사용)을 만들거나 이후(ALTER TASK 사용 또는 Snowsight 에서)에 정의할 수 있습니다.

버전 관리

작업 그래프의 루트 작업이 재개되거나 수동으로 실행되면 Snowflake는 작업 그래프에 있는 모든 작업의 모든 속성을 포함하여 전체 작업 그래프의 버전을 설정합니다. 작업이 일시 중단되고 수정된 후, Snowflake는 루트 작업이 재개되거나 수동으로 실행될 때 새 버전을 설정합니다.

작업 그래프에서 작업을 수정하거나 다시 생성하려면 우선 루트 작업을 일시 중단해야 합니다. 루트 작업이 일시 중단되면 루트 작업의 향후 예약된 모든 실행이 취소되지만, 현재 실행 중인 작업이 있는 경우 해당 작업과 모든 하위 작업은 현재 버전을 사용하여 계속 실행됩니다.

참고

작업 그래프가 실행되고 있는 동안 작업이 호출한 저장 프로시저의 정의가 변경되는 경우, 현재 실행 상태에 있는 작업이 저장 프로시저를 호출할 때 새 프로그래밍을 실행할 수 있습니다.

예를 들어 작업 그래프의 루트 작업이 일시 중단되었지만 이 작업의 예약된 실행이 이미 시작되었다고 가정하겠습니다. 루트 작업이 여전히 실행되는 동안 작업 그래프에 있는 모든 작업의 소유자는 하위 작업이 호출한 SQL 코드를 수정합니다. 하위 작업은 루트 작업이 실행을 시작할 때 최신이었던 버전의 작업 그래프를 사용하여 정의에 있는 SQL 코드를 실행합니다. 루트 작업이 다시 시작되거나 수동으로 실행된 후의 새 작업 그래프 버전이 설정됩니다. 이 새 버전에는 하위 작업에 대한 수정 사항이 포함됩니다.

작업 버전 기록을 검색하려면 SNOWFLAKE 공유 데이터베이스에서 TASK_VERSIONS Account Usage 뷰 를 쿼리하십시오.

작업 그래프 지속 시간

작업 그래프 기간에는 루트 작업이 시작되도록 예약된 시점부터 마지막 하위 작업이 완료될 때까지의 시간이 포함됩니다. 작업 그래프의 지속 시간을 계산하려면 COMPLETE_TASK_GRAPHS 뷰 를 쿼리하고 SCHEDULED_TIME 및 COMPLETED_TIME 을 비교합니다.

예를 들어, 다음 다이어그램은 1분마다 실행되도록 예약된 작업 그래프를 보여줍니다. 루트 작업과 두 하위 작업은 각각 5초 동안 큐에 대기하고 10초 동안 실행되므로 완료하는 데 총 45초가 소요됩니다.

종속성이 있는 세 개의 작업을 포함한 작업 그래프 다이어그램입니다. 각 작업은 5초 동안 큐에 대기하고 10초 동안 실행되어 총 45초 동안 실행됩니다.

작업 그래프 시간 제한

루트 작업에 USER_TASK_TIMEOUT_MS 를 설정하면 시간 제한이 전체 작업 그래프에 적용됩니다.

하위 작업 또는 파이널라이저 작업에 USER_TASK_TIMEOUT_MS 가 설정되어 있으면 시간 제한은 해당 작업에만 적용됩니다.

루트 작업과 하위 작업 모두에 USER_TASK_TIMEOUT_MS 이 설정되어 있으면 하위 작업 시간 제한이 해당 하위 작업에 대한 루트 작업 시간 제한을 재정의합니다.

고려 사항

  • 서버리스 작업의 경우, Snowflake는 큐 대기 시간을 포함하여 대상 완료 간격 내에 작업이 완료되도록 리소스를 자동으로 확장합니다.

  • 사용자 관리 작업의 경우 공유 또는 사용량이 많은 웨어하우스에서 작업이 예약된 경우 큐 대기 기간이 길어지는 것이 일반적입니다.

  • 작업 그래프의 경우 총 시간에는 상위 작업이 완료되기를 기다리는 하위 작업에 대한 추가 큐 대기 시간이 포함될 수 있습니다.

논리(런타임 정보, 구성 및 반환 값)이 포함된 작업 그래프 생성하기

작업 그래프의 작업은 상위 작업의 반환 값을 사용하여 함수 본문에서 논리 기반 작업을 수행할 수 있습니다.

고려 사항:

  • SYSTEM$GET_PREDECESSOR_RETURN_VALUE 같은 일부 논리 기반 명령은 대/소문자를 구분합니다. 그러나 따옴표 없이 CREATE TASK 를 사용하여 생성한 작업은 대문자로 저장 및 확인 됩니다. 이를 관리하려면 다음 중 하나를 수행하면 됩니다.

    • 대문자만 사용하여 작업 이름을 생성합니다.

    • 작업의 이름을 지정하고 호출할 때는 따옴표를 사용합니다.

    • 소문자로 정의된 작업 이름의 경우 대문자를 사용하여 작업을 호출하십시오. 예를 들어, “CREATE TASK task_c…”로 정의된 작업은 SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE(‘TASK_C’)로 호출할 수 있습니다.

작업 그래프에 구성 정보 전달하기

작업 그래프에서 다른 작업에서 읽을 수 있는 JSON 오브젝트를 사용하여 구성 정보를 전달할 수 있습니다. CREATE/ALTER TASK … CONFIG 구문을 사용하여 루트 작업의 구성 정보를 설정, 설정 해제 또는 수정했습니다. SYSTEM$GET_TASK_GRAPH_CONFIG 함수를 사용하여 검색하십시오. 예:

CREATE OR REPLACE TASK "task_root"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS SELECT 1;

CREATE OR REPLACE TASK "task_a"
  USER_TASK_TIMEOUT_MS = 600000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
Copy

작업 간에 반환 값 전달하기

작업 그래프에서 작업 간에 반환 값을 전달할 수 있습니다. 작업에서 반환 값을 추가하려면 SYSTEM$SET_RETURN_VALUE 함수를 사용하고, 검색하려면 SYSTEM$GET_PREDECESSOR_RETURN_VALUE 함수를 사용합니다.

작업에 여러 개의 선행 작업이 있는 경우 원하는 반환 값을 가진 작업을 지정해야 합니다. 다음 예제에서는 작업 그래프에 구성 정보를 추가하는 루트 작업을 생성합니다.

CREATE OR REPLACE TASK "task_c"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
    END;

CREATE OR REPLACE TASK "task_d"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_c"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
    END;
Copy

런타임 정보 가져오기 및 사용

SYSTEM$TASK_RUNTIME_INFO 함수를 사용하여 현재 실행 중인 작업에 대한 정보를 보고하십시오. 이 함수에는 작업 그래프와 관련된 몇 가지 옵션이 있습니다. 예를 들어, CURRENT_ROOT_TASK_NAME 을 사용하여 현재 작업 그래프에서 루트 작업의 이름을 가져옵니다. 다음 예제는 작업 그래프의 루트 작업이 시작된 시점을 기준으로 테이블에 날짜 스탬프를 추가하는 방법을 보여줍니다.

-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO date_time_table VALUES('order_date',:value);
    END;
Copy

예제: 여러 작업 시작 및 상태 보고하기

다음 예제에서는 루트 작업이 세 개의 서로 다른 테이블을 업데이트하는 작업을 시작합니다. 이 세 테이블이 업데이트되면 작업은 다른 세 테이블의 정보를 종합 판매 테이블로 결합합니다.

플로차트는 각각 테이블을 업데이트하는 세 개의 하위 작업을 시작하는 루트 작업을 보여줍니다. 이 3가지 작업은 모두 이전 변경 사항을 다른 테이블에 결합하는 또 다른 하위 작업 앞에 있습니다.
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
    END;
;

-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT customer_id FROM ref_cust_table
        WHERE cust_name = "Jane Doe";);
      INSERT INTO customer_table VALUES('customer_id',:value);
    END;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT product_id FROM ref_item_table
        WHERE PRODUCT_NAME = "widget";);
      INSERT INTO product_table VALUES('product_id',:value);
    END;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO "date_time_table" VALUES('order_date',:value);
    END;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_customer_table, task_product_table, task_date_time_table
  AS
    BEGIN
      LET VALUE := (SELECT sales_order_id FROM ORDERS);
      JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
      INSERT INTO sales_table VALUES('sales_order_id',:value);
    END;
;
Copy

파이널라이저 작업 예시: 이메일 알림 보내기

이 예는 파이널라이저 작업에서 작업 그래프의 성능을 요약한 이메일을 보내는 방법을 보여줍니다. 하나는 작업 완료 상태에 대한 정보를 집계하고, 다른 하나는 이 정보를 사용하여 원격 메시지 서비스를 통해 보낼 수 있는 이메일을 작성하는 2개의 외부 함수를 호출합니다.

작업 시퀀스는 2개의 하위 작업을 가리키는 루트 작업을 보여주며, 이 작업은 차례로 다른 작업을 가리킵니다. 마무리 작업은 다른 모든 작업이 완료되거나 완료되지 않은 후에 실행되며 하단에 표시됩니다.
CREATE OR REPLACE TASK notify_finalizer
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_root
AS
  DECLARE
    my_root_task_id STRING;
    my_start_time TIMESTAMP_LTZ;
    summary_json STRING;
    summary_html STRING;
  BEGIN
    --- Get root task ID
    my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
    --- Get root task scheduled time
    my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::timestamp_ltz);
    --- Combine all task run info into one JSON string
    summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
    --- Convert JSON into HTML table
    summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));

    --- Send HTML to email
    CALL SYSTEM$SEND_EMAIL(
        'email_notification',
        'admin@snowflake.com',
        'notification task run summary',
        :summary_html,
        'text/html');
    --- Set return value for finalizer
    CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END

CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
  RETURNS STRING
AS
$$
  (SELECT
    ARRAY_AGG(OBJECT_CONSTRUCT(
      'task_name', name,
      'run_status', state,
      'return_value', return_value,
      'started', query_start_time,
      'duration', duration,
      'error_message', error_message
      )
    ) AS GRAPH_RUN_SUMMARY
  FROM
    (SELECT
      NAME,
      CASE
        WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
        WHEN STATE = 'FAILED' then '🔴 Failed'
        WHEN STATE = 'SKIPPED' then '🔵 Skipped'
        WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
      END AS STATE,
      RETURN_VALUE,
      TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
      CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
        ' s') AS DURATION,
      ERROR_MESSAGE
    FROM
      TABLE(my-database.information_schema.task_history(
        ROOT_TASK_ID => my_root_task_id ::STRING,
        SCHEDULED_TIME_RANGE_START => my_start_time,
        SCHEDULED_TIME_RANGE_END => current_timestamp()
      ))
    ORDER BY
      SCHEDULED_TIME)
  )::STRING
$$
;

CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
  IMPORT JSON

  def GENERATE_HTML_TABLE(JSON_DATA):
    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]

  DATA = json.loads(JSON_DATA)
  HTML = f"""
    <img src="https://example.com/logo.jpg"
      alt="Company logo" height="72">
    <p><strong>Task Graph Run Summary</strong>
      <br>Sign in to Snowsight to see more details.</p>
    <table border="1" style="border-color:#DEE3EA"
      cellpadding="5" cellspacing="0">
      <thead>
        <tr>
        """
        headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
        for i, header in enumerate(headers):
            HTML += f'<th scope="col" style="text-align:left;
            width: {column_widths[i]}">{header.capitalize()}</th>'

        HTML +="""
        </tr>
      </thead>
      <tbody>
        """
        for ROW_DATA in DATA:
          HTML += "<tr>"
          for header in headers:
            key = header.replace(" ", "_").upper()
            CELL_DATA = ROW_DATA.get(key, "")
            HTML += f'<td style="text-align:left;
            width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
          HTML += "</tr>"
        HTML +="""
      </tbody>
    </table>
    """
  return HTML
$$
;
Copy

파이널라이저 작업 예제: 오류 수정

이 예제는 파이널라이저 작업으로 오류를 수정하는 방법을 보여줍니다.

데모 목적으로 작업은 처음 실행하는 동안 실패하도록 설계되었습니다. 파이널라이저 작업은 문제를 수정하고 작업을 다시 시작하여 다음 실행에서 성공합니다.

작업 시리즈를 보여주는 다이어그램. 작업 A는 왼쪽 상단에 표시됩니다. 화살표는 작업 A에서 바로 작업 B를 가리키고, 이 화살표는 작업 C를 가리키며, 이 화살표는 작업 D를 가리킵니다. 작업 A 아래에서 화살표는 마무리 작업인 작업 F를 가리킵니다.
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations.
--    Creates a root task ("task_a"), and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * Each task runs after one minute, with a 60-second timeout.
--    * If a task fails, retry it twice. if it fails twice,
--      the entire task graph is considered as failed.
--    * If the task graph fails consecutively three times, suspend the task.
--    * Other environment values are set.

CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task a successful');
    END;
;

-- 2. Use a runtime reflection variable.
--    Creates a child task ("task_b").
--    By design, this example fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO demo_table VALUES('task b name',:VALUE);
    END;
;

-- 3. Get a task graph configuration value.
--    Creates the child task ("task_c").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_b
  AS
    BEGIN
      CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_c
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
      INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
    END;
;

-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task should automatically retry
--    (see task_a: task_auto_retry_attempts).
--    On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_a
  AS
    BEGIN
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
    END;
;

-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
--    Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');

-- 7. Query the task history
SELECT
    name, state, attempt_number, scheduled_from
  FROM
    TABLE(information_schema.task_history(task_name=> 'task_b'))
  LIMIT 5;
;

-- 8. Suspend the task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    (task_a). Child tasks don’t run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;

-- 9. Check tasks during execution (optional)
--    Run this command to query the demo table during execution
--    to check which tasks have run.
SELECT * FROM demo_table;

-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     After the task graph retries, task_b will succeed.
DROP TABLE demo_table;
Copy