Task Reactor

모든 Snowflake 커넥터에 사용되는 공통 요소와 기능을 제공하는 라이브러리입니다.

요구 사항

Task Reactor는 Native App 설치 중에 최소한 다음의 SQL 파일을 실행해야 합니다.

개요

Task Reactor는 제한된 작업 집합이 포함된 큐 내부에 저장된 작업 청크에 대한 오케스트레이션 메커니즘을 제공하는 별도의 모듈입니다. Task Reactors의 큐와 디스패처는 Snowflake TasksSnowflake Streams 을 기반으로 하며 새로 고침 시간 제한으로 인해 1분마다 트리거됩니다. Task Reactor는 입력 큐에 데이터가 있는 경우에만 활성화되어 웨어하우스가 일부 크레딧을 절약할 수 있도록 합니다.

Task Reactor는 큐, 디스패처, 워커의 세 가지 주요 구성 요소로 구성됩니다.

  1. 커넥터 애플리케이션이 큐에 QueueItems를 추가합니다.

  2. 디스패처(Snowflake 작업)는 매분마다 큐에서 대기 중인 QueueItems를 가져와 워커에 전달합니다.

  3. 매분마다 워커(Snowflake 작업)가 할당된 QueueItems에서 병렬로 작업합니다.

커넥터 구성이 완료되면 Task Reactor 구성은 3단계로 제한됩니다.

  1. Task Reactor의 모든 구성 요소 만들기

  2. 인스턴스 초기화하기

  3. (선택 사항) 워커 번호 변경하기

Task Reactor의 모든 구성 요소 만들기

인스턴스 오브젝트를 생성하려면 먼저 worker, selector 및 선택적으로 expired selector 구현을 생성한 다음 TASK_REACTOR.CREATE_INSTANCE_OBJECTS 프로시저를 사용하여 이를 통합해야 합니다.

워커 구현

워커는 디스패처가 할당한 작업, 예를 들어 특정 데이터를 끌어오고 수집하는 작업을 수행할 책임이 있습니다. 유일한 필수 사항은 작업을 시작하는 특정 워커 메서드가 있어야 한다는 것입니다. 이 메서드는 Snowpark 프로시저에서 호출 가능해야 하고 문자열을 반환하며 다음 매개 변수를 포함해야 합니다.

  • session - Snowpark 세션 오브젝트

  • worker_id - 숫자, 고유 워커 ID

  • task_reactor_schema - Task Reactor 오브젝트가 생성되는 스키마 이름입니다. Task Reactor 인스턴스의 이름으로 사용될 수 있습니다.

워커는 디스패처가 할당한 작업, 즉 특정 데이터를 가져오고 수집하는 작업을 실행할 책임이 있습니다. (com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorkercom.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion) Java 클래스 또는 간단한 작업(com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorkercom.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask)을 사용하는 것이 좋지만, 워커는 저장 프로시저 핸들러를 작성하는 데 지원되는 모든 프로그래밍 언어로 만들 수 있습니다.

Java 워커 메서드의 예:

public static String executeWork(Session session, int workerId, String taskReactorSchema) {
  FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
  WorkerId workerIdentifier = new WorkerId(workerId);
  Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
  try {
    IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
  } catch (WorkerException e) {
    // handle the exception...
    throw new RuntimeException(e);
  }
  return "Worker procedure executed.";
}
Copy

이미 생성된 워커 메서드가 있는 경우 사용자는 이를 CONNECTOR.WORKER_PROCEDURE 에 통합해야 합니다. 해당 프로시저는 자체 워커 메서드를 호출해야 합니다. 애플리케이션 스키마에 생성되어야 하며 STRING을 반환하고 다음 매개 변수를 포함해야 합니다.

  • worker_id - 숫자

  • task_reactor_schema - 문자열

워커의 Java 구현을 호출하는 예제 프로시저:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
Copy

원격 측정 라이브러리는 이벤트 테이블에 기록되는 메트릭을 수집하는 데 필요합니다.

선택기 구현

선택기의 역할은 대기 중인 작업 중 어떤 작업을 Task Reactor에서 처리해야 하는지 결정하는 것입니다. 워커 구현과 유사하며, Snowpark에서 지원하는 모든 언어로 만들 수 있습니다. 작업 선택기는 데이터베이스 프로시저나 데이터베이스 뷰로 구현될 수 있습니다. 선택기(프로시저 또는 뷰)는 TASK_REACTOR.CREATE_NEW_INSTANCE 프로시저에서 인자로 전달되어야 합니다.

프로시저는 Snowpark 프로시저에서 호출 가능해야 하고 문자열을 반환하며 다음 매개 변수를 포함해야 합니다.

  • session - Snowpark 세션

  • queueItems - String[](각각 하나의 QueueItem을 설명하는 개별 JSON 문자열의 배열)

Java 선택기 방법의 예:

public static String selectWork(Session session, String[] queueItems) {
  Variant[] sorted =
      Arrays.stream(queueItems)
          .map(Variant::new)
          .filter(
              queueItem ->
                  !queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
          .sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
          .toArray(Variant[]::new);
  return new Variant(sorted).asJsonString();
}
Copy

선택기 방법 대신 기존 큐에서 작업을 필터링하고 정렬하는 뷰를 만드는 것도 가능합니다. 디스패처는 예제 쿼리를 사용하여 새로 생성된 뷰에서 새 작업을 검색할 수 있습니다.

CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
Copy

이미 생성한 셀렉터 메서드가 있는 경우 CONNECTOR.WORK_SELECTOR 에 통합해야 합니다. 이 프로시저에서는 필수 작업 선택기 메서드를 호출해야 합니다. 애플리케이션 스키마에 생성되어야 하며 ARRAY를 반환하고 다음 매개 변수를 포함해야 합니다.

  • work_items - array

작업 선택기의 Java 구현을 호출하는 예제 프로시저:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
    RETURNS ARRAY
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
Copy

만료된 선택기 구현

만료된 선택기의 역할은 Task Reactor 큐에서 어떤 대기 항목을 제거해야 할지 결정하는 것입니다. 선택기가 일부 항목에 도달하지 못하면 해당 항목이 큐에 영구 유지되므로 항목을 제거해야 할 수 있습니다. 게다가, 큐에서 대기 중인 일부 항목은 이미 오래 전에 생성되어 더 이상 처리하는 것이 의미가 없는 경우가 있습니다. 만료된 선택기는 데이터베이스 뷰로 구현될 수 있습니다. 선택기 뷰는 TASK_REACTOR.CREATE_NEW_INSTANCE 프로시저에서 인자로 전달되어야 합니다. 큐에서 항목을 제거할 필요가 없는 경우 기본 구현 TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR 를 사용할 수 있습니다.

다음 쿼리를 사용하면 3일 이상 전에 생성된 항목을 선택하는 만료된 선택기 뷰를 만들 수 있습니다.

CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
    AS SELECT * FROM TASK_REACTOR.QUEUE q
        WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
Copy

인스턴스 오브젝트 통합

TASK_REACTOR.CREATE_INSTANCE_OBJECTS 를 사용하면 생성된 인스턴스를 초기화하기 전에 모든 인스턴스를 함께 구성할 수 있습니다. 해당 프로시저는 스키마당 한 번만 실행될 수 있으므로 이후의 호출은 어떠한 변경에도 영향을 미치지 않습니다. 프로시저가 여러 번 실행되거나 아예 호출되지 않도록 초기화 호출을 setup.sql 파일에 배치하는 것이 좋습니다.

필수 매개 변수:

  • instance_schema_name VARCHAR - 인스턴스가 작업하는 데이터베이스 오브젝트를 저장하는 인스턴스당 하나의 고유한 스키마입니다.

  • worker_procedure_name VARCHAR - Worker Implementation 부분에 설명되는 워커 프로시저의 이름입니다.

  • work_selector_type VARCHAR - 새로운 작업이 뷰나 프로시저를 사용해야 하는지 지정하는 값입니다. 가능한 값: VIEW, PROCEDURE.

  • work_selector_name VARCHAR - Selector Implementation 부분에 설명되는 선택기 프로시저/뷰의 이름입니다.

선택적 매개 변수:

  • expired_work_selector_name VARCHAR - Expired Selector Implementation 부분에 설명되는 만료된 선택기 뷰의 이름입니다. 값을 제공되지 않으면 TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR 가 기본 구현으로 사용되어 아무것도 반환하지 않습니다.

인스턴스 초기화하기

Task Reactor에서 모든 구성을 초기화하고 실행하려면 사용자가 INITIALIZE_INSTANCE 를 호출해야 합니다. 이 프로시저는 다음 매개 변수를 입력으로 사용합니다.

  • instance_schema_name - (필수) 인스턴스가 동작하는 데이터베이스 오브젝트를 저장하는 스키마의 이름입니다.

  • warehouse_name - (필수) 인스턴스가 실행될 웨어하우스의 이름입니다.

  • dt_should_be_started (선택 사항) - 기본값: TRUE. 새 인스턴스를 만들 때 디스패처 작업이 시작되어야 하는지 여부입니다.

  • dt_task_schedule (선택 사항) - 기본값: 1 MINUTE. 디스패처 작업을 실행하는 빈도입니다.

  • dt_allow_overlapping_execution (선택 사항) - 기본값: FALSE. DAG를 동시에 실행할 수 있도록 허용합니다.

  • dt_user_task_timeout_ms (선택 사항) - 시간이 초과되기 전에 작업의 단일 실행에 대한 시간 제한(밀리초)을 지정합니다.

참고

워커 프로시저가 워커 작업에 설정된 시간 제한보다 오래 걸리는 경우(USER_TASK_TIMEOUT_MS), 그러면 프로시저가 시간 제한 오류와 함께 중단됩니다. Snowflake 작업의 시간 제한을 초과하지 않도록 작업을 예약하는 것이 중요합니다.

필요한 최소 매개 변수를 제공하면 Task Reactor 는 제공된 구성으로 초기화되고 TASK_REACTOR.DISPATCHER 프로시저를 사용하여 워커를 디스패치합니다.

워커 수 설정

워커 수는 다음 매개 변수를 사용하여 TASK_REACTOR.SET_WORKERS_NUMBER 프로시저를 호출하여 수동으로 변경할 수 있습니다.

  • WORKERS_NUMBER - 새로운 워커 수입니다.

  • TR_INSTANCE_SCHEMA_NAME - 인스턴스 스키마의 이름입니다.

메트릭

Task Reactor에는 메트릭 메커니즘이 포함되어 있습니다. 그리고 Snowflake 추적 이벤트 를 기반으로 합니다. 메트릭은 이벤트 테이블에 기록되므로, 메트릭을 작동시키려면 이벤트 테이블을 활성화해야 합니다.

현재 다음과 같은 메트릭이 도입되었습니다.

  • worker working time (TASK_REACTOR_WORKER_WORKING_TIME) - 워커가 실제로 리소스를 처리하는 시간을 보여줍니다. 타이머는 워커 작업이 시작될 때 시작되고 워커 작업이 완료되면 끝납니다.

  • worker idle time (TASK_REACTOR_WORKER_IDLE_TIME) - worker working time 의 반대입니다. 워커가 대기하는 시간, 즉 새 작업을 기다리거나 다음 작업 일정을 기다리는 시간을 표시합니다. 타이머는 워커가 작업을 완료하면 시작되고, 워커 작업이 다시 시작되면 끝납니다.

기록된 모든 메트릭 이벤트를 보려면 다음 쿼리를 사용할 수 있습니다.

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
    ORDER BY event.timestamp DESC;
Copy

한 가지 유형의 메트릭만 선택하려면 쿼리의 where 절에 event.record:name = <메트릭 이름> 을 추가합니다.

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
        AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
    ORDER BY event.timestamp DESC;
Copy