Creating Stored Procedures for DataFrames in Python
The Snowpark API provides methods that you can use to create a stored procedure in Python. This topic explains how to create stored procedures.
Introduction
With Snowpark, you can create stored procedures for your custom lambdas and functions, and you can call these stored procedures to process the data in your DataFrame.
You can create stored procedures that only exist within the current session (temporary stored procedures) as well as stored procedures that you can use in other sessions (permanent stored procedures).
Using Third-Party Packages from Anaconda in a Stored Procedure
You can specify Anaconda packages to install when you create Python stored procedures. When calling the Python stored procedure inside a Snowflake warehouse, Anaconda packages are installed seamlessly and cached on the virtual warehouse on your behalf. For more information about best practices, how to view the available packages, and how to set up a local development environment, see Using Third-Party Packages.
Use session.add_packages to add packages at the session level.
This code example shows how to import packages and return their versions.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")
>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
...     return [pd.__version__, xgb.__version__]
You can also use session.add_requirements to specify packages with a requirements file.
>>> session.add_requirements("mydir/requirements.txt")
You can also specify packages at the stored-procedure level. These overwrite any session-level packages you added previously.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
...     return [pd.__version__, xgb.__version__]
Important
If you don’t specify a package version, Snowflake will use the latest version when resolving dependencies. When deploying the stored procedure to production, however, you may want to ensure that your code always uses the same dependency versions. You can do that for both permanent and temporary stored procedures.
When you create a permanent stored procedure, the stored procedure is created and registered only once. This resolves dependencies once and the selected version is used for production workloads. When the stored procedure executes, it will always use the same dependency versions.
When you create a temporary stored procedure, specify dependency versions as part of the version spec. That way, when the stored procedure is registered, package resolution will use the specified version. If you don’t specify the version, the dependency might be updated when a new version becomes available.
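One way to act on the pinning advice above is to check a package list for unpinned entries before registering a procedure. The sketch below is plain Python and does not call Snowpark. The helper name find_unpinned is hypothetical, and it assumes package specs use the name==version form shown in the examples.

```python
def find_unpinned(packages):
    """Return the package specs that do not pin an exact version with '=='."""
    unpinned = []
    for spec in packages:
        _name, sep, version = spec.partition("==")
        # A spec counts as pinned only if '==' is followed by a non-empty version.
        if not (sep and version.strip()):
            unpinned.append(spec)
    return unpinned


packages = ["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"]
print(find_unpinned(packages))  # ['snowflake-snowpark-python', 'pandas']
```

A non-empty result is a hint that the list passed to session.add_packages or to the packages argument may resolve to different versions over time.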
Creating an Anonymous Stored Procedure
To create an anonymous stored procedure, you can either:
Call the sproc function in the snowflake.snowpark.functions module, passing in the definition of the anonymous function.
Call the register method in the StoredProcedureRegistration class, passing in the definition of the anonymous function. To access an attribute or method of the StoredProcedureRegistration class, use the sproc property of the Session class.
Here is an example of an anonymous stored procedure:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Note
When writing code that might execute in multiple sessions, use the register method to register stored procedures, rather than using the sproc function. This can prevent errors in which the default Snowflake Session object cannot be found.
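The note above can be illustrated with a minimal sketch that registers the same add-one logic through an explicit session object. The names add_one_handler and register_add_one are hypothetical, and the session argument is assumed to be an existing snowflake.snowpark.Session. The Snowpark import is deferred into the registration function only so that the handler can be exercised without Snowpark installed.

```python
def add_one_handler(session, x):
    """Handler body; Snowflake passes the Session as the first argument."""
    return session.sql(f"select {x} + 1").collect()[0][0]


def register_add_one(session):
    """Register the handler through the given session instead of a default one."""
    # Deferred import (an illustrative choice, not a Snowpark requirement).
    from snowflake.snowpark.types import IntegerType

    # session.sproc is the StoredProcedureRegistration object for this session.
    return session.sproc.register(
        add_one_handler,
        return_type=IntegerType(),
        input_types=[IntegerType()],
        packages=["snowflake-snowpark-python"],
    )
```

Because the session is passed in explicitly, the same function can be reused for each session that needs the procedure.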
Creating and Registering a Named Stored Procedure
If you want to call a stored procedure by name (e.g. by using the call method of the Session object), you can create and register a named stored procedure. To do this, you can either:
Call the sproc function in the snowflake.snowpark.functions module, passing in the name argument and the definition of the anonymous function.
Call the register method in the StoredProcedureRegistration class, passing in the name argument and the definition of the anonymous function. To access an attribute or method of the StoredProcedureRegistration class, use the sproc property of the Session class.
Calling register or sproc will create a temporary stored procedure that you can use in the current session.
To create a permanent stored procedure, call the register method or the sproc function and set the is_permanent argument to True. When you create a permanent stored procedure, you must also set the stage_location argument to the stage location where the Python connector used by Snowpark uploads the Python file for the stored procedure and its dependencies.
Here is an example of how to register a named temporary stored procedure:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
...                 return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
...                 packages=["snowflake-snowpark-python"])
Here is an example of how to register a named permanent stored procedure by setting the is_permanent argument to True:
>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
...     return session.sql(f"select {x} - 1").collect()[0][0]
Here is an example of these stored procedures being called:
>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Reading Files from a Stored Procedure
As mentioned earlier, the Snowpark library uploads and executes stored procedures on the server. If your stored procedure needs to read data from a file, you must ensure that the file is uploaded with the stored procedure.
To set up a stored procedure to read a file:
Specify that the file is a dependency, which uploads the file to the server. This is done the same way as for UDFs. For more information, see Specifying Dependencies for a UDF.
For example:
>>> # Import a file from your local machine as a dependency.
>>> session.add_import("/<path>/my_file.txt")
>>> # Or import a file that you uploaded to a stage as a dependency.
>>> session.add_import("@my_stage/<path>/my_file.txt")
In the stored procedure, read the file.
>>> def read_file(name: str) -> str:
...     import sys
...     IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
...     import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
...
...     with open(import_dir + 'my_file.txt', 'r') as file:
...         return file.read()
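Because the import directory option is normally set only on the server, the file-reading logic can be awkward to try out locally. The sketch below is one possible variant that falls back to a local directory when the option is absent. The names read_imported_file and local_dir are hypothetical, the fallback is an assumption rather than Snowpark behavior, and sys._xoptions is specific to CPython. It uses os.path.join instead of string concatenation.

```python
import os
import sys
import tempfile

IMPORT_DIRECTORY_NAME = "snowflake_import_directory"


def read_imported_file(file_name, local_dir="."):
    """Read a file from the import directory, or from local_dir if the option is unset."""
    # On the server the option points at the uploaded imports; locally it is usually absent.
    import_dir = sys._xoptions.get(IMPORT_DIRECTORY_NAME, local_dir)
    with open(os.path.join(import_dir, file_name), "r") as file:
        return file.read()


# Local check: simulate the server by pointing the option at a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "my_file.txt"), "w") as f:
        f.write("hello")
    sys._xoptions[IMPORT_DIRECTORY_NAME] = tmp
    try:
        content = read_imported_file("my_file.txt")
    finally:
        # Remove the simulated option so later code sees the real environment.
        sys._xoptions.pop(IMPORT_DIRECTORY_NAME, None)

print(content)  # hello
```

Inside a stored procedure, the file must still be declared as a dependency with session.add_import, as described in the steps above.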