非同期子ジョブの操作

このトピックでは、Snowflake Scriptingで非同期子ジョブを使用する方法について説明します。

非同期子ジョブの紹介

Snowflake Scriptingでは、非同期子ジョブは、ブロック内のコードが実行され続けている間にバックグラウンドで実行されるクエリです。クエリには、 SELECT ステートメントや DML ステートメント(INSERT や UPDATE など)を含む、有効な SQL ステートメントを指定できます。

クエリを非同期子ジョブとして実行するには、 ASYNC キーワードをクエリの前に置きます。このキーワードが省略された場合、Snowflake Scriptingブロックは子ジョブを順次実行し、各子ジョブは実行中の子ジョブの終了を待ってから開始します。非同期子ジョブは同時に実行できるため、効率が向上し、全体の実行時間を短縮できます。

ASYNC キーワードは以下のように使用できます。

  • RESULTSET に対して実行されるクエリの場合。

  • RESULTSET とは無関係に実行されるクエリの場合。

非同期子ジョブを管理するには、 AWAIT および CANCEL ステートメントを使用します。

  • AWAIT ステートメントは、実行中のすべての非同期子ジョブが終了するまで待機するか、 RESULTSET のために実行中の特定の子ジョブが終了するまで待機し、すべてのジョブが終了するか特定のジョブが終了するとそれぞれ戻ります。

  • CANCEL ステートメントは、 RESULTSET に対して実行中の非同期子ジョブをキャンセルします。

SYSTEM$GET_RESULTSET_STATUS 関数を呼び出すことで、 RESULTSET で実行中の非同期子ジョブのステータスを確認できます。

現在、最大4,000の非同期子ジョブを同時に実行できます。同時に実行される非同期子ジョブの数がこの制限を超えると、エラーが返されます。

注釈

複数の非同期子ジョブが同じセッションで同時に実行される場合、ジョブの完了順序はジョブの実行が終了するまでわかりません。完了の順序が変わる可能性があるため、非同期子ジョブで LAST_QUERY_ID 関数を使用することは非決定的です。

非同期子ジョブの使用例

以下のセクションでは、非同期子ジョブの使用例を示します。

例:テーブルを同時にクエリする子ジョブの実行

以下のコードは、 ASYNC キーワードを使用して、テーブルをクエリする複数の子ジョブを同時に実行する方法を示しています。この例では、 RESULTSETs に対して実行されるクエリに対して ASYNC キーワードを指定しています。

この例では、以下の表のデータを使用します。

CREATE OR REPLACE TABLE orders_q1_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q1_2024 VALUES (1, 500.00);
INSERT INTO orders_q1_2024 VALUES (2, 225.00);
INSERT INTO orders_q1_2024 VALUES (3, 725.00);
INSERT INTO orders_q1_2024 VALUES (4, 150.00);
INSERT INTO orders_q1_2024 VALUES (5, 900.00);

CREATE OR REPLACE TABLE orders_q2_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q2_2024 VALUES (1, 100.00);
INSERT INTO orders_q2_2024 VALUES (2, 645.00);
INSERT INTO orders_q2_2024 VALUES (3, 275.00);
INSERT INTO orders_q2_2024 VALUES (4, 800.00);
INSERT INTO orders_q2_2024 VALUES (5, 250.00);
Copy

以下のストアドプロシージャは、以下のアクションを実行します。

  • すべての行の order_amount 値を両方のテーブルにクエリし、その結果を異なる RESULTSETs (各テーブルに 1 つずつ) に返します。

  • ASYNC キーワードを使用して、クエリが並行子ジョブとして実行されるように指定します。

  • RESULTSET ごとに AWAIT ステートメントを実行し、プロシージャがクエリの終了を待ってから処理を進めるようにします。AWAIT が RESULTSET に対して実行されるまで、 RESULTSET のクエリ結果にアクセスすることはできません。

  • カーソルを使用して、各テーブルの order_amount 行の合計を計算します。

  • テーブルの合計を加算し、値を返します。

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
DECLARE
  accumulator1 INTEGER DEFAULT 0;
  accumulator2 INTEGER DEFAULT 0;
  res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
  res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
  AWAIT res1;
  LET cur1 CURSOR FOR res1;
  OPEN cur1;
  AWAIT res2;
  LET cur2 CURSOR FOR res2;
  OPEN cur2;
  FOR row_variable IN cur1 DO
      accumulator1 := accumulator1 + row_variable.order_amount;
  END FOR;
  FOR row_variable IN cur2 DO
      accumulator2 := accumulator2 + row_variable.order_amount;
  END FOR;
  RETURN accumulator1 + accumulator2;
END;
Copy

注意: Snowflake CLISnowSQLClassic Consoleexecute_streamexecute_string メソッドを Python Connector コードで使用する場合は、代わりにこの例を使用してください(Snowflake CLI、 SnowSQL、 Classic Console、Python ConnectorでSnowflake Scriptingを使用する を参照)。

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
$$
  DECLARE
    accumulator1 INTEGER DEFAULT 0;
    accumulator2 INTEGER DEFAULT 0;
    res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
    res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
  BEGIN
    AWAIT res1;
    LET cur1 CURSOR FOR res1;
    OPEN cur1;
    AWAIT res2;
    LET cur2 CURSOR FOR res2;
    OPEN cur2;
    FOR row_variable IN cur1 DO
        accumulator1 := accumulator1 + row_variable.order_amount;
    END FOR;
    FOR row_variable IN cur2 DO
        accumulator2 := accumulator2 + row_variable.order_amount;
    END FOR;
    RETURN accumulator1 + accumulator2;
  END;
$$;
Copy

ストアドプロシージャを呼び出します。

CALL test_sp_async_child_jobs_query();
Copy
+--------------------------------+
| TEST_SP_ASYNC_CHILD_JOBS_QUERY |
|--------------------------------|
|                           4570 |
+--------------------------------+

例:テーブルに行を挿入する子ジョブの同時実行

以下のコードは、 ASYNC キーワードを使用して、テーブルに行を挿入する複数の子ジョブを同時に実行する方法を示しています。この例では、 RESULTSETs に対して実行されるクエリに対して ASYNC キーワードを指定しています。

以下のストアドプロシージャは、以下のアクションを実行します。

  • orders_q3_2024 テーブルが存在しない場合は作成します。

  • テーブルへの挿入結果を保持する2つの RESULTSETs、 insert_1insert_2 を作成します。ストアドプロシージャの引数は、テーブルに挿入される値を指定します。

  • ASYNC キーワードを使用して、挿入が同時実行子ジョブとして実行されるように指定します。

  • RESULTSET ごとに AWAIT ステートメントを実行し、プロシージャが挿入の終了を待ってから処理を進めるようにします。AWAIT が RESULTSET に対して実行されるまで、 RESULTSET の結果にはアクセスできません。

  • orders_q3_2024 テーブルのクエリ結果を保持する新しい RESULTSET res を作成します。

  • クエリの結果を返します。

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
Copy

注意: Snowflake CLISnowSQLClassic Consoleexecute_streamexecute_string メソッドを Python Connector コードで使用する場合は、代わりにこの例を使用してください(Snowflake CLI、 SnowSQL、 Classic Console、Python ConnectorでSnowflake Scriptingを使用する を参照)。

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
$$
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
$$;
Copy

ストアドプロシージャを呼び出します。

CALL test_sp_async_child_jobs_insert(1, 325, 2, 241);
Copy
+----------+--------------+
| ORDER_ID | ORDER_AMOUNT |
|----------+--------------|
|        1 |       325.00 |
|        2 |       241.00 |
+----------+--------------+

例: AWAIT ALL ステートメントを使用したストアドプロシージャ内の子ジョブの実行

以下の例では、 ASYNC キーワードを使用して、ストアドプロシージャ内で複数の子ジョブを同時実行します。例では、 RESULTSET に関連付けられていないステートメントに ASYNC キーワードを指定し、ストアドプロシージャコードがすべての非同期子ジョブの完了を待つように AWAIT ALL ステートメントを使用しています。

同時に値を挿入するストアドプロシージャの作成

以下のストアドプロシージャは、 ASYNC キーワードを使用して、テーブルに行を挿入する複数の子ジョブを同時に実行します。この例では、 INSERT ステートメントに ASYNC キーワードを指定しています。この例では、ストアドプロシージャがすべての非同期子ジョブの完了を待つように、 AWAIT ALL ステートメントも使用しています。

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
Copy

注意: Snowflake CLISnowSQLClassic Consoleexecute_streamexecute_string メソッドを Python Connector コードで使用する場合は、代わりにこの例を使用してください(Snowflake CLI、 SnowSQL、 Classic Console、Python ConnectorでSnowflake Scriptingを使用する を参照)。

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
$$
;
Copy

同時に値を更新するストアドプロシージャの作成

以下のストアドプロシージャは、 ASYNC キーワードを使用して、テーブルの行を更新する複数の子ジョブを同時に実行します。この例では、 UPDATE ステートメントに ASYNC キーワードを指定しています。この例では、ストアドプロシージャがすべての非同期子ジョブの完了を待つように、 AWAIT ALL ステートメントも使用しています。

テーブルを作成してデータを挿入します。

CREATE OR REPLACE TABLE test_child_job_queries2 (id INT, cola INT);

INSERT INTO test_child_job_queries2 VALUES
  (1, 100), (2, 101), (3, 102);
Copy

ストアドプロシージャを作成します。

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
Copy

注意: Snowflake CLISnowSQLClassic Consoleexecute_streamexecute_string メソッドを Python Connector コードで使用する場合は、代わりにこの例を使用してください(Snowflake CLI、 SnowSQL、 Classic Console、Python ConnectorでSnowflake Scriptingを使用する を参照)。

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
$$
;
Copy

他のストアドプロシージャを同時に呼び出すストアドプロシージャの作成

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
Copy

注意: Snowflake CLISnowSQLClassic Consoleexecute_streamexecute_string メソッドを Python Connector コードで使用する場合は、代わりにこの例を使用してください(Snowflake CLI、 SnowSQL、 Classic Console、Python ConnectorでSnowflake Scriptingを使用する を参照)。

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
$$
;
Copy

ストアドプロシージャ test_async_child_job_calls を呼び出します。

CALL test_async_child_job_calls();
Copy

テーブルをクエリして結果を確認します。

SELECT col1 FROM test_child_job_queries1 ORDER BY col1;
Copy
+------+
| COL1 |
|------|
|    1 |
|    2 |
|    3 |
+------+
SELECT * FROM test_child_job_queries2 ORDER BY id;
Copy
+----+------+
| ID | COLA |
|----+------|
|  1 |  200 |
|  2 |  201 |
|  3 |  202 |
+----+------+

例: ループ内での挿入に対する子ジョブの実行

以下のコードは、 ASYNC キーワードをループ内で使用して、テーブルに行を挿入する複数の子ジョブを同時に実行する方法を示しています。

この例では、以下の表のデータを使用します。

CREATE OR REPLACE TABLE async_loop_test1(col1 VARCHAR, col2 INT);

INSERT INTO async_loop_test1 VALUES
  ('child', 0),
  ('job', 1),
  ('loop', 2),
  ('test', 3);

CREATE OR REPLACE TABLE async_loop_test2(col1 INT, col2 VARCHAR);
Copy

FOR ループの非同期子ジョブを使用して、 async_ のテキストを連結した async_loop_test1 の値を async_loop_test2 に挿入するストアドプロシージャを作成します。ループは各反復で別々の非同期子ジョブを作成します。AWAIT ALL ステートメントは、すべての子ジョブが完了するまで、ストアドプロシージャの進行をブロックします。

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
Copy

注意: Snowflake CLISnowSQLClassic Consoleexecute_streamexecute_string メソッドを Python Connector コードで使用する場合は、代わりにこの例を使用してください(Snowflake CLI、 SnowSQL、 Classic Console、Python ConnectorでSnowflake Scriptingを使用する を参照)。

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
$$;
Copy

ストアドプロシージャを呼び出します。

CALL async_insert();
Copy
+--------------+
| ASYNC_INSERT |
|--------------|
| Success      |
+--------------+

async_loop_test2 テーブルをクエリして結果を確認します。

SELECT * FROM async_loop_test2 ORDER BY col1;
Copy
+------+-------------+
| COL1 | COL2        |
|------+-------------|
|    0 | async_child |
|    1 | async_job   |
|    2 | async_loop  |
|    3 | async_test  |
+------+-------------+