Travailler avec des tâches enfants asynchrones

Cette rubrique explique comment utiliser les tâches enfants asynchrones dans Snowflake Scripting.

Introduction aux tâches enfants asynchrones

Dans Snowflake Scripting, une tâche enfant asynchrone est une requête qui s’exécute en arrière-plan pendant que le code d’un bloc continue à s’exécuter. La requête peut être n’importe quelle instruction SQL valide, y compris des instructions SELECT et DML telles que INSERT ou UPDATE.

Pour exécuter une requête en tant que tâche enfant asynchrone, placez le mot-clé ASYNC avant la requête. Lorsque ce mot-clé est omis, le bloc Snowflake Scripting exécute les tâches enfants de manière séquentielle, et chaque tâche enfant attend que la tâche enfant en cours se termine avant de commencer. Les tâches enfants asynchrones peuvent être exécutées simultanément, ce qui permet d’améliorer l’efficacité et de réduire la durée d’exécution globale.

Vous pouvez utiliser le mot-clé ASYNC de la manière suivante :

  • Pour une requête exécutée sur RESULTSET.

  • Pour une requête exécutée indépendamment d’un RESULTSET.

Pour gérer les tâches enfants asynchrones, utilisez les instructions AWAIT et CANCEL :

  • L’instruction AWAIT attend la fin de toutes les tâches enfants asynchrones en cours d’exécution ou la fin d’une tâche enfant spécifique en cours d’exécution pour un RESULTSET, puis revient lorsque toute les tâches sont terminées ou que la tâche spécifique est terminée, respectivement.

  • L’instruction CANCEL annule une tâche enfant asynchrone en cours d’exécution pour un RESULTSET.

Vous pouvez vérifier le statut d’une tâche enfant asynchrone en cours d’exécution pour un RESULTSET en appelant la fonction SYSTEM$GET_RESULTSET_STATUS.

Actuellement, jusqu’à 4 000 tâches enfants asynchrones peuvent être exécutées simultanément. Une erreur est renvoyée si le nombre de tâches enfants asynchrones simultanées dépasse cette limite.

Note

Lorsque plusieurs tâches enfants asynchrones sont exécutées simultanément dans la même session, l’ordre d’achèvement des tâches n’est connu qu’à la fin de leur exécution. Comme l’ordre d’achèvement peut varier, l’utilisation de la fonction LAST_QUERY_ID avec des tâches enfants asynchrones n’est pas déterministe.

Exemples d’utilisation de tâches enfants asynchrones

Les sections suivantes présentent des exemples d’utilisation de tâches enfants asynchrones :

Exemple : exécution de tâches enfants qui effectuent des requêtes simultanées sur les tables

Le code suivant montre comment utiliser le mot-clé ASYNC pour exécuter plusieurs tâches enfants qui interrogent des tables de requête simultanément. L’exemple spécifie le mot-clé ASYNC pour les requêtes qui sont exécutées pour RESULTSETs.

Cet exemple utilise les données des tables suivantes :

CREATE OR REPLACE TABLE orders_q1_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q1_2024 VALUES (1, 500.00);
INSERT INTO orders_q1_2024 VALUES (2, 225.00);
INSERT INTO orders_q1_2024 VALUES (3, 725.00);
INSERT INTO orders_q1_2024 VALUES (4, 150.00);
INSERT INTO orders_q1_2024 VALUES (5, 900.00);

CREATE OR REPLACE TABLE orders_q2_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q2_2024 VALUES (1, 100.00);
INSERT INTO orders_q2_2024 VALUES (2, 645.00);
INSERT INTO orders_q2_2024 VALUES (3, 275.00);
INSERT INTO orders_q2_2024 VALUES (4, 800.00);
INSERT INTO orders_q2_2024 VALUES (5, 250.00);
Copy

La procédure stockée suivante effectue les actions suivantes :

  • Effectue une requête dans les deux tables pour obtenir les valeurs order_amount dans toutes les lignes et renvoie les résultats à différents RESULTSETs (un pour chaque table).

  • Spécifie que les requêtes s’exécutent en tant que tâches enfant simultanées en utilisant le mot-clé ASYNC.

  • Exécute l’instruction AWAIT pour chaque RESULTSET afin que la procédure attende la fin des requêtes avant de poursuivre. Les résultats de la requête pour un RESULTSET ne sont pas accessibles tant que AWAIT n’est pas exécuté pour le RESULTSET.

  • Utilise un curseur pour calculer la somme des lignes order_amount pour chaque table.

  • Additionne les totaux des tables et renvoie la valeur.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
DECLARE
  accumulator1 INTEGER DEFAULT 0;
  accumulator2 INTEGER DEFAULT 0;
  res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
  res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
  AWAIT res1;
  LET cur1 CURSOR FOR res1;
  OPEN cur1;
  AWAIT res2;
  LET cur2 CURSOR FOR res2;
  OPEN cur2;
  FOR row_variable IN cur1 DO
      accumulator1 := accumulator1 + row_variable.order_amount;
  END FOR;
  FOR row_variable IN cur2 DO
      accumulator2 := accumulator2 + row_variable.order_amount;
  END FOR;
  RETURN accumulator1 + accumulator2;
END;
Copy

Remarque : Si vous utilisez Snowflake CLI, SnowSQL, Classic Console, ou la méthode execute_stream ou execute_string dans le code Connecteur Python, utilisez cet exemple à la place (voir Utilisation de Snowflake Scripting dans Snowflake CLI, SnowSQL, le Classic Console et le connecteur Python) :

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
$$
  DECLARE
    accumulator1 INTEGER DEFAULT 0;
    accumulator2 INTEGER DEFAULT 0;
    res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
    res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
  BEGIN
    AWAIT res1;
    LET cur1 CURSOR FOR res1;
    OPEN cur1;
    AWAIT res2;
    LET cur2 CURSOR FOR res2;
    OPEN cur2;
    FOR row_variable IN cur1 DO
        accumulator1 := accumulator1 + row_variable.order_amount;
    END FOR;
    FOR row_variable IN cur2 DO
        accumulator2 := accumulator2 + row_variable.order_amount;
    END FOR;
    RETURN accumulator1 + accumulator2;
  END;
$$;
Copy

Appelez la procédure stockée :

CALL test_sp_async_child_jobs_query();
Copy
+--------------------------------+
| TEST_SP_ASYNC_CHILD_JOBS_QUERY |
|--------------------------------|
|                           4570 |
+--------------------------------+

Exemple : exécution simultanée de tâches enfant qui insèrent des lignes dans les tables

Le code suivant montre comment utiliser le mot-clé ASYNC pour exécuter simultanément plusieurs tâches enfants qui insèrent des lignes dans une table. L’exemple spécifie le mot-clé ASYNC pour les requêtes qui sont exécutées pour RESULTSETs.

La procédure stockée suivante effectue les actions suivantes :

  • Crée la table orders_q3_2024 si elle n’existe pas.

  • Crée deux RESULTSETs, insert_1 et insert_2, qui contiennent les résultats des insertions dans la table. Les arguments de la procédure stockée spécifient les valeurs qui sont insérées dans la table.

  • Spécifie que les insertions s’exécutent en tant que tâches enfant simultanées en utilisant le mot-clé ASYNC.

  • Exécute l’instruction AWAIT pour chaque RESULTSET afin que la procédure attende la fin des insertions avant de poursuivre. Les résultats d’un RESULTSET ne sont pas accessibles tant que AWAIT n’a pas été exécuté pour le RESULTSET.

  • Crée un nouveau site RESULTSET res qui contient les résultats d’une requête sur la table orders_q3_2024.

  • Renvoie les résultats de la requête.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
Copy

Remarque : Si vous utilisez Snowflake CLI, SnowSQL, Classic Console, ou la méthode execute_stream ou execute_string dans le code Connecteur Python, utilisez cet exemple à la place (voir Utilisation de Snowflake Scripting dans Snowflake CLI, SnowSQL, le Classic Console et le connecteur Python) :

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
$$
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
$$;
Copy

Appelez la procédure stockée :

CALL test_sp_async_child_jobs_insert(1, 325, 2, 241);
Copy
+----------+--------------+
| ORDER_ID | ORDER_AMOUNT |
|----------+--------------|
|        1 |       325.00 |
|        2 |       241.00 |
+----------+--------------+

Exemple : Exécution de tâches enfants dans des procédures stockées avec des instructions AWAIT ALL

Les exemples suivants utilisent le mot-clé ASYNC pour exécuter simultanément plusieurs tâches enfants dans des procédures stockées. Les exemples spécifient le mot-clé ASYNC pour les instructions qui ne sont pas associées à un RESULTSET, puis utilisent l’instruction AWAIT ALL pour que le code de la procédure stockée attende la fin de toutes les tâches enfants asynchrones.

Créer une procédure stockée qui insère des valeurs simultanément

La procédure stockée suivante utilise le mot-clé ASYNC pour exécuter simultanément plusieurs tâches enfants qui insèrent des lignes dans une table. L’exemple spécifie le mot-clé ASYNC pour les instructions INSERT. L’exemple utilise également l’instruction AWAIT ALL pour que la procédure stockée attende la fin de toutes les tâches enfants asynchrones.

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
Copy

Remarque : Si vous utilisez Snowflake CLI, SnowSQL, Classic Console, ou la méthode execute_stream ou execute_string dans le code Connecteur Python, utilisez cet exemple à la place (voir Utilisation de Snowflake Scripting dans Snowflake CLI, SnowSQL, le Classic Console et le connecteur Python) :

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
$$
;
Copy

Créer une procédure stockée qui met à jour des valeurs de manière simultanée

La procédure stockée suivante utilise le mot-clé ASYNC pour exécuter plusieurs tâches enfants qui mettent à jour simultanément les lignes d’une table. L’exemple spécifie le mot-clé ASYNC pour les instructions UPDATE. L’exemple utilise également l’instruction AWAIT ALL pour que la procédure stockée attende la fin de toutes les tâches enfants asynchrones.

Créer une table et insérer des données :

CREATE OR REPLACE TABLE test_child_job_queries2 (id INT, cola INT);

INSERT INTO test_child_job_queries2 VALUES
  (1, 100), (2, 101), (3, 102);
Copy

Créez la procédure stockée :

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
Copy

Remarque : Si vous utilisez Snowflake CLI, SnowSQL, Classic Console, ou la méthode execute_stream ou execute_string dans le code Connecteur Python, utilisez cet exemple à la place (voir Utilisation de Snowflake Scripting dans Snowflake CLI, SnowSQL, le Classic Console et le connecteur Python) :

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
$$
;
Copy

Créer une procédure stockée qui appelle d’autres procédures stockées simultanément

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
Copy

Remarque : Si vous utilisez Snowflake CLI, SnowSQL, Classic Console, ou la méthode execute_stream ou execute_string dans le code Connecteur Python, utilisez cet exemple à la place (voir Utilisation de Snowflake Scripting dans Snowflake CLI, SnowSQL, le Classic Console et le connecteur Python) :

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
$$
;
Copy

Appelez la procédure stockée test_async_child_job_calls :

CALL test_async_child_job_calls();
Copy

Interrogez les tables pour voir les résultats :

SELECT col1 FROM test_child_job_queries1 ORDER BY col1;
Copy
+------+
| COL1 |
|------|
|    1 |
|    2 |
|    3 |
+------+
SELECT * FROM test_child_job_queries2 ORDER BY id;
Copy
+----+------+
| ID | COLA |
|----+------|
|  1 |  200 |
|  2 |  201 |
|  3 |  202 |
+----+------+

Exemple : Exécution de tâches enfants pour les insertions dans une boucle

Le code suivant montre comment utiliser le mot-clé ASYNC dans une boucle pour exécuter simultanément plusieurs tâches enfants qui insèrent des lignes dans une table.

Cet exemple utilise les données des tables suivantes :

CREATE OR REPLACE TABLE async_loop_test1(col1 VARCHAR, col2 INT);

INSERT INTO async_loop_test1 VALUES
  ('child', 0),
  ('job', 1),
  ('loop', 2),
  ('test', 3);

CREATE OR REPLACE TABLE async_loop_test2(col1 INT, col2 VARCHAR);
Copy

Créez une procédure stockée qui insère les valeurs de async_loop_test1, concaténées avec le texte async_ dans async_loop_test2 en utilisant des tâches enfants asynchrones dans une boucle FOR. La boucle crée une tâche enfant asynchrone distincte à chaque itération. L’instruction AWAIT ALL bloque la progression de la procédure stockée jusqu’à ce que toutes les tâches enfants soient terminées.

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
Copy

Remarque : Si vous utilisez Snowflake CLI, SnowSQL, Classic Console, ou la méthode execute_stream ou execute_string dans le code Connecteur Python, utilisez cet exemple à la place (voir Utilisation de Snowflake Scripting dans Snowflake CLI, SnowSQL, le Classic Console et le connecteur Python) :

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
$$;
Copy

Appelez la procédure stockée :

CALL async_insert();
Copy
+--------------+
| ASYNC_INSERT |
|--------------|
| Success      |
+--------------+

Interrogez la table async_loop_test2 pour voir les résultats :

SELECT * FROM async_loop_test2 ORDER BY col1;
Copy
+------+-------------+
| COL1 | COL2        |
|------+-------------|
|    0 | async_child |
|    1 | async_job   |
|    2 | async_loop  |
|    3 | async_test  |
+------+-------------+