Tarefas acionadas¶
Use tarefas acionadas para executar tarefas sempre que houver uma alteração em um fluxo. Isso elimina a necessidade de pesquisar uma fonte com frequência quando a disponibilidade de novos dados é imprevisível. Elas também reduzem a latência porque os dados são processados imediatamente.
As tarefas acionadas não usam recursos de computação até que o evento seja acionado.
Considerações¶
Para fluxos hospedados em tabelas de diretório, a tabela de diretório precisa ser atualizada antes que uma tarefa acionada possa detectar as alterações. Para detectar alterações, você pode fazer uma das seguintes opções:
Configure a tabela de diretório para atualizar automaticamente.
Atualize a tabela de diretórios manualmente usando o comando ALTER STAGE name REFRESH.
Não há suporte para fluxos em tabelas externas e tabelas híbridas.
Criar uma tarefa acionada¶
Use CREATE TASK e defina os seguintes parâmetros:
Defina o fluxo de destino usando a cláusula
WHEN
. (Não inclua o parâmetroSCHEDULE
.)Ao trabalhar com vários fluxos de dados, você pode usar parâmetros condicionais:
WHEN ... AND
eWHEN ... OR
.Requisitos adicionais com base em recursos de computação:
Para criar uma tarefa sem servidor, você deve incluir o parâmetro
TARGET_COMPLETION_INTERVAL
. Não inclua o parâmetroWAREHOUSE
. O Snowflake estima os recursos necessários usando o intervalo de conclusão do destino e se ajusta para concluir a tarefa nesse tempo.
Para criar uma tarefa que seja executada em um warehouse gerenciado pelo usuário, inclua o parâmetro
WAREHOUSE
e defina o warehouse.
Migrar uma tarefa existente de uma tarefa agendada para uma tarefa acionada¶
Suspenda a tarefa.
Desmarque o parâmetro
SCHEDULE
e adicione a cláusulaWHEN
para definir o fluxo de destino.Retome a tarefa.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Migrar uma tarefa acionada gerenciada pelo usuário existente para uma tarefa acionada sem servidor¶
Suspenda a tarefa.
Remova o parâmetro
WAREHOUSE
e defina o parâmetroTARGET_COMPLETION_INTERVAL
.Retome a tarefa.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Para obter mais informações, consulte tarefas sem servidor.
Permitir que a tarefa acionada seja executada¶
Quando você cria uma tarefa acionada, ela é iniciada no estado suspenso.
Para começar a monitorar o fluxo:
Retome a tarefa usando ALTER TASK … RESUME.
A tarefa é executada nas seguintes condições:
Quando você retoma uma tarefa acionada pela primeira vez, a tarefa verifica o fluxo em busca de alterações desde que a última tarefa foi executada. Se houver uma alteração, a tarefa é executada; caso contrário, ela é ignorada sem o uso de recursos de computação.
Se uma tarefa estiver em execução e o fluxo tiver novos dados, a tarefa aguardará até que a tarefa atual seja concluída. O Snowflake garante que apenas uma instância de uma tarefa seja executada por vez.
Após a conclusão de uma tarefa, o Snowflake verifica novamente se há alterações no fluxo. Se houver alterações, a tarefa será executada novamente; caso contrário, a tarefa será ignorada.
A tarefa é executada sempre que novos dados são detectados no fluxo.
Se os dados do fluxo estiverem hospedados em uma tabela de diretório, para detectar alterações, você pode fazer o seguinte
A cada 12 horas, a tarefa executa uma verificação de integridade para evitar que os fluxos se tornem obsoletos. Se não houver alterações, o Snowflake pulará a tarefa sem usar recursos de computação. Para fluxos, as instruções de tarefa devem consumir os dados no fluxo antes que a retenção de dados expire; caso contrário, o fluxo se torna obsoleto. Para obter mais informações, consulte Como evitar a desatualização de um fluxo.
Por padrão, as tarefas acionadas são executadas no máximo a cada 30 segundos. É possível modificar o parâmetro USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS para executá-las com mais frequência, até a cada 10 segundos.
Quando uma tarefa é acionada por Streams on Views, qualquer alteração nas tabelas referenciadas pela consulta Streams on Views (Fluxos em visualizações) também acionará a tarefa, independentemente de quaisquer uniões, agregações ou filtros na consulta.
Monitoramento de tarefas acionadas¶
Na saída
SHOW TASKS
eDESC TASK
, a propriedadeSCHEDULE
exibeNULL
para tarefas acionadas.Na saída da exibição task_history dos esquemas information_schema e account_usage, a coluna SCHEDULED_FROM exibe TRIGGER.
Exemplos¶
Exemplo 1: crie uma tarefa sem servidor que seja executada sempre que os dados forem alterados em um fluxo.
Como a tarefa é sem servidor, o parâmetro TARGET_COMPLETION_INTERVAL
é necessário para permitir que o Snowflake estime os recursos de computação necessários.
CREATE TASK my_task
TARGET_COMPLETION_INTERVAL='120 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS SELECT 1;
Exemplo 2: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em qualquer um dos dois fluxos.
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
Exemplo 3: crie uma tarefa gerenciada por usuário para ser executada sempre que forem detectadas alterações de dados em dois fluxos de dados diferentes. Como a tarefa usa o condicional AND, ela será ignorada se apenas um dos dois fluxos tiver novos dados.
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
Exemplo 4: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em uma tabela de diretório. No exemplo, um fluxo (my_directory_table_stream) é hospedado em uma tabela de diretório em um estágio (my_test_stage).
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
Para validar a tarefa acionada, os dados são adicionados ao estágio.
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
A tabela de diretórios é então atualizada manualmente, o que aciona a tarefa.
ALTER STAGE my_test_stage REFRESH