비동기 하위 작업으로 작업하기

이 항목에서는 Snowflake Scripting에서 비동기 하위 작업을 사용하는 방법에 대해 설명합니다.

비동기 하위 작업 소개

Snowflake Scripting에서 비동기 하위 작업은 블록의 코드가 계속 실행되는 동안 백그라운드에서 실행되는 쿼리입니다. 쿼리는 SELECT 문과 DML 문을 포함한 모든 유효한 SQL 문(예: INSERT 또는 UPDATE)이 될 수 있습니다.

쿼리를 비동기 하위 작업으로 실행하려면 쿼리 앞에 ASYNC 키워드를 넣으십시오. 이 키워드를 생략하면 Snowflake Scripting 블록은 하위 작업을 순차적으로 실행하고, 각 하위 작업은 실행 중인 하위 작업이 완료될 때까지 기다렸다가 시작합니다. 비동기 하위 작업을 동시에 실행할 수 있으므로 효율성이 향상되고 전체 실행 시간이 단축됩니다.

ASYNC 키워드는 다음과 같은 방법으로 사용할 수 있습니다.

  • RESULTSET 에 대해 실행되는 쿼리의 경우.

  • RESULTSET 와 독립적으로 실행되는 쿼리의 경우.

비동기 하위 작업을 관리하려면 AWAITCANCEL 문을 사용하십시오.

  • AWAIT 문은 실행 중인 모든 비동기 하위 작업이 완료되거나 RESULTSET 에 대해 실행 중인 특정 하위 작업이 완료될 때까지 기다렸다가 모든 작업이 완료되거나 특정 작업이 완료되면 각각 반환합니다.

  • CANCEL 문은 RESULTSET 에 대해 실행 중인 비동기 하위 작업을 취소합니다.

SYSTEM$GET_RESULTSET_STATUS 함수를 호출하여 RESULTSET 에 대해 실행 중인 비동기 하위 작업의 상태를 확인할 수 있습니다.

현재 최대 4,000개의 비동기 하위 작업을 동시에 실행할 수 있습니다. 동시 비동기 하위 작업의 수가 이 제한을 초과하면 오류가 반환됩니다.

참고

동일한 세션에서 여러 개의 비동기 하위 작업이 동시에 실행되는 경우 작업 완료 순서는 작업 실행이 완료될 때까지 알 수 없습니다. 완료 순서가 달라질 수 있으므로 비동기 하위 작업과 함께 LAST_QUERY_ID 함수를 사용하는 것은 비결정적입니다.

비동기 하위 작업 사용 예시

다음 섹션에서는 비동기 하위 작업 사용의 예를 제시합니다.

예: 테이블을 동시에 쿼리하는 하위 작업 실행하기

다음 코드는 ASYNC 키워드를 사용하여 테이블을 동시에 쿼리하는 여러 하위 작업을 실행하는 방법을 보여줍니다. 이 예제에서는 RESULTSETs 에 대해 실행되는 쿼리에 대해 ASYNC 키워드를 지정합니다.

이 예에서는 다음 테이블의 데이터를 사용합니다.

CREATE OR REPLACE TABLE orders_q1_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q1_2024 VALUES (1, 500.00);
INSERT INTO orders_q1_2024 VALUES (2, 225.00);
INSERT INTO orders_q1_2024 VALUES (3, 725.00);
INSERT INTO orders_q1_2024 VALUES (4, 150.00);
INSERT INTO orders_q1_2024 VALUES (5, 900.00);

CREATE OR REPLACE TABLE orders_q2_2024 (
  order_id INT,
  order_amount NUMBER(12,2));

INSERT INTO orders_q2_2024 VALUES (1, 100.00);
INSERT INTO orders_q2_2024 VALUES (2, 645.00);
INSERT INTO orders_q2_2024 VALUES (3, 275.00);
INSERT INTO orders_q2_2024 VALUES (4, 800.00);
INSERT INTO orders_q2_2024 VALUES (5, 250.00);
Copy

다음 저장 프로시저는 다음 작업을 수행합니다.

  • 두 테이블에서 모든 행의 order_amount 값을 쿼리하고 그 결과를 서로 다른 RESULTSETs (각 테이블마다 하나씩)로 반환합니다.

  • ASYNC 키워드를 사용하여 쿼리가 동시 하위 작업으로 실행되도록 지정합니다.

  • 각 RESULTSET 에 대해 AWAIT 문을 실행하여 프로시저가 쿼리가 완료될 때까지 기다렸다가 계속 진행하도록 합니다. RESULTSET 에 대한 쿼리 결과는 AWAIT 가 RESULTSET 에 대해 실행될 때까지 액세스할 수 없습니다.

  • 커서를 사용하여 각 테이블에 대한 order_amount 행의 합계를 계산합니다.

  • 테이블의 합계를 더하고 값을 반환합니다.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
DECLARE
  accumulator1 INTEGER DEFAULT 0;
  accumulator2 INTEGER DEFAULT 0;
  res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
  res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
BEGIN
  AWAIT res1;
  LET cur1 CURSOR FOR res1;
  OPEN cur1;
  AWAIT res2;
  LET cur2 CURSOR FOR res2;
  OPEN cur2;
  FOR row_variable IN cur1 DO
      accumulator1 := accumulator1 + row_variable.order_amount;
  END FOR;
  FOR row_variable IN cur2 DO
      accumulator2 := accumulator2 + row_variable.order_amount;
  END FOR;
  RETURN accumulator1 + accumulator2;
END;
Copy

참고: Python Connector 코드에서 Snowflake CLI, SnowSQL, Classic Console, 또는 execute_stream 또는 execute_string 메서드를 사용하는 경우 이 예제를 대신 사용하십시오(Snowflake CLI, SnowSQL, Classic Console 및 Python Connector에서 Snowflake Scripting 사용하기 참조).

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_query()
RETURNS INTEGER
LANGUAGE SQL
AS
$$
  DECLARE
    accumulator1 INTEGER DEFAULT 0;
    accumulator2 INTEGER DEFAULT 0;
    res1 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q1_2024);
    res2 RESULTSET DEFAULT ASYNC (SELECT order_amount FROM orders_q2_2024);
  BEGIN
    AWAIT res1;
    LET cur1 CURSOR FOR res1;
    OPEN cur1;
    AWAIT res2;
    LET cur2 CURSOR FOR res2;
    OPEN cur2;
    FOR row_variable IN cur1 DO
        accumulator1 := accumulator1 + row_variable.order_amount;
    END FOR;
    FOR row_variable IN cur2 DO
        accumulator2 := accumulator2 + row_variable.order_amount;
    END FOR;
    RETURN accumulator1 + accumulator2;
  END;
$$;
Copy

저장 프로시저를 호출합니다.

CALL test_sp_async_child_jobs_query();
Copy
+--------------------------------+
| TEST_SP_ASYNC_CHILD_JOBS_QUERY |
|--------------------------------|
|                           4570 |
+--------------------------------+

예: 테이블에 행을 동시에 삽입하는 하위 작업 실행하기

다음 코드는 ASYNC 키워드를 사용하여 테이블에 행을 삽입하는 여러 하위 작업을 동시에 실행하는 방법을 보여줍니다. 이 예제에서는 RESULTSETs 에 대해 실행되는 쿼리에 대해 ASYNC 키워드를 지정합니다.

다음 저장 프로시저는 다음 작업을 수행합니다.

  • orders_q3_2024 테이블이 없는 경우 생성합니다.

  • 테이블에 삽입 결과를 저장하는 RESULTSETs, insert_1insert_2 를 2개 생성합니다. 저장 프로시저 인자는 테이블에 삽입되는 값을 지정합니다.

  • ASYNC 키워드를 사용하여 삽입이 동시 하위 작업으로 실행되도록 지정합니다.

  • 각 RESULTSET 에 대해 AWAIT 문을 실행하여 프로시저가 삽입이 완료될 때까지 기다렸다가 계속 진행하도록 합니다. RESULTSET 의 결과는 AWAIT 가 RESULTSET 에 대해 실행될 때까지 액세스할 수 없습니다.

  • orders_q3_2024 테이블에 쿼리 결과를 저장하는 RESULTSET res 를 새로 만듭니다.

  • 쿼리 결과를 반환합니다.

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
Copy

참고: Python Connector 코드에서 Snowflake CLI, SnowSQL, Classic Console, 또는 execute_stream 또는 execute_string 메서드를 사용하는 경우 이 예제를 대신 사용하십시오(Snowflake CLI, SnowSQL, Classic Console 및 Python Connector에서 Snowflake Scripting 사용하기 참조).

CREATE OR REPLACE PROCEDURE test_sp_async_child_jobs_insert(
  arg1 INT,
  arg2 NUMBER(12,2),
  arg3 INT,
  arg4 NUMBER(12,2))
RETURNS TABLE()
LANGUAGE SQL
AS
$$
  BEGIN
   CREATE TABLE IF NOT EXISTS orders_q3_2024 (
      order_id INT,
      order_amount NUMBER(12,2));
    LET insert_1 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg1, :arg2);
    LET insert_2 RESULTSET := ASYNC (INSERT INTO orders_q3_2024 SELECT :arg3, :arg4);
    AWAIT insert_1;
    AWAIT insert_2;
    LET res RESULTSET := (SELECT * FROM orders_q3_2024 ORDER BY order_id);
    RETURN TABLE(res);
  END;
$$;
Copy

저장 프로시저를 호출합니다.

CALL test_sp_async_child_jobs_insert(1, 325, 2, 241);
Copy
+----------+--------------+
| ORDER_ID | ORDER_AMOUNT |
|----------+--------------|
|        1 |       325.00 |
|        2 |       241.00 |
+----------+--------------+

예: AWAIT ALL 문으로 저장 프로시저에서 하위 작업 실행하기

다음 예에서는 ASYNC 키워드를 사용하여 저장 프로시저에서 여러 하위 작업을 동시에 실행합니다. 이 예에서는 RESULTSET 와 연결되지 않은 문에 ASYNC 키워드를 지정한 다음 AWAIT ALL 문을 사용하여 저장 프로시저 코드가 모든 비동기 하위 작업이 완료될 때까지 기다리도록 합니다.

값을 동시에 삽입하는 저장 프로시저 만들기

다음 저장 프로시저는 ASYNC 키워드를 사용하여 테이블에 행을 삽입하는 여러 하위 작업을 동시에 실행합니다. 이 예에서는 INSERT 문에 ASYNC 키워드를 지정합니다. 또한 이 예에서는 AWAIT ALL 문을 사용하여 저장 프로시저가 모든 비동기 하위 작업이 완료될 때까지 기다리도록 합니다.

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
Copy

참고: Python Connector 코드에서 Snowflake CLI, SnowSQL, Classic Console, 또는 execute_stream 또는 execute_string 메서드를 사용하는 경우 이 예제를 대신 사용하십시오(Snowflake CLI, SnowSQL, Classic Console 및 Python Connector에서 Snowflake Scripting 사용하기 참조).

CREATE OR REPLACE PROCEDURE test_async_child_job_inserts()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  CREATE OR REPLACE TABLE test_child_job_queries1 (col1 INT);
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(1));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(2));
  ASYNC (INSERT INTO test_child_job_queries1(col1) VALUES(3));
  AWAIT ALL;
END;
$$
;
Copy

값을 동시에 업데이트하는 저장 프로시저 만들기

다음 저장 프로시저는 ASYNC 키워드를 사용하여 테이블의 행을 동시에 업데이트하는 여러 하위 작업을 실행합니다. 이 예에서는 UPDATE 문에 ASYNC 키워드를 지정합니다. 또한 이 예에서는 AWAIT ALL 문을 사용하여 저장 프로시저가 모든 비동기 하위 작업이 완료될 때까지 기다리도록 합니다.

테이블을 만들고 데이터를 삽입합니다.

CREATE OR REPLACE TABLE test_child_job_queries2 (id INT, cola INT);

INSERT INTO test_child_job_queries2 VALUES
  (1, 100), (2, 101), (3, 102);
Copy

저장 프로시저를 만듭니다.

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
Copy

참고: Python Connector 코드에서 Snowflake CLI, SnowSQL, Classic Console, 또는 execute_stream 또는 execute_string 메서드를 사용하는 경우 이 예제를 대신 사용하십시오(Snowflake CLI, SnowSQL, Classic Console 및 Python Connector에서 Snowflake Scripting 사용하기 참조).

CREATE OR REPLACE PROCEDURE test_async_child_job_updates()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (UPDATE test_child_job_queries2 SET cola=200 WHERE id=1);
  ASYNC (UPDATE test_child_job_queries2 SET cola=201 WHERE id=2);
  ASYNC (UPDATE test_child_job_queries2 SET cola=202 WHERE id=3);
  AWAIT ALL;
END;
$$
;
Copy

다른 저장 프로시저를 동시에 호출하는 저장 프로시저 만들기

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
Copy

참고: Python Connector 코드에서 Snowflake CLI, SnowSQL, Classic Console, 또는 execute_stream 또는 execute_string 메서드를 사용하는 경우 이 예제를 대신 사용하십시오(Snowflake CLI, SnowSQL, Classic Console 및 Python Connector에서 Snowflake Scripting 사용하기 참조).

CREATE OR REPLACE PROCEDURE test_async_child_job_calls()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  ASYNC (CALL test_async_child_job_inserts());
  ASYNC (CALL test_async_child_job_updates());
  AWAIT ALL;
END;
$$
;
Copy

test_async_child_job_calls 저장 프로시저를 호출합니다.

CALL test_async_child_job_calls();
Copy

테이블을 쿼리하여 결과를 확인합니다.

SELECT col1 FROM test_child_job_queries1 ORDER BY col1;
Copy
+------+
| COL1 |
|------|
|    1 |
|    2 |
|    3 |
+------+
SELECT * FROM test_child_job_queries2 ORDER BY id;
Copy
+----+------+
| ID | COLA |
|----+------|
|  1 |  200 |
|  2 |  201 |
|  3 |  202 |
+----+------+

예: 루프에서 삽입을 위한 하위 작업 실행하기

다음 코드는 ASYNC 키워드를 루프에서 사용하여 테이블에 행을 삽입하는 여러 하위 작업을 동시에 실행하는 방법을 보여줍니다.

이 예에서는 다음 테이블의 데이터를 사용합니다.

CREATE OR REPLACE TABLE async_loop_test1(col1 VARCHAR, col2 INT);

INSERT INTO async_loop_test1 VALUES
  ('child', 0),
  ('job', 1),
  ('loop', 2),
  ('test', 3);

CREATE OR REPLACE TABLE async_loop_test2(col1 INT, col2 VARCHAR);
Copy

FOR 루프에서 비동기 하위 작업을 사용하여 텍스트 async_ 와 연결된 async_loop_test1 의 값을 async_loop_test2 에 삽입하는 저장 프로시저를 만듭니다. 루프는 각 반복마다 별도의 비동기 하위 작업을 생성합니다. AWAIT ALL 문은 모든 하위 작업이 완료될 때까지 저장 프로시저의 진행을 차단합니다.

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
Copy

참고: Python Connector 코드에서 Snowflake CLI, SnowSQL, Classic Console, 또는 execute_stream 또는 execute_string 메서드를 사용하는 경우 이 예제를 대신 사용하십시오(Snowflake CLI, SnowSQL, Classic Console 및 Python Connector에서 Snowflake Scripting 사용하기 참조).

CREATE OR REPLACE PROCEDURE async_insert()
RETURNS VARCHAR
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
begin
  LET res RESULTSET := (SELECT * FROM async_loop_test1 ORDER BY 1);

  FOR record IN res DO
    LET v VARCHAR := record.col1;
    LET x INT := record.col2;
      ASYNC (INSERT INTO async_loop_test2(col1, col2) VALUES (:x, (SELECT 'async_' || :v)));
    END FOR;

    AWAIT ALL;
    RETURN 'Success';
END;
$$;
Copy

저장 프로시저를 호출합니다.

CALL async_insert();
Copy
+--------------+
| ASYNC_INSERT |
|--------------|
| Success      |
+--------------+

async_loop_test2 테이블을 쿼리하여 결과를 확인합니다.

SELECT * FROM async_loop_test2 ORDER BY col1;
Copy
+------+-------------+
| COL1 | COL2        |
|------+-------------|
|    0 | async_child |
|    1 | async_job   |
|    2 | async_loop  |
|    3 | async_test  |
+------+-------------+