Tarefas acionadas¶
Use tarefas acionadas para executar tarefas sempre que houver uma alteração em um fluxo. Isso elimina a necessidade de pesquisar uma fonte com frequência quando a disponibilidade de novos dados é imprevisível. Elas também reduzem a latência porque os dados são processados imediatamente.
As tarefas acionadas não usam recursos de computação até que o evento seja acionado.
Considerações¶
As tarefas acionadas são aceitas com os seguintes itens:
Tabelas
Exibições
Tabelas dinâmicas
Tabelas Apache Iceberg™ (gerenciadas e não gerenciadas)
Compartilhamentos de dados
Tabelas de diretório. Uma tabela de diretório deve ser atualizada antes que uma tarefa acionada possa detectar as alterações. Para detectar alterações, você pode executar uma das seguintes tarefas:
Configure a tabela de diretório para atualizar automaticamente.
Atualize a tabela de diretórios manualmente usando o comando ALTER STAGE name REFRESH.
Tarefas acionadas não são compatíveis com os seguintes itens:
Tabelas híbridas
Fluxos em tabelas externas
Para que os consumidores criem fluxos em tabelas compartilhadas ou exibições seguras, o provedor de dados deve ativar o rastreamento de alterações nas tabelas e exibições que devem ser compartilhadas em sua conta; ou seja, ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;
. Sem o rastreamento de alterações ativado, os consumidores não podem criar fluxos nos dados compartilhados. Para obter mais informações, consulte Fluxos em objetos compartilhados.
Criar uma tarefa acionada¶
Use CREATE TASK e defina os seguintes parâmetros:
Defina o fluxo de destino usando a cláusula
WHEN
. (Não inclua o parâmetroSCHEDULE
.)Requisitos adicionais com base em recursos de computação:
Para criar uma tarefa que seja executada em um warehouse gerenciado pelo usuário, inclua o parâmetro
WAREHOUSE
e defina o warehouse.Para criar uma tarefa sem servidor, você deve incluir o parâmetro
TARGET_COMPLETION_INTERVAL
. Não inclua o parâmetroWAREHOUSE
. O Snowflake estima os recursos necessários usando o intervalo de conclusão do destino e se ajusta para concluir a tarefa nesse tempo.
O exemplo a seguir cria uma tarefa acionada sem servidor que é executada sempre que os dados são alterados em um fluxo.
CREATE TASK my_triggered_task
TARGET_COMPLETION_INTERVAL='15 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
Migrar uma tarefa existente de uma tarefa agendada para uma tarefa acionada¶
Suspenda a tarefa.
Use ALTER TASK para atualizar a tarefa. Desative o parâmetro
SCHEDULE
e adicione a cláusulaWHEN
para definir o fluxo de destino.Retome a tarefa.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Migrar uma tarefa acionada gerenciada pelo usuário existente para uma tarefa acionada sem servidor¶
Suspenda a tarefa.
Use ALTER TASK para atualizar a tarefa. Remova o parâmetro
WAREHOUSE
e, em seguida, defina o parâmetroTARGET_COMPLETION_INTERVAL
.Retome a tarefa.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Para obter mais informações, consulte tarefas sem servidor.
Permitir que uma tarefa acionada seja executada¶
Quando você cria uma tarefa acionada, ela é iniciada no estado suspenso.
Para começar a monitorar o fluxo:
Retome a tarefa usando ALTER TASK … RESUME.
A tarefa é executada nas seguintes condições:
Quando você retoma pela primeira vez uma tarefa acionada, ela verifica se há alterações no fluxo após a última tarefa ter sido executada. Se houver uma alteração, a tarefa será executada; caso contrário, a tarefa será ignorada sem usar recursos de computação.
Se uma tarefa estiver em execução e o fluxo tiver novos dados, ela será pausada até que a tarefa atual seja concluída. O Snowflake garante que apenas uma instância de uma tarefa seja executada por vez.
Após a conclusão de uma tarefa, o Snowflake verifica novamente se há alterações no fluxo. Se houver alterações, a tarefa será executada novamente; caso contrário, a tarefa será ignorada.
A tarefa é executada sempre que novos dados são detectados no fluxo.
Se os dados do fluxo estiverem hospedados em uma tabela de diretórios, você detectará alterações executando uma das seguintes tarefas:
A cada 12 horas, a tarefa executa uma verificação de integridade para evitar que os fluxos se tornem obsoletos. Se não houver alterações, o Snowflake pulará a tarefa sem usar recursos de computação. Para fluxos, as instruções de tarefa devem consumir os dados no fluxo antes que a retenção de dados expire; caso contrário, o fluxo se torna obsoleto. Para obter mais informações, consulte Como evitar a desatualização de um fluxo.
Por padrão, as tarefas acionadas são executadas no máximo a cada 30 segundos. É possível modificar o parâmetro USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS para executá-las com mais frequência, até a cada 10 segundos.
Quando uma tarefa é acionada por Streams on Views, qualquer alteração nas tabelas referenciadas pela consulta Streams on Views (Fluxos em visualizações) também acionará a tarefa, independentemente de quaisquer uniões, agregações ou filtros na consulta.
Monitoramento de tarefas acionadas¶
Na saída
SHOW TASKS
eDESC TASK
, a propriedadeSCHEDULE
exibeNULL
para tarefas acionadas.Na saída da exibição task_history dos esquemas information_schema e account_usage, a coluna SCHEDULED_FROM exibe TRIGGER.
Exemplos¶
Exemplo 1: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em um dos dois fluxos.
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
Exemplo 2: crie uma tarefa gerenciada pelo usuário para ser executada sempre que alterações de dados forem detectadas em dois fluxos de dados diferentes. Como a tarefa usa a condicional AND, ela será ignorada se apenas um dos dois fluxos tiver novos dados.
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
Exemplo 3: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em uma tabela de diretórios. No exemplo, um fluxo — my_directory_table_stream — está hospedado em uma tabela de diretórios em uma área chamada my_test_stage.
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
Para validar a tarefa acionada, os dados são adicionados ao estágio.
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
A tabela de diretórios é então atualizada manualmente, o que aciona a tarefa.
ALTER STAGE my_test_stage REFRESH