Tarefas acionadas

Use tarefas acionadas para executar tarefas sempre que houver uma alteração em um fluxo. Isso elimina a necessidade de pesquisar uma fonte com frequência quando a disponibilidade de novos dados é imprevisível. Elas também reduzem a latência porque os dados são processados imediatamente.

As tarefas acionadas não usam recursos de computação até que o evento seja acionado.

Considerações

As tarefas acionadas são aceitas com os seguintes itens:

  • Tabelas

  • Exibições

  • Tabelas dinâmicas

  • Tabelas Apache Iceberg™ (gerenciadas e não gerenciadas)

  • Compartilhamentos de dados

  • Tabelas de diretório. Uma tabela de diretório deve ser atualizada antes que uma tarefa acionada possa detectar as alterações. Para detectar alterações, você pode executar uma das seguintes tarefas:

Tarefas acionadas não são compatíveis com os seguintes itens:

  • Tabelas híbridas

  • Fluxos em tabelas externas

Para que os consumidores criem fluxos em tabelas compartilhadas ou exibições seguras, o provedor de dados deve ativar o rastreamento de alterações nas tabelas e exibições que devem ser compartilhadas em sua conta; ou seja, ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;. Sem o rastreamento de alterações ativado, os consumidores não podem criar fluxos nos dados compartilhados. Para obter mais informações, consulte Fluxos em objetos compartilhados.

Criar uma tarefa acionada

Use CREATE TASK e defina os seguintes parâmetros:

  • Defina o fluxo de destino usando a cláusula WHEN. (Não inclua o parâmetro SCHEDULE.)

  • Requisitos adicionais com base em recursos de computação:

    • Para criar uma tarefa que seja executada em um warehouse gerenciado pelo usuário, inclua o parâmetro WAREHOUSE e defina o warehouse.

    • Para criar uma tarefa sem servidor, você deve incluir o parâmetro TARGET_COMPLETION_INTERVAL. Não inclua o parâmetro WAREHOUSE. O Snowflake estima os recursos necessários usando o intervalo de conclusão do destino e se ajusta para concluir a tarefa nesse tempo.

O exemplo a seguir cria uma tarefa acionada sem servidor que é executada sempre que os dados são alterados em um fluxo.

Diagrama mostrando uma tarefa acionada sem servidor
CREATE TASK my_triggered_task
  TARGET_COMPLETION_INTERVAL='15 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Migrar uma tarefa existente de uma tarefa agendada para uma tarefa acionada

  1. Suspenda a tarefa.

  2. Use ALTER TASK para atualizar a tarefa. Desative o parâmetro SCHEDULE e adicione a cláusula WHEN para definir o fluxo de destino.

  3. Retome a tarefa.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

Migrar uma tarefa acionada gerenciada pelo usuário existente para uma tarefa acionada sem servidor

  1. Suspenda a tarefa.

  2. Use ALTER TASK para atualizar a tarefa. Remova o parâmetro WAREHOUSE e, em seguida, defina o parâmetro TARGET_COMPLETION_INTERVAL.

  3. Retome a tarefa.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

Para obter mais informações, consulte tarefas sem servidor.

Permitir que uma tarefa acionada seja executada

Quando você cria uma tarefa acionada, ela é iniciada no estado suspenso.

Para começar a monitorar o fluxo:

A tarefa é executada nas seguintes condições:

  • Quando você retoma pela primeira vez uma tarefa acionada, ela verifica se há alterações no fluxo após a última tarefa ter sido executada. Se houver uma alteração, a tarefa será executada; caso contrário, a tarefa será ignorada sem usar recursos de computação.

  • Se uma tarefa estiver em execução e o fluxo tiver novos dados, ela será pausada até que a tarefa atual seja concluída. O Snowflake garante que apenas uma instância de uma tarefa seja executada por vez.

  • Após a conclusão de uma tarefa, o Snowflake verifica novamente se há alterações no fluxo. Se houver alterações, a tarefa será executada novamente; caso contrário, a tarefa será ignorada.

  • A tarefa é executada sempre que novos dados são detectados no fluxo.

  • Se os dados do fluxo estiverem hospedados em uma tabela de diretórios, você detectará alterações executando uma das seguintes tarefas:

  • A cada 12 horas, a tarefa executa uma verificação de integridade para evitar que os fluxos se tornem obsoletos. Se não houver alterações, o Snowflake pulará a tarefa sem usar recursos de computação. Para fluxos, as instruções de tarefa devem consumir os dados no fluxo antes que a retenção de dados expire; caso contrário, o fluxo se torna obsoleto. Para obter mais informações, consulte Como evitar a desatualização de um fluxo.

  • Por padrão, as tarefas acionadas são executadas no máximo a cada 30 segundos. É possível modificar o parâmetro USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS para executá-las com mais frequência, até a cada 10 segundos.

  • Quando uma tarefa é acionada por Streams on Views, qualquer alteração nas tabelas referenciadas pela consulta Streams on Views (Fluxos em visualizações) também acionará a tarefa, independentemente de quaisquer uniões, agregações ou filtros na consulta.

Um diagrama mostra como as tarefas acionadas gerenciam novos dados à medida que eles chegam e também verificam as alterações a cada 12 horas.

Monitoramento de tarefas acionadas

  • Na saída SHOW TASKS e DESC TASK, a propriedade SCHEDULE exibe NULL para tarefas acionadas.

  • Na saída da exibição task_history dos esquemas information_schema e account_usage, a coluna SCHEDULED_FROM exibe TRIGGER.

Exemplos

Exemplo 1: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em um dos dois fluxos.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Exemplo 2: crie uma tarefa gerenciada pelo usuário para ser executada sempre que alterações de dados forem detectadas em dois fluxos de dados diferentes. Como a tarefa usa a condicional AND, ela será ignorada se apenas um dos dois fluxos tiver novos dados.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

Exemplo 3: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em uma tabela de diretórios. No exemplo, um fluxo — my_directory_table_stream — está hospedado em uma tabela de diretórios em uma área chamada my_test_stage.

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

Para validar a tarefa acionada, os dados são adicionados ao estágio.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

A tabela de diretórios é então atualizada manualmente, o que aciona a tarefa.

ALTER STAGE my_test_stage REFRESH
Copy