Trabalhar com trabalhos secundários assíncronos

Este tópico explica como usar trabalhos secundários/filhos assíncronos no Snowflake Scripting.

Introdução aos trabalhos secundários assíncronos

No Snowflake Scripting, um trabalho secundário assíncrono é uma consulta que é executada em segundo plano enquanto o código em um bloco continua a ser executado. A consulta pode ser qualquer instrução SQL válida, incluindo instruções SELECT e instruções DML, como INSERT ou UPDATE.

Para executar uma consulta como um trabalho secundário assíncrono, coloque a palavra-chave ASYNC antes da consulta. Quando essa palavra-chave é omitida, o bloco Snowflake Scripting executa os trabalhos secundários sequencialmente, e cada trabalho secundário espera que o trabalho secundário em execução termine antes de começar. Os trabalhos secundários assíncronos podem ser executados simultaneamente, o que pode aumentar a eficiência e reduzir o tempo total de execução.

Você pode usar a palavra-chave ASYNC das seguintes maneiras:

  • Para uma consulta que é executada para RESULTSET.

  • Para uma consulta que é executada independentemente de um RESULTSET.

Para gerenciar trabalhos secundários assíncronos, use as instruções AWAIT e CANCEL:

  • A instrução AWAIT aguarda a conclusão de todos os trabalhos secundários assíncronos que estão em execução ou a conclusão de um trabalho secundário específico que está em execução para um RESULTSET e, em seguida, retorna quando todos os trabalhos tiverem sido concluídos ou o trabalho específico tiver sido concluído, respectivamente.

  • A instrução CANCEL cancela um trabalho secundário assíncrono que está sendo executado para um RESULTSET.

Você pode verificar o status de um trabalho secundário assíncrono que está sendo executado para um RESULTSET chamando a função SYSTEM$GET_RESULTSET_STATUS.

Atualmente, até 4.000 trabalhos secundários assíncronos podem ser executados simultaneamente. Um erro será retornado se o número de trabalhos secundários assíncronos simultâneos exceder esse limite.

Nota

Quando vários trabalhos secundários assíncronos são executados simultaneamente na mesma sessão, a ordem de conclusão do trabalho não é conhecida até que os trabalhos tenham terminado de ser executados. Como a ordem de conclusão pode variar, o uso da função LAST_QUERY_ID com trabalhos secundários assíncronos não é determinístico.

Exemplos de uso de trabalhos secundários assíncronos

As seções a seguir fornecem exemplos de uso de trabalhos secundários assíncronos:

Exemplo: como executar trabalhos filhos que consultam tabelas ao mesmo tempo

O código a seguir mostra como usar a palavra-chave ASYNC para executar vários trabalhos filhos que consultam tabelas simultaneamente. O exemplo especifica a palavra-chave ASYNC para consultas que são executadas para RESULTSETs.

Este exemplo usa os dados das tabelas a seguir:

CREATE OR REPLACE TABLE orders_q1_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q1_2024 VALUES (1, 500.00);
INSERT INTO orders_q1_2024 VALUES (2, 225.00);
INSERT INTO orders_q1_2024 VALUES (3, 725.00);
INSERT INTO orders_q1_2024 VALUES (4, 150.00);
INSERT INTO orders_q1_2024 VALUES (5, 900.00);

CREATE OR REPLACE TABLE orders_q2_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q2_2024 VALUES (1, 100.00);
INSERT INTO orders_q2_2024 VALUES (2, 645.00);
INSERT INTO orders_q2_2024 VALUES (3, 275.00);
INSERT INTO orders_q2_2024 VALUES (4, 800.00);
INSERT INTO orders_q2_2024 VALUES (5, 250.00);
Copy

O procedimento armazenado a seguir executa as seguintes ações:

  • Consulta ambas as tabelas para os valores order_amount em todas as linhas e retorna os resultados para diferentes RESULTSETs (um para cada tabela).

  • Especifica que as consultas são executadas como trabalhos filhos simultâneos usando a palavra-chave ASYNC.

  • Executa a instrução AWAIT para cada RESULTSET para que o procedimento aguarde a conclusão das consultas antes de prosseguir. Os resultados de consulta para um RESULTSET não podem ser acessados até que o AWAIT seja executado para o RESULTSET.

  • Usa um cursor para calcular a soma das linhas order_amount de cada tabela.

  • Adiciona os totais das tabelas e retorna o valor.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
DECLARE
  accumulator1 INTEGER DEFAULT 0;
  accumulator2 INTEGER DEFAULT 0;
  res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
  res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
  AWAIT res1;
  LET cur1 CURSOR FOR res1;
  OPEN cur1;
  AWAIT res2;
  LET cur2 CURSOR FOR res2;
  OPEN cur2;
  FOR row_variable IN cur1 DO
      accumulator1 := accumulator1 + row_variable.order_amount;
  END FOR;
  FOR row_variable IN cur2 DO
      accumulator2 := accumulator2 + row_variable.order_amount;
  END FOR;
  RETURN accumulator1 + accumulator2;
END;
Copy

Observação: se você usar o Snowflake CLI, SnowSQL, o Classic Console, ou o método execute_stream ou execute_string no código Python Connector, use este exemplo (consulte Usar o Snowflake Scripting no Snowflake CLI, SnowSQL, Classic Console e Python Connector):

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
$$
  DECLARE
    accumulator1 INTEGER DEFAULT 0;
    accumulator2 INTEGER DEFAULT 0;
    res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
    res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
  BEGIN
    AWAIT res1;
    LET cur1 CURSOR FOR res1;
    OPEN cur1;
    AWAIT res2;
    LET cur2 CURSOR FOR res2;
    OPEN cur2;
    FOR row_variable IN cur1 DO
        accumulator1 := accumulator1 + row_variable.order_amount;
    END FOR;
    FOR row_variable IN cur2 DO
        accumulator2 := accumulator2 + row_variable.order_amount;
    END FOR;
    RETURN accumulator1 + accumulator2;
  END;
$$;
Copy

Chame o procedimento armazenado:

CALL test_sp_async_child_jobs_query();
Copy
+--------------------------------+
| TEST_SP_ASYNC_CHILD_JOBS_QUERY |
|--------------------------------|
|                           4570 |
+--------------------------------+

Exemplo: como executar trabalhos filhos que inserem linhas nas tabelas ao mesmo tempo

O código a seguir mostra como usar a palavra-chave ASYNC para executar vários trabalhos filhos que inserem linhas em uma tabela simultaneamente. O exemplo especifica a palavra-chave ASYNC para consultas que são executadas para RESULTSETs.

O procedimento armazenado a seguir executa as seguintes ações:

  • Cria a tabela orders_q3_2024 se ela não existir.

  • Cria dois RESULTSETs, insert_1 e insert_2, que contêm os resultados das inserções na tabela. Os argumentos do procedimento armazenado especificam os valores que são inseridos na tabela.

  • Especifica que as inserções são executadas como trabalhos filhos simultâneos usando a palavra-chave ASYNC.

  • Executa a instrução AWAIT para cada RESULTSET para que o procedimento aguarde a conclusão das inserções antes de prosseguir. Os resultados de um RESULTSET não podem ser acessados até que o AWAIT seja executado para o RESULTSET.

  • Cria um novo RESULTSET res que contém os resultados de uma consulta na tabela orders_q3_2024.

  • Retorna os resultados da consulta.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
Copy

Observação: se você usar o Snowflake CLI, SnowSQL, o Classic Console, ou o método execute_stream ou execute_string no código Python Connector, use este exemplo (consulte Usar o Snowflake Scripting no Snowflake CLI, SnowSQL, Classic Console e Python Connector):

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
$$
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
$$;
Copy

Chame o procedimento armazenado:

CALL test_sp_async_child_jobs_insert(1, 325, 2, 241);
Copy
+----------+--------------+
| ORDER_ID | ORDER_AMOUNT |
|----------+--------------|
|        1 |       325.00 |
|        2 |       241.00 |
+----------+--------------+

Exemplo: execução de trabalhos secundários em procedimentos armazenados com instruções AWAIT ALL

Os exemplos a seguir usam a palavra-chave ASYNC para executar vários trabalhos secundários simultaneamente em procedimentos armazenados. Os exemplos especificam a palavra-chave ASYNC para instruções que não estão associadas a um RESULTSET e, em seguida, usam a declaração AWAIT ALL para que o código do procedimento armazenado aguarde a conclusão de todos os trabalhos secundários assíncronos.

Criar um procedimento armazenado que insira valores simultaneamente

O procedimento armazenado a seguir usa a palavra-chave ASYNC para executar vários trabalhos secundários que inserem linhas em uma tabela simultaneamente. O exemplo especifica a palavra-chave ASYNC para as instruções INSERT. O exemplo também usa a instrução AWAIT ALL para que o procedimento armazenado aguarde a conclusão de todos os trabalhos secundários assíncronos.

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
Copy

Observação: se você usar o Snowflake CLI, SnowSQL, o Classic Console, ou o método execute_stream ou execute_string no código Python Connector, use este exemplo (consulte Usar o Snowflake Scripting no Snowflake CLI, SnowSQL, Classic Console e Python Connector):

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
$$
;
Copy

Criar um procedimento armazenado que atualize valores simultaneamente

O procedimento armazenado a seguir usa a palavra-chave ASYNC para executar vários trabalhos secundários que atualizam linhas em uma tabela simultaneamente. O exemplo especifica a palavra-chave ASYNC para as instruções UPDATE. O exemplo também usa a instrução AWAIT ALL para que o procedimento armazenado aguarde a conclusão de todos os trabalhos secundários assíncronos.

Criar uma tabela e inserir dados:

CREATE OR REPLACE TABLE test_child_job_queries2 (id INT, cola INT);

INSERT INTO test_child_job_queries2 VALUES
  (1, 100), (2, 101), (3, 102);
Copy

Crie o procedimento armazenado:

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
Copy

Observação: se você usar o Snowflake CLI, SnowSQL, o Classic Console, ou o método execute_stream ou execute_string no código Python Connector, use este exemplo (consulte Usar o Snowflake Scripting no Snowflake CLI, SnowSQL, Classic Console e Python Connector):

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
$$
;
Copy

Criar um procedimento armazenado que chame outros procedimentos armazenados simultaneamente

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
Copy

Observação: se você usar o Snowflake CLI, SnowSQL, o Classic Console, ou o método execute_stream ou execute_string no código Python Connector, use este exemplo (consulte Usar o Snowflake Scripting no Snowflake CLI, SnowSQL, Classic Console e Python Connector):

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
$$
;
Copy

Chame o procedimento armazenado test_async_child_job_calls:

CALL test_async_child_job_calls();
Copy

Consulte as tabelas para ver os resultados:

SELECT col1 FROM test_child_job_queries1 ORDER BY col1;
Copy
+------+
| COL1 |
|------|
|    1 |
|    2 |
|    3 |
+------+
SELECT * FROM test_child_job_queries2 ORDER BY id;
Copy
+----+------+
| ID | COLA |
|----+------|
|  1 |  200 |
|  2 |  201 |
|  3 |  202 |
+----+------+

Exemplo: execução de trabalhos secundários para inserções em um loop

O código a seguir mostra como usar a palavra-chave ASYNC em um loop para executar vários trabalhos secundários que inserem linhas em uma tabela simultaneamente.

Este exemplo usa os dados das tabelas a seguir:

CREATE OR REPLACE TABLE async_loop_test1(col1 VARCHAR, col2 INT);

INSERT INTO async_loop_test1 VALUES
  ('child', 0),
  ('job', 1),
  ('loop', 2),
  ('test', 3);

CREATE OR REPLACE TABLE async_loop_test2(col1 INT, col2 VARCHAR);
Copy

Crie um procedimento armazenado que insira valores de async_loop_test1, concatenados com o texto async_ em async_loop_test2 usando trabalhos secundários assíncronos em um loop FOR. O loop cria um trabalho secundário assíncrono separado em cada iteração. A instrução AWAIT ALL bloqueia o progresso no procedimento armazenado até que todos os trabalhos secundários sejam concluídos.

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
Copy

Observação: se você usar o Snowflake CLI, SnowSQL, o Classic Console, ou o método execute_stream ou execute_string no código Python Connector, use este exemplo (consulte Usar o Snowflake Scripting no Snowflake CLI, SnowSQL, Classic Console e Python Connector):

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
$$;
Copy

Chame o procedimento armazenado:

CALL async_insert();
Copy
+--------------+
| ASYNC_INSERT |
|--------------|
| Success      |
+--------------+

Consulte a tabela async_loop_test2 para ver os resultados:

SELECT * FROM async_loop_test2 ORDER BY col1;
Copy
+------+-------------+
| COL1 | COL2        |
|------+-------------|
|    0 | async_child |
|    1 | async_job   |
|    2 | async_loop  |
|    3 | async_test  |
+------+-------------+