Python handler examples for stored procedures¶
Running concurrent tasks with worker processes¶
You can run concurrent tasks using Python worker processes. You might find this useful when you need to run parallel tasks that take advantage of multiple CPU cores on warehouse nodes.
Note
Snowflake recommends that you not use the built-in Python multiprocessing module.
To work around cases where the Python Global Interpreter Lock prevents a multi-tasking approach from scaling across all CPU cores, you can execute concurrent tasks using separate worker processes, rather than threads.
You can do this on Snowflake warehouses by using the joblib library’s Parallel class, as in the following example.
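A procedure using joblib.Parallel might be sketched as follows; the procedure name, handler name, and runtime version are illustrative:

```sql
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
  HANDLER = 'joblib_multiprocessing'
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
    # Fan the tasks out across workers. Depending on the warehouse
    # type, the default backend uses threads or worker processes.
    result = joblib.Parallel(n_jobs=-1)(
        joblib.delayed(sqrt)(i ** 2) for i in range(10))
    return str(result)
$$;
```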
Note
The default backend used for joblib.Parallel differs between Snowflake standard and Snowpark-optimized warehouses.
- Standard warehouse default: threading
- Snowpark-optimized warehouse default: loky (multiprocessing)
You can override the default backend setting by calling the joblib.parallel_backend function, as in the following example.
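A sketch of overriding the backend with the joblib.parallel_backend context manager follows; the procedure and handler names are illustrative:

```sql
CREATE OR REPLACE PROCEDURE joblib_backend_proc()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
  HANDLER = 'run'
AS $$
import joblib
from math import sqrt

def run(session):
    # Force the loky (multiprocessing) backend regardless of the
    # warehouse type's default.
    with joblib.parallel_backend('loky'):
        result = joblib.Parallel(n_jobs=-1)(
            joblib.delayed(sqrt)(i ** 2) for i in range(10))
    return str(result)
$$;
```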
Using Snowpark APIs for asynchronous processing¶
The following examples illustrate how you can use Snowpark APIs to begin asynchronous child jobs, as well as how those jobs behave under different conditions.
Checking the status of an asynchronous child job¶
In the following example, the checkStatus procedure executes an asynchronous child job that waits 60 seconds. The procedure then
checks the status of the job before the job has finished, so the check returns False.
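A sketch of such a procedure, assuming SYSTEM$WAIT for the 60-second delay and the collect_nowait and is_done Snowpark APIs:

```sql
CREATE OR REPLACE PROCEDURE checkStatus()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'async_handler'
AS $$
def async_handler(session):
    # Start a child job that waits 60 seconds, without blocking.
    async_job = session.sql("SELECT SYSTEM$WAIT(60)").collect_nowait()
    # The child job cannot have finished yet, so this is False.
    return str(async_job.is_done())
$$;
```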
The following code calls the procedure.
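For example:

```sql
CALL checkStatus();
```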
Cancelling an asynchronous child job¶
In the following example, the cancelJob procedure uses SQL to insert data into the test_tb table with an asynchronous
child job that takes 10 seconds to finish. The procedure then cancels the child job before it finishes, so the data is never inserted.
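A sketch of such a procedure, assuming SYSTEM$WAIT for the delay and the AsyncJob.cancel Snowpark API; the table column type is illustrative:

```sql
CREATE OR REPLACE PROCEDURE cancelJob()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'async_handler'
AS $$
def async_handler(session):
    session.sql("CREATE OR REPLACE TABLE test_tb (c1 STRING)").collect()
    # Start a child job that takes 10 seconds before the insert completes.
    async_job = session.sql(
        "INSERT INTO test_tb SELECT SYSTEM$WAIT(10)").collect_nowait()
    # Cancel the child job before it finishes.
    async_job.cancel()
    return "done"
$$;
```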
The following code queries the test_tb table, but returns no results because no data has been inserted.
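For example:

```sql
SELECT * FROM test_tb;
```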
Waiting and blocking while an asynchronous child job runs¶
In the following example, the blockUntilDone procedure executes an asynchronous child job that takes 5 seconds to finish. Using
the snowflake.snowpark.AsyncJob.result method, the procedure waits and returns when the job has finished.
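A sketch of such a procedure, assuming SYSTEM$WAIT for the 5-second delay:

```sql
CREATE OR REPLACE PROCEDURE blockUntilDone()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'async_handler'
AS $$
def async_handler(session):
    # Start a child job that takes 5 seconds to finish.
    async_job = session.sql("SELECT SYSTEM$WAIT(5)").collect_nowait()
    # result() blocks until the child job has finished.
    return str(async_job.result())
$$;
```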
The following code calls the blockUntilDone procedure, which returns after waiting 5 seconds.
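For example:

```sql
CALL blockUntilDone();
```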
Returning an error after requesting results from an unfinished asynchronous child job¶
In the following example, the earlyReturn procedure executes an asynchronous child job that takes 60 seconds to finish. The
procedure then attempts to return a DataFrame from the job’s result before the job has finished, which results in an error.
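A sketch of such a procedure, assuming SYSTEM$WAIT for the delay and the AsyncJob.to_df Snowpark API to produce the DataFrame:

```sql
CREATE OR REPLACE PROCEDURE earlyReturn()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'async_handler'
AS $$
def async_handler(session):
    # Start a child job that takes 60 seconds to finish.
    async_job = session.sql("SELECT SYSTEM$WAIT(60)").collect_nowait()
    # Attempt to return the job's result as a DataFrame before the
    # job has finished; this causes an error.
    return async_job.to_df()
$$;
```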
The following code calls the earlyReturn procedure, returning the error.
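For example:

```sql
CALL earlyReturn();
```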
Finishing a parent job before a child job finishes, canceling the child job¶
In the following example, the earlyCancelJob procedure executes an asynchronous child job that inserts data into a table and takes 10
seconds to finish. However, the parent job (whose handler is async_handler) returns before the child job finishes, which cancels the child job.
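A sketch of such a procedure, assuming SYSTEM$WAIT for the delay; the table column type is illustrative:

```sql
CREATE OR REPLACE PROCEDURE earlyCancelJob()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'async_handler'
AS $$
def async_handler(session):
    session.sql("CREATE OR REPLACE TABLE test_tb (c1 STRING)").collect()
    # The child job takes 10 seconds to insert the data.
    session.sql("INSERT INTO test_tb SELECT SYSTEM$WAIT(10)").collect_nowait()
    # Return immediately; the unfinished child job is canceled when
    # the parent job finishes.
    return "done"
$$;
```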
The following code calls the earlyCancelJob procedure. It then queries the test_tb table, which returns no result because
no data was inserted by the canceled child job.
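For example:

```sql
CALL earlyCancelJob();
SELECT * FROM test_tb;
```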
Reading files and assets¶
Reading a statically-specified file using IMPORTS¶
You can read a file by specifying the file name and stage name in the IMPORTS clause of the CREATE PROCEDURE command.
When you specify a file in the IMPORTS clause, Snowflake copies that file from the stage to the stored procedure’s home directory (also called the import directory), which is the directory from which the stored procedure reads the file.
Snowflake copies imported files to a single directory. All files in that directory must have unique names, so each file in your IMPORTS clause must have a distinct name. This applies even if the files start out in different stages or different subdirectories within a stage.
The following example uses an in-line Python handler that reads a file called file.txt from a stage named my_stage.
The handler retrieves the location of the stored procedure’s home directory from
the Python sys._xoptions dictionary, using the snowflake_import_directory system option as the key.
Snowflake reads the file only once, during stored procedure creation. If the read happens outside of the target handler function, the file is not read again during stored procedure execution.
Create the stored procedure with an in-line handler:
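A sketch of the procedure follows; the procedure name, handler name, and runtime version are illustrative, while the stage and file names come from the description above:

```sql
CREATE OR REPLACE PROCEDURE read_file()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  IMPORTS = ('@my_stage/file.txt')
  HANDLER = 'main'
AS $$
import sys

# Look up the import (home) directory via the system option.
import_dir = sys._xoptions["snowflake_import_directory"]

def main(session):
    # Read the imported file from the procedure's home directory.
    with open(import_dir + 'file.txt', 'r') as f:
        return f.read()
$$;
```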
Importing a directory using IMPORTS¶
You can import a directory using the IMPORTS clause of the CREATE PROCEDURE command.
Note
The import path for a directory must end with a trailing slash (/). For example, IMPORTS = ('@my_stage/my_dir/').

To rename a directory on import, append /=custom_name/ to the stage path. The custom name must be a single directory name, not a path. For example, IMPORTS = ('@my_stage/my_dir/=custom_name/').

Directory imports are not supported in Native Apps.
The following example imports a directory called my_dir from a stage named my_stage and lists the files it contains.
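A sketch of such a procedure, assuming the imported directory is copied under the import directory with its own name; the procedure and handler names are illustrative:

```sql
CREATE OR REPLACE PROCEDURE list_imported_files()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  IMPORTS = ('@my_stage/my_dir/')
  HANDLER = 'main'
AS $$
import os
import sys

# Look up the import (home) directory via the system option.
import_dir = sys._xoptions["snowflake_import_directory"]

def main(session):
    # List the files copied from the imported directory.
    return str(os.listdir(import_dir + 'my_dir'))
$$;
```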
