Tarefas acionadas

Use tarefas acionadas para executar tarefas sempre que houver uma alteração em um fluxo. Isso elimina a necessidade de pesquisar uma fonte com frequência quando a disponibilidade de novos dados é imprevisível. Elas também reduzem a latência porque os dados são processados imediatamente.

As tarefas acionadas não usam recursos de computação até que o evento seja acionado.

Considerações

  • Para fluxos hospedados em tabelas de diretório, a tabela de diretório precisa ser atualizada antes que uma tarefa acionada possa detectar as alterações. Para detectar alterações, você pode fazer uma das seguintes opções:

  • Não há suporte para fluxos em tabelas externas e tabelas híbridas.

Criar uma tarefa acionada

Use CREATE TASK e defina os seguintes parâmetros:

  • Defina o fluxo de destino usando a cláusula WHEN. (Não inclua o parâmetro SCHEDULE.)

    Ao trabalhar com vários fluxos de dados, você pode usar parâmetros condicionais: WHEN ... AND e WHEN ... OR.

  • Requisitos adicionais com base em recursos de computação:

    • Para criar uma tarefa sem servidor, você deve incluir o parâmetro TARGET_COMPLETION_INTERVAL. Não inclua o parâmetro WAREHOUSE. O Snowflake estima os recursos necessários usando o intervalo de conclusão do destino e se ajusta para concluir a tarefa nesse tempo.

    Diagrama mostrando como as tarefas acionadas sem servidor funcionam no Snowflake.
    • Para criar uma tarefa que seja executada em um warehouse gerenciado pelo usuário, inclua o parâmetro WAREHOUSE e defina o warehouse.

Migrar uma tarefa existente de uma tarefa agendada para uma tarefa acionada

  1. Suspenda a tarefa.

  2. Desmarque o parâmetro SCHEDULE e adicione a cláusula WHEN para definir o fluxo de destino.

  3. Retome a tarefa.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

Migrar uma tarefa acionada gerenciada pelo usuário existente para uma tarefa acionada sem servidor

  1. Suspenda a tarefa.

  2. Remova o parâmetro WAREHOUSE e defina o parâmetro TARGET_COMPLETION_INTERVAL.

  3. Retome a tarefa.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

Para obter mais informações, consulte tarefas sem servidor.

Permitir que a tarefa acionada seja executada

Quando você cria uma tarefa acionada, ela é iniciada no estado suspenso.

Para começar a monitorar o fluxo:

A tarefa é executada nas seguintes condições:

  • Quando você retoma uma tarefa acionada pela primeira vez, a tarefa verifica o fluxo em busca de alterações desde que a última tarefa foi executada. Se houver uma alteração, a tarefa é executada; caso contrário, ela é ignorada sem o uso de recursos de computação.

  • Se uma tarefa estiver em execução e o fluxo tiver novos dados, a tarefa aguardará até que a tarefa atual seja concluída. O Snowflake garante que apenas uma instância de uma tarefa seja executada por vez.

  • Após a conclusão de uma tarefa, o Snowflake verifica novamente se há alterações no fluxo. Se houver alterações, a tarefa será executada novamente; caso contrário, a tarefa será ignorada.

  • A tarefa é executada sempre que novos dados são detectados no fluxo.

  • Se os dados do fluxo estiverem hospedados em uma tabela de diretório, para detectar alterações, você pode fazer o seguinte

  • A cada 12 horas, a tarefa executa uma verificação de integridade para evitar que os fluxos se tornem obsoletos. Se não houver alterações, o Snowflake pulará a tarefa sem usar recursos de computação. Para fluxos, as instruções de tarefa devem consumir os dados no fluxo antes que a retenção de dados expire; caso contrário, o fluxo se torna obsoleto. Para obter mais informações, consulte Como evitar a desatualização de um fluxo.

  • Por padrão, as tarefas acionadas são executadas no máximo a cada 30 segundos. É possível modificar o parâmetro USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS para executá-las com mais frequência, até a cada 10 segundos.

  • Quando uma tarefa é acionada por Streams on Views, qualquer alteração nas tabelas referenciadas pela consulta Streams on Views (Fluxos em visualizações) também acionará a tarefa, independentemente de quaisquer uniões, agregações ou filtros na consulta.

Um diagrama mostra como as tarefas acionadas gerenciam novos dados à medida que eles chegam e também verificam as alterações a cada 12 horas.

Monitoramento de tarefas acionadas

  • Na saída SHOW TASKS e DESC TASK, a propriedade SCHEDULE exibe NULL para tarefas acionadas.

  • Na saída da exibição task_history dos esquemas information_schema e account_usage, a coluna SCHEDULED_FROM exibe TRIGGER.

Exemplos

Exemplo 1: crie uma tarefa sem servidor que seja executada sempre que os dados forem alterados em um fluxo.

Como a tarefa é sem servidor, o parâmetro TARGET_COMPLETION_INTERVAL é necessário para permitir que o Snowflake estime os recursos de computação necessários.

CREATE TASK my_task
  TARGET_COMPLETION_INTERVAL='120 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS SELECT 1;
Copy

Exemplo 2: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em qualquer um dos dois fluxos.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Exemplo 3: crie uma tarefa gerenciada por usuário para ser executada sempre que forem detectadas alterações de dados em dois fluxos de dados diferentes. Como a tarefa usa o condicional AND, ela será ignorada se apenas um dos dois fluxos tiver novos dados.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

Exemplo 4: crie uma tarefa gerenciada pelo usuário que seja executada sempre que os dados forem alterados em uma tabela de diretório. No exemplo, um fluxo (my_directory_table_stream) é hospedado em uma tabela de diretório em um estágio (my_test_stage).

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

Para validar a tarefa acionada, os dados são adicionados ao estágio.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

A tabela de diretórios é então atualizada manualmente, o que aciona a tarefa.

ALTER STAGE my_test_stage REFRESH
Copy