snowflake.snowpark.AsyncJob¶
- class snowflake.snowpark.AsyncJob(query_id: str, query: str | None, session: Session, result_type: _AsyncResultType = _AsyncResultType.ROW, post_actions: List[Query] | None = None, log_on_exception: bool = False, case_sensitive: bool = True, **kwargs)[source]¶
Bases:
object
Provides a way to track an asynchronous query in Snowflake. A
DataFrame
object can be evaluated asynchronously and anAsyncJob
object will be returned. With this instance, you can:retrieve results;
check the query status (still running or done);
cancel the running query;
retrieve the query ID and perform other operations on this query ID manually.
AsyncJob
can be created bySession.create_async_job()
or action methods inDataFrame
and other classes. All methods inDataFrame
with a suffix of_nowait
execute asynchronously and create anAsyncJob
instance. They are also equivalent to corresponding functions inDataFrame
and other classes that setblock=False
. Therefore, to use it, you need to create a dataframe first. Here we demonstrate how to do that:- First, we create a dataframe:
>>> from snowflake.snowpark.functions import when_matched, when_not_matched >>> df = session.create_dataframe([[float(4), 3, 5], [2.0, -4, 7], [3.0, 5, 6],[4.0,6,8]], schema=["a", "b", "c"])
- Example 1
DataFrame.collect()
can be performed asynchronously:>>> async_job = df.collect_nowait() >>> async_job.result() [Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]
You can also do:
>>> async_job = df.collect(block=False) >>> async_job.result() [Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]
- Example 2
DataFrame.to_pandas()
can be performed asynchronously:>>> async_job = df.to_pandas(block=False) >>> async_job.result() A B C 0 4.0 3 5 1 2.0 -4 7 2 3.0 5 6 3 4.0 6 8
- Example 3
DataFrame.first()
can be performed asynchronously:>>> async_job = df.first(block=False) >>> async_job.result() [Row(A=4.0, B=3, C=5)]
- Example 4
DataFrame.count()
can be performed asynchronously:>>> async_job = df.count(block=False) >>> async_job.result() 4
- Example 5
Save a dataframe to table or copy it into a stage file can also be performed asynchronously:
>>> table_name = "name" >>> async_job = df.write.save_as_table(table_name, block=False) >>> # copy into a stage file >>> remote_location = f"{session.get_session_stage()}/name.csv" >>> async_job = df.write.copy_into_location(remote_location, block=False) >>> async_job.result()[0]['rows_unloaded'] 4
- Example 7
Table.merge()
,Table.update()
,Table.delete()
can also be performed asynchronously:>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=["key", "value"]) >>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary") >>> target = session.table("my_table") >>> source = session.create_dataframe([(10, "new"), (12, "new"), (13, "old")], schema=["key", "value"]) >>> async_job = target.merge(source,target["key"] == source["key"],[when_matched().update({"value": source["value"]}),when_not_matched().insert({"key": source["key"]})],block=False) >>> async_job.result() MergeResult(rows_inserted=2, rows_updated=2, rows_deleted=0)
- Example 8
Cancel the running query associated with the dataframe:
>>> df = session.sql("select SYSTEM$WAIT(3)") >>> async_job = df.collect_nowait() >>> async_job.cancel()
- Example 9
Executing queries asynchronously is faster than executing queries one by one:
>>> from time import time >>> dfs = [session.sql("select SYSTEM$WAIT(1)") for _ in range(10)] >>> start = time() >>> res = [df.collect() for df in dfs] >>> time1 = time() - start >>> start = time() >>> async_jobs = [df.collect_nowait() for df in dfs] >>> res = [async_job.result() for async_job in async_jobs] >>> time2 = time() - start >>> time2 < time1 True
- Example 10
Creating an
AsyncJob
from an existing query ID, retrieving results and converting it back to aDataFrame
:>>> from snowflake.snowpark.functions import col >>> query_id = session.sql("select 1 as A, 2 as B, 3 as C").collect_nowait().query_id >>> async_job = session.create_async_job(query_id) >>> async_job.query 'select 1 as A, 2 as B, 3 as C' >>> async_job.result() [Row(A=1, B=2, C=3)] >>> async_job.result(result_type="pandas") A B C 0 1 2 3 >>> df = async_job.to_df() >>> df.select(col("A").as_("D"), "B").collect() [Row(D=1, B=2)]
Note
If a dataframe is associated with multiple queries:
if you use
Session.create_dataframe()
to create a dataframe from a large amount of local data and evaluate this dataframe asynchronously, data will still be loaded into Snowflake synchronously, and only fetching data from Snowflake again will be performed asynchronously.otherwise, multiple queries will be wrapped into a Snowflake Anonymous Block and executed asynchronously as one query.
Temporary objects (e.g., tables) might be created when evaluating dataframes and they will be dropped automatically after all queries finish when calling a synchronous API. When you evaluate dataframes asynchronously, temporary objects will only be dropped after calling
result()
.This feature is currently not supported in Snowflake Python stored procedures.
Methods
cancel
()Cancels the query associated with this instance.
is_done
()Checks the status of the query associated with this instance and returns a bool value indicating whether the query has finished.
result
([result_type])Blocks and waits until the query associated with this instance finishes, then returns query results.
to_df
()Returns a
DataFrame
built from the result of this asynchronous job.Attributes
The SQL text of of the executed query.
The query ID of the executed query