Creating User-Defined Functions (UDFs) for DataFrames in Python¶
The Snowpark API provides methods that you can use to create a user-defined function from a lambda or function in Python. This topic explains how to create these types of functions.
With Snowpark, you can create user-defined functions (UDFs) for your custom lambdas and functions, and you can call these UDFs to process the data in your DataFrame.
When you use the Snowpark API to create a UDF, the Snowpark library uploads the code for your function to an internal stage. When you call the UDF, the Snowpark library executes your function on the server, where the data is. As a result, the data doesn’t need to be transferred to the client in order for the function to process the data.
In your custom code, you can also import modules from Python files or third-party packages.
You can create a UDF for your custom code in one of two ways:
You can create an anonymous UDF and assign the function to a variable. As long as this variable is in scope, you can use this variable to call the UDF.
You can create a named UDF and call the UDF by name. This is useful if, for example, you need to use the UDF in a subsequent session.
The next sections explain how to create these UDFs.
Note that if you defined a UDF by running the
CREATE FUNCTION command, you can call that UDF in Snowpark. For details, see
Calling User-Defined Functions (UDFs).
Specifying Dependencies for a UDF¶
To define a UDF through the Snowpark API, you must call Session.add_import() for any files that contain modules that your UDF depends on (e.g. Python files, zip files, resource files). You can also specify a directory, and the Snowpark library will automatically compress it and upload it as a zip file.
(For details on reading resources from a UDF, see Reading Files from a UDF.)
The Snowpark library uploads these files to an internal stage and imports the files when executing your UDF.
The following example demonstrates how to add a zip file in a stage as a dependency:
>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")
The following examples demonstrate how to add a Python file from your local machine:
>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
The following examples demonstrate how to add other types of dependencies:
>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")
Note that the Python Snowpark library will not be uploaded automatically. You do not need to specify the following dependencies:

Your Python built-in libraries. These libraries are already available in the runtime environment on the server where your UDFs are executed.
Using Third-Party Packages from Anaconda in a UDF¶
You can specify Anaconda packages to install when you create Python UDFs. When queries that call Python UDFs are executed inside a Snowflake warehouse, Anaconda packages are installed seamlessly and cached on the virtual warehouse on your behalf. For more information about best practices, how to view the available packages, and how to set up a local development environment, see Using Third-Party Packages.
Use session.add_packages to add packages at the session level.
This code example shows how to import packages and return their versions.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
You can also use session.add_requirements to specify packages with a requirements file (requirements.txt).
You can also specify packages at the UDF level to override any session-level packages you might have added previously.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
If you don’t specify a package version, Snowflake will use the latest version when resolving dependencies. When deploying the UDF to production, however, you may want to ensure that your code always uses the same dependency versions. You can do that for both permanent and temporary UDFs.
When you create a permanent UDF, the UDF is created and registered only once. This resolves dependencies once and the selected version is used for production workloads. When the UDF executes, it will always use the same dependency versions.
When you create a temporary UDF, specify the dependency versions as part of the version specification. That way, when the UDF is registered, package resolution will use the specified versions. If you don’t specify a version, the dependency might be updated when a new version becomes available.
Creating an Anonymous UDF¶
To create an anonymous UDF, you can either:

Call the udf function in the snowflake.snowpark.functions module, passing in the definition of the anonymous function.

Call the register method in the UDFRegistration class, passing in the definition of the anonymous function.
Here is an example of an anonymous UDF:
>>> from snowflake.snowpark.functions import udf
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
When writing code that might execute in multiple sessions, use the register method to register UDFs, rather than using the udf function. This can prevent errors in which the default Snowflake Session object cannot be found.
Creating and Registering a Named UDF¶
If you want to call a UDF by name (e.g. by using the
call_udf function in the
functions module), you can create and register a named UDF. To do this, use one of the following:
The register method, in the UDFRegistration class, with the name argument.

The udf function, in the snowflake.snowpark.functions module, with the name argument.
To access an attribute or method of the UDFRegistration class, call the udf property of the Session class.

Calling register or udf will create a temporary UDF that you can use in the current session.
To create a permanent UDF, call the register method or the udf function and set the is_permanent argument to True. When you create a permanent UDF, you must also set the stage_location argument to the stage location where the Python file for the UDF and its dependencies are uploaded.
Here is an example of how to register a named temporary UDF:
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Here is an example of how to register a named permanent UDF by setting the is_permanent argument to True:
>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...     return x-1
Here is an example of these UDFs being called:
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]

>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Reading Files from a UDF¶
As mentioned earlier, the Snowpark library uploads and executes UDFs on the server. If your UDF needs to read data from a file, you must ensure that the file is uploaded with the UDF.
To set up a UDF to read a file:
Specify that the file is a dependency, which uploads the file to the server. For more information, see Specifying Dependencies for a UDF.
>>> # Import a file from your local machine as a dependency.
>>> session.add_import("/<path>/my_file.txt")

>>> # Or import a file that you uploaded to a stage as a dependency.
>>> session.add_import("@my_stage/<path>/my_file.txt")
In the UDF, read the file.
>>> def read_file(name: str) -> str:
...     import sys
...     IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
...     import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
...
...     with open(import_dir + 'my_file.txt', 'r') as file:
...         return file.read()
Using Vectorized UDFs via the Python UDF Batch API¶
The Python UDF batch API enables defining Python functions that receive batches of input rows as Pandas DataFrames and return batches of results as Pandas arrays or Series. Each column in the Snowpark DataFrame will be vectorized as a Pandas Series inside the UDF.
You call vectorized Python UDFs that use the batch API the same way you call other Python UDFs.
For more information, see Python UDF Batch API and the examples in
the UDFs section of the Snowpark API Reference.