You are viewing documentation about an older version (1.4.0).

snowflake.snowpark.AsyncJob

class snowflake.snowpark.AsyncJob(query_id: str, query: str | None, session: Session, result_type: _AsyncResultType = _AsyncResultType.ROW, post_actions: List[Query] | None = None, log_on_exception: bool = False, case_sensitive: bool = True, **kwargs)

Bases: object

Provides a way to track an asynchronous query in Snowflake. A DataFrame object can be evaluated asynchronously and an AsyncJob object will be returned. With this instance, you can:

  • retrieve results;

  • check the query status (still running or done);

  • cancel the running query;

  • retrieve the query ID and perform other operations on this query ID manually.

An AsyncJob can be created by Session.create_async_job() or by action methods in DataFrame and other classes. All DataFrame methods with the suffix _nowait execute asynchronously and return an AsyncJob instance; they are equivalent to calling the corresponding methods in DataFrame and other classes with block=False. To obtain an AsyncJob this way, you first need to create a dataframe. The examples below demonstrate this.

First, we create a dataframe:
>>> from snowflake.snowpark.functions import when_matched, when_not_matched
>>> df = session.create_dataframe([[float(4), 3, 5], [2.0, -4, 7], [3.0, 5, 6], [4.0, 6, 8]], schema=["a", "b", "c"])
Example 1

DataFrame.collect() can be performed asynchronously:

>>> async_job = df.collect_nowait()
>>> async_job.result()
[Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]

You can also do:

>>> async_job = df.collect(block=False)
>>> async_job.result()
[Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]
Example 2

DataFrame.to_pandas() can be performed asynchronously:

>>> async_job = df.to_pandas(block=False)
>>> async_job.result()
     A  B  C
0  4.0  3  5
1  2.0 -4  7
2  3.0  5  6
3  4.0  6  8
Example 3

DataFrame.first() can be performed asynchronously:

>>> async_job = df.first(block=False)
>>> async_job.result()
[Row(A=4.0, B=3, C=5)]
Example 4

DataFrame.count() can be performed asynchronously:

>>> async_job = df.count(block=False)
>>> async_job.result()
4
Example 5

Saving a dataframe to a table or copying it into a stage file can also be performed asynchronously:

>>> table_name = "name"
>>> async_job = df.write.save_as_table(table_name, block=False)
>>> # copy into a stage file
>>> remote_location = f"{session.get_session_stage()}/name.csv"
>>> async_job = df.write.copy_into_location(remote_location, block=False)
>>> async_job.result()[0]['rows_unloaded']
4
Example 7

Table.merge(), Table.update(), Table.delete() can also be performed asynchronously:

>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"])
>>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
>>> target = session.table("my_table")
>>> source = session.create_dataframe([(10, "new"), (12, "new"), (13, "old")], schema=["key", "value"])
>>> async_job = target.merge(
...     source, target["key"] == source["key"],
...     [when_matched().update({"value": source["value"]}), when_not_matched().insert({"key": source["key"]})],
...     block=False)
>>> async_job.result()
MergeResult(rows_inserted=2, rows_updated=2, rows_deleted=0)
Example 8

Cancel the running query associated with the dataframe:

>>> df = session.sql("select SYSTEM$WAIT(3)")
>>> async_job = df.collect_nowait()
>>> async_job.cancel()
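
In practice, cancel() is often combined with is_done() to enforce a time limit on a query. The following is a minimal sketch of that pattern, not part of the Snowpark API: result_or_cancel and FakeJob are hypothetical names introduced here for illustration, and FakeJob only stands in for an AsyncJob so that the sketch runs without a Snowflake session. The helper assumes nothing beyond the is_done(), cancel() and result() methods documented on this page.

```python
import time


def result_or_cancel(job, timeout_s, poll_interval_s=0.5):
    """Return job.result() if the job finishes within timeout_s seconds.

    Otherwise cancel the job and raise TimeoutError. `job` is any object
    exposing is_done(), cancel() and result(), such as an AsyncJob.
    (Hypothetical helper, not a Snowpark function.)
    """
    deadline = time.monotonic() + timeout_s
    while not job.is_done():
        if time.monotonic() >= deadline:
            job.cancel()
            raise TimeoutError(f"query did not finish within {timeout_s} s; cancelled")
        time.sleep(poll_interval_s)
    return job.result()


class FakeJob:
    """Hypothetical stand-in for AsyncJob; reports done after a fixed number of polls."""

    def __init__(self, polls_until_done):
        self._remaining = polls_until_done
        self.cancelled = False

    def is_done(self):
        self._remaining -= 1
        return self._remaining < 0

    def cancel(self):
        self.cancelled = True

    def result(self):
        return ["row"]
```

With a real session, the call would look like result_or_cancel(df.collect_nowait(), timeout_s=30). Each is_done() call checks the query status, so a polling interval that is too short mostly adds overhead.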
Example 9

Executing queries asynchronously lets them run concurrently, which is faster than executing them one by one:

>>> from time import time
>>> dfs = [session.sql("select SYSTEM$WAIT(1)") for _ in range(10)]
>>> start = time()
>>> res = [df.collect() for df in dfs]
>>> time1 = time() - start
>>> start = time()
>>> async_jobs = [df.collect_nowait() for df in dfs]
>>> res = [async_job.result() for async_job in async_jobs]
>>> time2 = time() - start
>>> time2 < time1
True
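
When many jobs are submitted at once, a single failing query would abort a plain list comprehension over result() and discard the results that did succeed. The sketch below shows one possible way to wait for every job and keep successes and failures separate. collect_all and StubJob are hypothetical names introduced here, not Snowpark functions; StubJob merely imitates the result() method of an AsyncJob so the example runs without a Snowflake session.

```python
def collect_all(jobs):
    """Wait for every job; return (results, errors), both index-aligned with jobs.

    `jobs` is a list of objects exposing result(), such as AsyncJob instances.
    For a job that raised, results[i] is None and errors[i] holds the exception.
    (Hypothetical helper, not a Snowpark function.)
    """
    results = [None] * len(jobs)
    errors = [None] * len(jobs)
    for i, job in enumerate(jobs):
        try:
            results[i] = job.result()  # blocks until this job finishes
        except Exception as exc:
            errors[i] = exc
    return results, errors


class StubJob:
    """Hypothetical stand-in for AsyncJob; returns a value or raises an error."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def result(self):
        if self._error is not None:
            raise self._error
        return self._value
```

With a real session, the call would be results, errors = collect_all([df.collect_nowait() for df in dfs]). Because all jobs are submitted before the first result() call, waiting on them in order does not serialize their execution.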
Example 10

Creating an AsyncJob from an existing query ID, retrieving results and converting it back to a DataFrame:

>>> from snowflake.snowpark.functions import col
>>> query_id = session.sql("select 1 as A, 2 as B, 3 as C").collect_nowait().query_id
>>> async_job = session.create_async_job(query_id)
>>> async_job.query
'select 1 as A, 2 as B, 3 as C'
>>> async_job.result()
[Row(A=1, B=2, C=3)]
>>> async_job.result(result_type="pandas")
   A  B  C
0  1  2  3
>>> df = async_job.to_df()
>>> df.select(col("A").as_("D"), "B").collect()
[Row(D=1, B=2)]

Note

  • If a dataframe is associated with multiple queries:

    • if you use Session.create_dataframe() to create a dataframe from a large amount of local data and evaluate this dataframe asynchronously, data will still be loaded into Snowflake synchronously, and only fetching data from Snowflake again will be performed asynchronously.

    • otherwise, multiple queries will be wrapped into a Snowflake Anonymous Block and executed asynchronously as one query.

  • Temporary objects (e.g., tables) might be created when evaluating dataframes and they will be dropped automatically after all queries finish when calling a synchronous API. When you evaluate dataframes asynchronously, temporary objects will only be dropped after calling result().

  • This feature is currently not supported in Snowflake Python stored procedures.

Methods

cancel()

Cancels the query associated with this instance.

is_done()

Checks the status of the query associated with this instance and returns a bool value indicating whether the query has finished.

result([result_type])

Blocks and waits until the query associated with this instance finishes, then returns query results.

to_df()

Returns a DataFrame built from the result of this asynchronous job.

Attributes

query

The SQL text of the executed query.

query_id

The query ID of the executed query.