Creating User-Defined Functions (UDFs) for DataFrames in Python

The Snowpark API provides methods that you can use to create a user-defined function from a lambda or function in Python. This topic explains how to create these types of functions.

Introduction

With Snowpark, you can create user-defined functions (UDFs) for your custom lambdas and functions, and you can call these UDFs to process the data in your DataFrame.

When you use the Snowpark API to create a UDF, the Snowpark library uploads the code for your function to an internal stage. When you call the UDF, the Snowpark library executes your function on the server, where the data is. As a result, the data doesn’t need to be transferred to the client in order for the function to process the data.

In your custom code, you can also import modules from Python files or third-party packages.

You can create a UDF for your custom code in one of two ways:

  • You can create an anonymous UDF and assign the function to a variable. As long as this variable is in scope, you can use this variable to call the UDF.

  • You can create a named UDF and call the UDF by name. You can use this if, for example, you need to call a UDF by name or use the UDF in a subsequent session.

The next sections explain how to create these UDFs using a local development environment or using a Python worksheet.

Note that if you defined a UDF by running the CREATE FUNCTION command, you can call that UDF in Snowpark. For details, see Calling User-Defined Functions (UDFs).

Note

Vectorized Python UDFs let you define Python functions that receive batches of input rows as Pandas DataFrames. This results in much better performance with machine learning inference scenarios. For more information, see Using Vectorized UDFs.

Note

If you are working with a Python worksheet, use these examples within the handler function:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

If the examples return something other than a DataFrame, such as a list of Row objects, change the return type to match the return type of the example.

After you run a code example, use the Results tab to view any output returned. Refer to Running Python Worksheets for more details.

Specifying Dependencies for a UDF

To define a UDF using the Snowpark API, you must import the files that contain any modules that your UDF depends on, such as Python files, zip files, resource files, etc.

You can also specify a directory and the Snowpark library automatically compresses the directory and uploads it as a zip file. (For details on reading resources from a UDF, see Reading Files with a UDF.)

When you call Session.add_import(), the Snowpark library uploads the specified files to an internal stage and imports the files when executing your UDF.

The following example demonstrates how to add a zip file in a stage as a dependency to your code:

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  
Copy

The following examples demonstrate how to add a Python file from your local machine:

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  
Copy

The following examples demonstrate how to add other types of dependencies:

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  
Copy

Note

The Python Snowpark library is not uploaded automatically.

You do not need to specify the following dependencies:

  • Your Python built-in libraries.

    These libraries are already available in the runtime environment on the server where your UDFs are executed.

Using Third-Party Packages from Anaconda in a UDF

You can use third-party packages from the Snowflake Anaconda channel in a UDF.

  • If you create a Python UDF in a Python worksheet, the Anaconda packages are already available to your worksheet. Refer to Add a Python File from a Stage to a Worksheet.

  • If you create a Python UDF in your local development environment, you can specify which Anaconda packages to install.

When queries that call Python UDFs are executed inside a Snowflake warehouse, Anaconda packages are installed seamlessly and cached on the virtual warehouse on your behalf.

For more information about best practices, how to view the available packages, and how to set up a local development environment, see Using Third-Party Packages.

If you write a Python UDF in your local development environment, use session.add_packages to add packages at the session level.

This code example shows how to import packages and return their versions.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]
Copy

You can also use session.add_requirements to specify packages with a requirements file.

>>> session.add_requirements("mydir/requirements.txt")  
Copy

You can add the UDF-level packages to overwrite the session-level packages you might have added previously.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
Copy

Important

If you don’t specify a package version, Snowflake uses the latest version when resolving dependencies. When you deploy the UDF to production, you might want to ensure that your code always uses the same dependency versions. You can do that for both permanent and temporary UDFs.

  • When you create a permanent UDF, the UDF is created and registered only once. This resolves dependencies once and the selected version is used for production workloads. When the UDF executes, it always uses the same dependency versions.

  • When you create a temporary UDF, specify dependency versions as part of the version spec. That way, when the UDF is registered, package resolution uses the specified version. If you don’t specify the version, the dependency might be updated when a new version becomes available.

Creating an Anonymous UDF

To create an anonymous UDF, you can either:

  • Call the udf function in the snowflake.snowpark.functions module, passing in the definition of the anonymous function.

  • Call the register method in the UDFRegistration class, passing in the definition of the anonymous function.

Here is an example of an anonymous UDF:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Note

When writing code that might execute in multiple sessions, use the register method to register UDFs, rather than using the udf function. This can prevent errors in which the default Snowflake Session object cannot be found.

Creating and Registering a Named UDF

If you want to call a UDF by name (e.g. by using the call_udf function in the functions module), you can create and register a named UDF. To do this, use one of the following:

  • The register method, in the UDFRegistration class, with the name argument.

  • The udf function, in the snowflake.snowpark.functions module, with the name argument.

To access an attribute or method of the UDFRegistration class, call the udf property of the Session class.

Calling register or udf will create a temporary UDF that you can use in the current session.

To create a permanent UDF, call the register method or the udf function and set the is_permanent argument to True. When you create a permanent UDF, you must also set the stage_location argument to the stage location where the Python file for the UDF and its dependencies are uploaded.

Here is an example of how to register a named temporary UDF:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

Here is an example of how to register a named permanent UDF by setting the is_permanent argument to True:

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1
Copy

Here is an example of these UDFs being called:

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Copy

You can also call the UDF using SQL:

>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Creating a UDF from a Python source file

If you create your UDF in your local development environment, you can define your UDF handler in a Python file and then use the register_from_file method in the UDFRegistration class to create a UDF.

Note

You cannot use this method in a Python worksheet.

Here are examples of using register_from_file.

Suppose you have a Python file test_udf_file.py that contains:

def mod5(x: int) -> int:
    return x % 5
Copy

Then you can create a UDF from this function of file test_udf_file.py.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

You can also upload the file to a stage location, then use it to create the UDF.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

Reading Files with a UDF

To read the contents of a file, your Python code can:

Reading Statically-Specified Files

The Snowpark library uploads and executes UDFs on the server. If your UDF needs to read data from a file, you must ensure that the file is uploaded with the UDF.

Note

If you write your UDF in a Python worksheet, the UDF can only read files from a stage.

To set up a UDF to read a file:

  1. Specify that the file is a dependency, which uploads the file to the server. For more information, see Specifying Dependencies for a UDF.

    For example:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
    Copy
  2. In the UDF, read the file. In the following example, the file will only be read once during UDF creation, and will not be read again during UDF execution. This is achieved with a third-party library cachetools.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    
    Copy

Reading Dynamically-Specified Files with SnowflakeFile

You can read a file from a stage using the SnowflakeFile class in the Snowpark snowflake.snowpark.files module. The SnowflakeFile class provides dynamic file access, which lets you stream files of any size. Dynamic file access is also useful when you want to iterate over multiple files. For example, see Processing Multiple Files.

For more information about and examples of reading files using SnowflakeFile, see Reading a File Using the SnowflakeFile Class in a Python UDF Handler.

The following example registers a temporary UDF that reads a text file from a stage using SnowflakeFile and returns the file length.

Register the UDF:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Call the UDF:

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

Using Vectorized UDFs

Vectorized Python UDFs let you define Python functions that receive batches of input rows as Pandas DataFrames and return batches of results as Pandas arrays or Series. The column in the Snowpark dataframe will be vectorized as a Pandas Series inside the UDF.

Here is an example of how to use the batch interface:

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

You call vectorized Python UDFs the same way you call other Python UDFs. For more information, see Vectorized Python UDFs, which explains how to create a vectorized UDF by using a SQL statement. For example, you can use the vectorized decorator when you specify the Python code in the SQL statement. By using the Snowpark Python API described in this document, you don’t use a SQL statement to create a vectorized UDF. So you don’t use the vectorized decorator.

It is possible to limit the number of rows per batch. For more information, see Setting a target batch size.

For more explanations and examples of using the Snowpark Python API to create vectorized UDFs, refer to the UDFs section of the Snowpark API Reference.