snowflake.snowpark.AsyncJob
class snowflake.snowpark.AsyncJob(query_id: str, query: Optional[str], session: Session, result_type: _AsyncResultType = _AsyncResultType.ROW, post_actions: Optional[List[Query]] = None, log_on_exception: bool = False, case_sensitive: bool = True, num_statements: Optional[int] = None, **kwargs)
Bases: object

Provides a way to track an asynchronous query in Snowflake. A DataFrame object can be evaluated asynchronously and an AsyncJob object will be returned. With this instance, you can:

- retrieve results;
- check the query status (still running or done); 
- cancel the running query; 
- retrieve the query ID and perform other operations on this query ID manually. 
An AsyncJob can be created by Session.create_async_job() or by action methods in DataFrame and other classes. All methods in DataFrame with a suffix of _nowait execute asynchronously and create an AsyncJob instance; they are equivalent to the corresponding methods in DataFrame and other classes called with block=False. Therefore, to use it, you need to create a dataframe first. Here we demonstrate how to do that.

First, we create a dataframe:
>>> from snowflake.snowpark.functions import when_matched, when_not_matched
>>> from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType
>>> df = session.create_dataframe([[float(4), 3, 5], [2.0, -4, 7], [3.0, 5, 6], [4.0, 6, 8]], schema=["a", "b", "c"])
- Example 1
DataFrame.collect() can be performed asynchronously:

>>> async_job = df.collect_nowait()
>>> async_job.result()
[Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]

You can also do:

>>> async_job = df.collect(block=False)
>>> async_job.result()
[Row(A=4.0, B=3, C=5), Row(A=2.0, B=-4, C=7), Row(A=3.0, B=5, C=6), Row(A=4.0, B=6, C=8)]
- Example 2
DataFrame.to_pandas() can be performed asynchronously:

>>> async_job = df.to_pandas(block=False)
>>> async_job.result()
     A  B  C
0  4.0  3  5
1  2.0 -4  7
2  3.0  5  6
3  4.0  6  8
- Example 3
DataFrame.first() can be performed asynchronously:

>>> async_job = df.first(block=False)
>>> async_job.result()
[Row(A=4.0, B=3, C=5)]
- Example 4
DataFrame.count() can be performed asynchronously:

>>> async_job = df.count(block=False)
>>> async_job.result()
4
- Example 5
Saving a dataframe to a table or copying it into a stage file can also be performed asynchronously:

>>> table_name = "name"
>>> async_job = df.write.save_as_table(table_name, block=False)
>>> # copy into a stage file
>>> remote_location = f"{session.get_session_stage()}/name.csv"
>>> async_job = df.write.copy_into_location(remote_location, block=False)
>>> async_job.result()[0]['rows_unloaded']
4
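If you need to be sure an asynchronous write has finished before reading the table back, wait on its job first. A minimal sketch, reusing table_name and df from the snippet above; result() is used here purely to block until the write completes, and mode="overwrite" is used because the table already exists at this point:

>>> save_job = df.write.save_as_table(table_name, mode="overwrite", block=False)
>>> _ = save_job.result()  # block until the table has been written
>>> session.table(table_name).count()
4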
- Example 7
Table.merge(), Table.update() and Table.delete() can also be performed asynchronously:

>>> schema = StructType([StructField("key", IntegerType()), StructField("value", StringType())])
>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=schema)
>>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
>>> target = session.table("my_table")
>>> source = session.create_dataframe([(10, "new"), (12, "new"), (13, "old")], schema=schema)
>>> async_job = target.merge(source, target["key"] == source["key"], [when_matched().update({"value": source["value"]}), when_not_matched().insert({"key": source["key"]})], block=False)
>>> async_job.result()
MergeResult(rows_inserted=2, rows_updated=2, rows_deleted=0)
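The snippet above only shows merge(); update() and delete() follow the same pattern. A minimal sketch continuing from the target table above (the row counts shown are illustrative and depend on the table's contents after the merge):

>>> update_job = target.update({"value": "updated"}, target["key"] == 10, block=False)
>>> update_job.result()
UpdateResult(rows_updated=2, multi_joined_rows_updated=0)
>>> delete_job = target.delete(target["key"] == 13, block=False)
>>> delete_job.result()
DeleteResult(rows_deleted=1)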
- Example 8
Cancel the running query associated with the dataframe:

>>> df = session.sql("select SYSTEM$WAIT(3)")
>>> async_job = df.collect_nowait()
>>> async_job.cancel()
- Example 9
Creating an AsyncJob from an existing query ID, retrieving results and converting it back to a DataFrame:

>>> from snowflake.snowpark.functions import col
>>> query_id = session.sql("select 1 as A, 2 as B, 3 as C").collect_nowait().query_id
>>> async_job = session.create_async_job(query_id)
>>> async_job.query
'select 1 as A, 2 as B, 3 as C'
>>> async_job.result()
[Row(A=1, B=2, C=3)]
>>> async_job.result(result_type="pandas")
   A  B  C
0  1  2  3
>>> df = async_job.to_df()
>>> df.select(col("A").as_("D"), "B").collect()
[Row(D=1, B=2)]
- Example 10
Checking the status of a failed query (division by zero) using the status APIs:

>>> import time
>>> failing_query = session.sql("select 1/0 as result")
>>> async_job = failing_query.collect_nowait()
>>> while not async_job.is_done():
...     time.sleep(1.0)
>>> async_job.is_done()
True
>>> async_job.is_failed()
True
>>> async_job.status()
'FAILED_WITH_ERROR'
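Calling result() on a job whose query failed raises the underlying SQL error rather than returning rows. A minimal sketch continuing from the failed job above (a broad Exception is caught here for illustration; the concrete exception type comes from the underlying driver):

>>> try:
...     async_job.result()
... except Exception:
...     print("query failed")
query failed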
Note

If a dataframe is associated with multiple queries:

- if you use Session.create_dataframe() to create a dataframe from a large amount of local data and evaluate this dataframe asynchronously, the data will still be loaded into Snowflake synchronously, and only fetching the data from Snowflake again will be performed asynchronously;
- otherwise, multiple queries will be wrapped into a Snowflake Anonymous Block and executed asynchronously as one query. 
 
Temporary objects (e.g., tables) might be created when evaluating dataframes. When you call a synchronous API, they are dropped automatically after all queries finish; when you evaluate a dataframe asynchronously, they are only dropped after you call result().
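For instance (a minimal sketch with a small local dataframe, which is inlined rather than uploaded; the cleanup timing described above is the same either way):

>>> df_local = session.create_dataframe([[1, "a"], [2, "b"]], schema=["id", "name"])
>>> job = df_local.collect(block=False)
>>> job.result()  # any temporary objects created for this evaluation are dropped here
[Row(ID=1, NAME='a'), Row(ID=2, NAME='b')]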
Methods

- cancel(): Cancels the query associated with this instance.
- is_done(): Checks the status of the query associated with this instance and returns a bool value indicating whether the query has finished.
- is_failed(): Checks the status of the query associated with this instance and returns a bool value indicating whether the query has failed.
- result([result_type]): Blocks and waits until the query associated with this instance finishes, then returns query results.
- status(): Returns a string representing the current status of the query.
- to_df(): Returns a DataFrame built from the result of this asynchronous job.

Attributes

- query: The SQL text of the executed query.
- query_id: The query ID of the executed query.
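As a usage sketch combining several of the methods above (the 10-second deadline and the SYSTEM$WAIT query are arbitrary illustrations):

>>> import time
>>> job = session.sql("select SYSTEM$WAIT(60)").collect_nowait()
>>> deadline = time.time() + 10
>>> while not job.is_done() and time.time() < deadline:
...     time.sleep(1.0)
>>> if not job.is_done():
...     job.cancel()  # give up on long-running work and cancel the query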