Reator de tarefa¶
Biblioteca que fornece elementos e recursos comuns usados em todos os conectores Snowflake.
Requisitos¶
O reator de tarefas exige que pelo menos os seguintes arquivos sql sejam executados durante a instalação do Native App:
task_reactor.sql
(consulte: Referência SQL do reator de tarefas)
Visão geral¶
O reator de tarefas é um módulo separado que fornece um mecanismo de orquestração para partes de trabalho armazenados dentro de uma fila com um conjunto limitado de tarefas. A fila e o dispatcher dos reatores de tarefas são baseados em fluxos do Snowflake com tarefas do Snowflake e serão acionados a cada minuto devido à limitação do tempo de atualização. O reator de tarefas ficará ativo somente quando houver dados na fila de entrada, para permitir que o warehouse economize alguns créditos.
O reator de tarefas consiste em três componentes principais: fila, dispatcher e trabalhadores:
Seu aplicativo do conector adiciona QueueItems à fila.
A cada minuto o dispatcher (uma tarefa Snowflake) busca QueueItems em espera da fila e os passa para os trabalhadores.
A cada minuto, os trabalhadores (tarefas Snowflake) trabalham em paralelo em QueueItems atribuídas.
Uma vez finalizada a configuração do conector, a configuração do reator de tarefas é limitada a 3 etapas:
Criação de todos os componentes do reator de tarefas
Inicialização de instâncias
Alteração de número de trabalhadores (opcional)
Criação de todos os componentes do reator de tarefas¶
Para criar um objeto de instância, o usuário primeiro precisa criar implementações worker
, selector
e opcionalmente expired selector
e, em seguida, integrá-las usando o procedimento TASK_REACTOR.CREATE_INSTANCE_OBJECTS.
Implementação do trabalhador¶
O trabalhador é responsável por executar uma tarefa atribuída pelo dispatcher, como extrair e ingerir determinados dados. A única parte obrigatória é ter um método de trabalho específico que inicie o trabalho. Este método deve ser chamado a partir do procedimento Snowpark, retornar uma cadeia de caracteres e conter os seguintes parâmetros:
session
- Objeto de sessão do Snowparkworker_id
- Número, identificação única do trabalhadortask_reactor_schema
- Nome do esquema onde os objetos do reator de tarefas são criados. Pode ser usado como um nome de instância do reator de tarefas.
O trabalhador é responsável por executar a tarefa atribuída pelo dispatcher, por exemplo, extrair e ingerir dados específicos. Recomendamos usar as classes Java (com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorker
e com.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion
) ou para tarefas mais simples (com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorker
e com.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask
), no entanto, seu trabalhador pode ser criado em qualquer linguagem de programação suportada para escrever manipuladores de procedimentos armazenados.
Exemplo de método do trabalhador de Java:
public static String executeWork(Session session, int workerId, String taskReactorSchema) {
FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
WorkerId workerIdentifier = new WorkerId(workerId);
Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
try {
IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
} catch (WorkerException e) {
// handle the exception...
throw new RuntimeException(e);
}
return "Worker procedure executed.";
}
Com um método do trabalhador já criado, o usuário deve integrá-lo ao CONNECTOR.WORKER_PROCEDURE
. O procedimento deve chamar seu próprio método do trabalhador. Ele deve ser criado em seu esquema de aplicativo, retornar uma STRING e conter os seguintes parâmetros:
worker_id
- númerotask_reactor_schema
- cadeia de caracteres
Um procedimento de exemplo, chamando a implementação Java do trabalhador:
CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
RETURNS STRING
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
A biblioteca de telemetria é necessária para coletar métricas que são registradas na tabela de eventos.
Implementação do seletor¶
O trabalho do seletor é decidir quais tarefas enfileiradas devem ser manipuladas pelo reator de tarefas. Semelhante à implementação do trabalhador, ele pode ser criado em qualquer linguagem suportada pelo Snowpark. O seletor de tarefas pode ser implementado como um procedimento de banco de dados ou uma exibição de banco de dados. O seletor (procedimento ou exibição) deve ser passado como argumento no procedimento TASK_REACTOR.CREATE_NEW_INSTANCE
.
O procedimento deve ser chamável de um procedimento Snowpark, retornar uma cadeia de caracteres e conter os seguintes parâmetros:
session
- Sessão de SnowparkqueueItems
- Cadeias de caracteres[] (uma matriz de cadeias de caracteres JSON individuais, cada uma descrevendo um único QueueItem)
Exemplo de método do seletor de Java:
public static String selectWork(Session session, String[] queueItems) {
Variant[] sorted =
Arrays.stream(queueItems)
.map(Variant::new)
.filter(
queueItem ->
!queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
.sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
.toArray(Variant[]::new);
return new Variant(sorted).asJsonString();
}
Em vez do método seletor, ainda é possível criar uma exibição que filtrará e classificará tarefas da fila existente. O dispatcher pode recuperar novas tarefas da exibição recém-criada usando uma consulta de exemplo:
CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
Com o método do seletor já criado, o usuário deve integrá-lo ao CONNECTOR.WORK_SELECTOR
. O procedimento deve chamar seu método seletor do trabalhador obrigatório. Ele deve ser criado no esquema de seu aplicativo, retornar uma ARRAY, e contém o seguinte parâmetro:
work_items - array
Um procedimento de exemplo, chamando a implementação Java do seletor do trabalhador:
CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
RETURNS ARRAY
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
Implementação do seletor expirado¶
A função do seletor expirado é decidir quais itens na fila devem ser removidos da fila do reator de tarefas. Pode ser necessário remover itens porque o seletor nunca consegue alcançar alguns itens e esses itens permaneceriam na fila para sempre. Além disso, alguns itens que estão esperando na fila podem ser criados muito tempo antes e não faz mais sentido processá-los. O seletor expirado pode ser implementado como uma exibição de banco de dados. A exibição do seletor deve ser passada como um argumento no procedimento TASK_REACTOR.CREATE_NEW_INSTANCE
. Se não houver necessidade de remover itens da fila, a implementação padrão pode ser usada TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
.
Usando a consulta a seguir, é possível criar uma exibição de seletor expirado que seleciona os itens que foram criados há mais de 3 dias:
CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
AS SELECT * FROM TASK_REACTOR.QUEUE q
WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
Integração de objetos de instância¶
O TASK_REACTOR.CREATE_INSTANCE_OBJECTS permite que o usuário configure todas as instâncias juntas antes de inicializar as instâncias criadas. O procedimento pode ser executado apenas uma vez por esquema, portanto, quaisquer chamadas futuras não afetarão nenhuma alteração. Recomendamos colocar a chamada de inicialização para o arquivo setup.sql
, para evitar que o procedimento seja executado várias vezes ou nem seja chamado.
Parâmetros obrigatórios:
instance_schema_name VARCHAR
- Um esquema exclusivo por instância que armazena objetos de banco de dados nos quais a instância trabalha.worker_procedure_name VARCHAR
- Nome do procedimento do trabalhador descrito na parteWorker Implementation
.work_selector_type VARCHAR
- Valores especificando se novas tarefas devem usar exibição ou procedimento. Possíveis valores: VIEW, PROCEDURE.work_selector_name VARCHAR
- Nome do procedimento/exibição do seletor descrito na funçãoSelector Implementation
.
Parâmetros opcionais:
expired_work_selector_name VARCHAR
- Nome da exibição do seletor expirado descrita na funçãoExpired Selector Implementation
. Se o valor não for fornecido,TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
será usado como uma implementação padrão que não retorna nada.
Inicialização de instâncias¶
Para inicializar e executar todas as configurações no reator de tarefas, o usuário deve chamar INITIALIZE_INSTANCE
. O procedimento recebe os seguintes parâmetros como entrada:
instance_schema_name
- (obrigatório) Nome do esquema que armazena objetos de banco de dados nos quais a instância trabalha.warehouse_name
(obrigatório) Nome do warehouse no qual a instância será executada.dt_should_be_started
(opcional) - padrão:TRUE
. A tarefa do dispatcher deve iniciar ao criar uma nova instância ou não.dt_task_schedule
(opcional) - padrão:1 MINUTE
. Frequência de execução da tarefa do dispatcher.dt_allow_overlapping_execution
(opcional) - padrão:FALSE
. Permite que a DAG seja executada simultaneamente.dt_user_task_timeout_ms
(opcional) - Especifica o limite de tempo em uma única execução da tarefa antes que ele se esgote (em milissegundos).
Nota
Se o procedimento do trabalhador demorar mais do que o tempo limite definido na tarefa do trabalhador (USER_TASK_TIMEOUT_MS), então o procedimento será abortado com um erro de tempo limite. É importante agendar tarefas para não exceder o tempo limite da tarefa Snowflake.
Após fornecer o número mínimo de parâmetros necessários, o Task Reactor
é inicializado com a configuração fornecida e despacha os trabalhadores usando o procedimento TASK_REACTOR.DISPATCHER
.
Definição do número de trabalhadores¶
O número de trabalhadores pode ser alterado manualmente chamando o procedimento TASK_REACTOR.SET_WORKERS_NUMBER com os seguintes parâmetros:
WORKERS_NUMBER
- novo número de trabalhadores.TR_INSTANCE_SCHEMA_NAME
- nome do esquema de instância
Métricas¶
O reator de tarefas contém um mecanismo de métricas. Ele baseia-se nos eventos de rastreamento do Snowflake. As métricas são registradas na tabela de eventos, portanto, a tabela de eventos precisa ser habilitada para que as métricas funcionem.
Atualmente, as seguintes métricas são introduzidas:
worker working time
(TASK_REACTOR_WORKER_WORKING_TIME
) - Mostra o tempo em que um trabalhador estava realmente processando recursos. O cronômetro inicia quando uma tarefa do trabalhador começa e termina quando a tarefa do trabalhador termina.worker idle time
(TASK_REACTOR_WORKER_IDLE_TIME
) - É o oposto doworker working time
. Ele mostra o tempo em que um trabalhador estava em espera: esperando por um novo trabalho ou esperando a próxima programação de sua tarefa. O cronômetro começa quando um trabalhador termina sua tarefa e termina quando a tarefa do trabalhador começa novamente.
Para ver todos os eventos de métricas registrados, a seguinte consulta pode ser usada:
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
ORDER BY event.timestamp DESC;
Para selecionar apenas um tipo de métrica, adicione event.record:name = <nome da métrica>
à cláusula where
da consulta.
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
ORDER BY event.timestamp DESC;