You are viewing documentation about an older version (1.12.1). View latest version

snowflake.snowpark.udf.UDFRegistration

class snowflake.snowpark.udf.UDFRegistration(session: Session)[source]

Bases: object

Provides methods to register lambdas and functions as UDFs in the Snowflake database. For more information about Snowflake Python UDFs, see Python UDFs.

session.udf returns an object of this class. You can use this object to register UDFs that you plan to use in the current session or permanently. The methods that register a UDF return a UserDefinedFunction object, which you can also use in Column expressions.

There are two ways to register a UDF with Snowpark:

  • Use udf() or register(). By pointing to a runtime Python function, Snowpark uses cloudpickle to serialize this function to bytecode, and deserialize the bytecode to a Python function on the Snowflake server during UDF creation. During the serialization, the global variables used in the Python function will be serialized into the bytecode, but only the name of the module object or any objects from a module that are used in the Python function will be serialized. If the size of the serialized bytecode is over 8K bytes, it will be uploaded to a stage location as a Python file. If it’s under 8K, it will be added to the UDF in-line code. During the deserialization, Python will look up the corresponding modules and objects by names. For example:

    >>> import numpy
    >>> from resources.test_udf_dir.test_udf_file import mod5
    >>> a = 1
    >>> def f():
    ...     return 2
    >>>
    >>> from snowflake.snowpark.functions import udf
    >>> session.add_import("tests/resources/test_udf_dir/test_udf_file.py", import_path="resources.test_udf_dir.test_udf_file")
    >>> session.add_packages("numpy")
    >>> @udf
    ... def g(x: int) -> int:
    ...     return mod5(numpy.square(x)) + a + f()
    >>> df = session.create_dataframe([4], schema=["a"])
    >>> df.select(g("a")).to_df("col1").show()
    ----------
    |"COL1"  |
    ----------
    |4       |
    ----------
    
    Copy

    Here the variable a and function f will be serialized into the bytecode, but only the name of numpy and mod5 will be included in the bytecode. Therefore, in order to have these modules on the server side, you can use add_import() and add_packages() to add your first-party and third-party libraries.

    After deserialization, this function will be executed and applied to every row of your dataframe or table during UDF execution. This approach is very flexible because you can either create a UDF from a function in your current file/notebook, and you can also import the function from elsewhere. However, the limitations of this approach are:

    • All code inside the function will be executed on every row, so you are not able to perform some initializations before executing this function. For example, if you want to read a file from a stage in a UDF, this file will be read on every row. However, we still have a workaround for this scenario, which can be found in Example 8 here.

    • If the runtime function references some very large global variables (e.g., a machine learning model with a large number of parameters), they will also be serialized and the size of bytecode can be very large, which will take more time for uploading files. Also, the UDF creation will fail if the referenced global variables cannot be pickled (e.g., weakref object). In this case, you usually have to save such objects in the local environment first, add it to the UDF using add_import(), and read it from the UDF (see Example 8 here).

  • Use register_from_file(). By pointing to a Python file or a zip file containing Python source code and the target function name, Snowpark uploads this file to a stage (which can also be customized), and load the corresponding function from this file to the Python runtime on the Snowflake server during UDF creation. Then this function will be executed and applied to every row of your dataframe or table when executing this UDF. This approach can address the deficiency of the previous approach that uses cloudpickle, because the source code in this file other than the target function will be loaded during UDF creation, and will not be executed on every row during UDF execution. Therefore, this approach is useful and efficient when all your Python code is already in source files.

For a vectorized UDF:

You can use udf(), register() or pandas_udf() to create a vectorized UDF by providing appropriate return and input types. If you would like to use register_from_file() to create a vectorized UDF, you need to explicitly mark the handler function as vectorized using either the vectorized Decorator or a function attribute.

Snowflake supports the following data types for the parameters for a UDF:

Python Type

Snowpark Type

SQL Type

int

LongType

NUMBER

decimal.Decimal

DecimalType

NUMBER

float

FloatType

FLOAT

str

StringType

STRING

bool

BooleanType

BOOL

datetime.time

TimeType

TIME

datetime.date

DateType

DATE

datetime.datetime

TimestampType

TIMESTAMP

bytes or bytearray

BinaryType

BINARY

list

ArrayType

ARRAY

dict

MapType

OBJECT

Dynamically mapped to the native Python type

VariantType

VARIANT

dict

GeographyType

GEOGRAPHY

pandas.Series

PandasSeriesType

No SQL type

pandas.DataFrame

PandasDataFrameType

No SQL type

Note

1. Data with the VARIANT SQL type will be converted to a Python type dynamically inside a UDF. The following SQL types are converted to str in UDFs rather than native Python types: TIME, DATE, TIMESTAMP and BINARY.

2. Data returned as ArrayType (list), MapType (dict) or VariantType (Variant) by a UDF will be represented as a json string. You can call eval() or json.loads() to convert the result to a native Python object. Data returned as GeographyType (Geography) by a UDF will be represented as a GeoJSON string.

3. PandasSeriesType and PandasDataFrameType are used when creating a Pandas (vectorized) UDF, so they are not mapped to any SQL types. element_type in PandasSeriesType and col_types in PandasDataFrameType indicate the SQL types in a Pandas Series and a Pandas DataFrame.

4. To annotate the Snowflake specific Timestamp type (TIMESTAMP_NTZ, TIMESTAMP_LTZ and TIMESTAMP_TZ), use Timestamp with NTZ, LTZ, TZ (e.g., Timestamp[NTZ]).

Example 1

Create a temporary UDF from a lambda and apply it to a dataframe:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one_udf = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
>>> session.range(1, 8, 2).select(add_one_udf("id")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=4), Row(COL1=6), Row(COL1=8)]
Copy
Example 2

Create a UDF with type hints and @udf decorator and apply it to a dataframe:

>>> from snowflake.snowpark.functions import udf
>>> @udf
... def add_udf(x: int, y: int) -> int:
...        return x + y
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["x", "y"])
>>> df.select(add_udf("x", "y")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
Copy
Example 3

Create a permanent UDF with a name and call it in SQL:

>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.udf.register(
...     lambda x, y: x * y, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", replace=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=30)]
>>> # skip udf creation if it already exists
>>> _ = session.udf.register(
...     lambda x, y: x * y + 1, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", if_not_exists=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=30)]
>>> # overwrite udf definition when it already exists
>>> _ = session.udf.register(
...     lambda x, y: x * y + 1, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", replace=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=31)]
Copy
Example 4

Create a UDF with UDF-level imports and apply it to a dataframe:

>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from snowflake.snowpark.functions import udf
>>> @udf(imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... def mod5_and_plus1_udf(x: int) -> int:
...     return mod5(x) + 1
>>> session.range(1, 8, 2).select(mod5_and_plus1_udf("id")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=4), Row(COL1=1), Row(COL1=3)]
Copy
Example 5

Create a UDF with UDF-level packages and apply it to a dataframe:

>>> from snowflake.snowpark.functions import udf
>>> import numpy as np
>>> import math
>>> @udf(packages=["numpy"])
... def sin_udf(x: float) -> float:
...     return np.sin(x)
>>> df = session.create_dataframe([0.0, 0.5 * math.pi], schema=["d"])
>>> df.select(sin_udf("d")).to_df("col1").collect()
[Row(COL1=0.0), Row(COL1=1.0)]
Copy
Example 6

Creating a UDF from a local Python file:

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy
Example 7

Creating a UDF from a Python file on an internal stage:

>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udf_dir/test_udf_file.py", "@mystage", auto_compress=False)
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy
Example 8

Use cache to read a file once from a stage in a UDF:

>>> import sys
>>> import os
>>> import cachetools
>>> from snowflake.snowpark.types import StringType
>>> @cachetools.cached(cache={})
... def read_file(filename):
...     import_dir = sys._xoptions.get("snowflake_import_directory")
...     if import_dir:
...         with open(os.path.join(import_dir, filename), "r") as f:
...             return f.read()
>>>
>>> # create a temporary text file for test
>>> temp_file_name = "/tmp/temp.txt"
>>> with open(temp_file_name, "w") as t:
...     _ = t.write("snowpark")
>>> session.add_import(temp_file_name)
>>> session.add_packages("cachetools")
>>> concat_file_content_with_str_udf = session.udf.register(
...     lambda s: f"{read_file(os.path.basename(temp_file_name))}-{s}",
...     return_type=StringType(),
...     input_types=[StringType()]
... )
>>>
>>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
>>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
[Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
>>> os.remove(temp_file_name)
>>> session.clear_imports()
Copy

In this example, the file will only be read once during UDF creation, and will not be read again during UDF execution. This is achieved with a third-party library cachetools. You can also use LRUCache and TTLCache in this package to avoid the cache growing too large. Note that Python built-in cache decorators are not working when registering UDFs using Snowpark, due to the limitation of cloudpickle.

Example 9

Create a vectorized UDF from a lambda with a max batch size and apply it to a dataframe:

>>> from snowflake.snowpark.functions import udf
>>> from snowflake.snowpark.types import IntegerType, PandasSeriesType, PandasDataFrameType
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> add_udf1 = udf(lambda series1, series2: series1 + series2, return_type=PandasSeriesType(IntegerType()),
...               input_types=[PandasSeriesType(IntegerType()), PandasSeriesType(IntegerType())],
...               max_batch_size=20)
>>> df.select(add_udf1("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
>>> add_udf2 = udf(lambda df: df[0] + df[1], return_type=PandasSeriesType(IntegerType()),
...               input_types=[PandasDataFrameType([IntegerType(), IntegerType()])],
...               max_batch_size=20)
>>> df.select(add_udf2("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
Copy
Example 10

Create a vectorized UDF with type hints and apply it to a dataframe:

>>> from snowflake.snowpark.functions import udf
>>> from snowflake.snowpark.types import PandasSeries, PandasDataFrame
>>> @udf
... def apply_mod5_udf(series: PandasSeries[int]) -> PandasSeries[int]:
...     return series.apply(lambda x: x % 5)
>>> session.range(1, 8, 2).select(apply_mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
>>> @udf
... def mul_udf(df: PandasDataFrame[int, int]) -> PandasSeries[int]:
...     return df[0] * df[1]
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(mul_udf("a", "b")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=12)]
Copy
Example 11

Create a vectorized UDF with original pandas types and Snowpark types and apply it to a dataframe:

>>> # `pandas_udf` is an alias of `udf`, but it can only be used to create a vectorized UDF
>>> from snowflake.snowpark.functions import pandas_udf
>>> from snowflake.snowpark.types import IntegerType
>>> import pandas as pd
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> def add1(series1: pd.Series, series2: pd.Series) -> pd.Series:
...     return series1 + series2
>>> add_udf1 = pandas_udf(add1, return_type=IntegerType(),
...                       input_types=[IntegerType(), IntegerType()])
>>> df.select(add_udf1("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
>>> def add2(df: pd.DataFrame) -> pd.Series:
...     return df[0] + df[1]
>>> add_udf2 = pandas_udf(add2, return_type=IntegerType(),
...                       input_types=[IntegerType(), IntegerType()])
>>> df.select(add_udf2("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
Copy

Methods

describe(udf_obj)

Returns a DataFrame that describes the properties of a UDF.

register(func[, return_type, input_types, ...])

Registers a Python function as a Snowflake Python UDF and returns the UDF.

register_from_file(file_path, func_name[, ...])

Registers a Python function as a Snowflake Python UDF from a Python or zip file, and returns the UDF.