트리거된 작업

트리거된 작업을 사용하여 스트림 에 변경이 있을 때마다 작업을 실행합니다. 이렇게 하면 새로운 데이터의 가용성을 예측할 수 없을 때 소스를 자주 폴링할 필요가 없습니다. 또한 데이터가 즉시 처리되므로 지연 시간도 줄어듭니다.

트리거된 작업은 이벤트가 트리거될 때까지 컴퓨팅 리소스를 사용하지 않습니다.

고려 사항

  • 디렉터리 테이블에서 호스팅되는 스트림의 경우 트리거된 작업이 변경 사항을 감지하기 전에 디렉터리 테이블을 새로 고쳐야 합니다. 변경 사항을 감지하려면 다음 중 하나를 수행하면 됩니다.

  • 외부 테이블 및 하이브리드 테이블의 스트림은 지원되지 않습니다.

트리거된 작업 만들기

CREATE TASK 을 사용하여 다음 매개 변수를 설정합니다.

  • WHEN 절을 사용하여 대상 스트림을 정의합니다. (SCHEDULE 매개 변수는 포함하지 마십시오.)

    여러 데이터 스트림으로 작업하는 경우 조건부 매개 변수 WHEN ... ANDWHEN ... OR 를 사용할 수 있습니다.

  • 컴퓨팅 리소스 에 기반한 추가 요구 사항:

    • 서버리스 작업을 생성하려면 TARGET_COMPLETION_INTERVAL 매개 변수를 포함해야 합니다. WAREHOUSE 매개 변수를 포함하지 마십시오. Snowflake는 대상 완료 간격을 사용하여 필요한 리소스를 추정하고 이 시간 내에 작업을 완료하도록 조정합니다.

    Snowflake에서 서버리스 트리거 작업이 작동하는 방식을 보여주는 다이어그램.
    • 사용자 관리 웨어하우스에서 실행되는 작업을 생성하려면 WAREHOUSE 매개 변수를 포함시키고 웨어하우스를 정의하십시오.

예약된 작업에서 트리거된 작업으로 기존 작업 마이그레이션하기

  1. 작업을 일시 중단합니다.

  2. SCHEDULE 매개 변수를 설정 해제하고 WHEN 절을 추가하여 대상 스트림을 정의합니다.

  3. 작업을 다시 시작합니다.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

기존 사용자 관리 트리거 작업을 서버리스 트리거 작업으로 마이그레이션하기

  1. 작업을 일시 중단합니다.

  2. WAREHOUSE 매개 변수를 제거하고 TARGET_COMPLETION_INTERVAL 매개 변수를 설정합니다.

  3. 작업을 다시 시작합니다.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

자세한 내용은 서버리스 작업 을 참조하십시오.

트리거된 작업이 실행되도록 허용하기

트리거된 작업을 생성하면 일시 중단된 상태에서 시작됩니다.

스트림 모니터링을 시작하려면 다음과 같이 하십시오.

작업은 다음 조건에서 실행됩니다.

  • 트리거된 작업을 처음 다시 시작하면 작업은 스트림에서 마지막 작업이 실행된 이후의 변경 사항을 확인합니다. 변경 사항이 있으면 작업이 실행되고, 그렇지 않으면 컴퓨팅 리소스를 사용하지 않고 작업을 건너뜁니다.

  • 작업이 실행 중이고 스트림에 새로운 데이터가 있는 경우 작업은 현재 작업이 완료될 때까지 대기합니다. Snowflake는 한 번에 1개의 작업 인스턴스만 실행되도록 합니다.

  • 작업이 완료되면 Snowflake는 스트림의 변경 사항을 다시 확인합니다. 변경 사항이 있으면 작업이 다시 실행되고, 변경 사항이 없으면 작업을 건너뜁니다.

  • 이 작업은 스트림에서 새로운 데이터가 감지될 때마다 실행됩니다.

  • 스트림 데이터가 디렉터리 테이블에서 호스팅되는 경우 다음 중 하나를 수행하여 변경 사항을 감지할 수 있습니다.

  • 작업은 12시간마다 스트림이 부실해지는 것을 방지하기 위해 상태 검사를 실행합니다. 변경 사항이 없는 경우 Snowflake는 컴퓨팅 리소스를 사용하지 않고 작업을 건너뜁니다. 스트림의 경우, 작업 지침은 데이터 보존이 만료되기 전에 스트림의 데이터를 소비해야 하며, 그렇지 않으면 스트림이 부실 상태가 됩니다. 자세한 내용은 스트림 부실 방지하기 섹션을 참조하십시오.

  • 기본적으로 트리거 작업은 최대 30초마다 실행됩니다. USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS 매개 변수를 수정하여 최대 10초마다 실행되도록 더 자주 실행할 수 있습니다.

  • Streams on Views 에 의해 작업이 트리거되면 쿼리의 조인, 집계 또는 필터에 관계없이 뷰의 스트림 쿼리에서 참조하는 테이블에 대한 모든 변경 사항도 작업을 트리거합니다.

다이어그램은 트리거된 작업이 새로운 데이터가 들어올 때 어떻게 관리되는지 및 12시간마다 변경 사항이 있는지 확인하는 방법을 보여줍니다.

트리거된 작업 모니터링하기

  • SHOW TASKSDESC TASK 출력에서 SCHEDULE 속성은 트리거된 작업에 대해 NULL 을 표시합니다.

  • information_schema 및 account_usage 스키마에서 task_history 뷰의 출력에서 SCHEDULED_FROM 열에 TRIGGER가 표시됩니다.

예제 1: 스트림에서 데이터가 변경될 때마다 실행되는 서버리스 작업을 생성합니다.

이 작업은 서버리스이므로 TARGET_COMPLETION_INTERVAL 매개 변수는 Snowflake가 필요한 컴퓨팅 리소스를 추정하는 데 필요합니다.

CREATE TASK my_task
  TARGET_COMPLETION_INTERVAL='120 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS SELECT 1;
Copy

예 2: 두 스트림 중 하나에서 데이터가 변경될 때마다 실행되는 사용자 관리 작업을 만듭니다.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

예 3: 서로 다른 두 데이터 스트림에서 데이터 변경이 감지될 때마다 실행되도록 사용자 관리 작업을 생성합니다. 이 작업은 AND 조건문을 사용하므로 두 스트림 중 하나에만 새 데이터가 있는 경우 작업을 건너뜁니다.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

예 4: 디렉터리 테이블에서 데이터가 변경될 때마다 실행되는 사용자 관리 작업을 생성합니다. 이 예제에서는 스트림(my_directory_table_stream)이 스테이지(my_test_stage)의 디렉터리 테이블 에서 호스팅됩니다.

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

트리거된 작업의 유효성을 검사하기 위해 데이터가 스테이지에 추가됩니다.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

그런 다음 디렉터리 테이블을 수동으로 새로 고쳐 작업을 트리거합니다.

ALTER STAGE my_test_stage REFRESH
Copy