snowflake.snowpark.udf.UDFRegistration
- class snowflake.snowpark.udf.UDFRegistration(session: Session)
Bases: object
Provides methods to register lambdas and functions as UDFs in the Snowflake database. For more information about Snowflake Python UDFs, see Python UDFs.
session.udf returns an object of this class. You can use this object to register UDFs that you plan to use in the current session or permanently. The methods that register a UDF return a UserDefinedFunction object, which you can also use in Column expressions.

There are two ways to register a UDF with Snowpark:
Use udf() or register(). When you point to a runtime Python function, Snowpark uses cloudpickle to serialize the function to bytecode and deserializes the bytecode to a Python function on the Snowflake server during UDF creation. During serialization, the global variables used in the Python function are serialized into the bytecode, but for a module object, or any object from a module, that is used in the function, only the name is serialized. If the size of the serialized bytecode is over 8K bytes, it is uploaded to a stage location as a Python file. If it is under 8K, it is added to the UDF as in-line code. During deserialization, Python looks up the corresponding modules and objects by name. For example:

>>> import numpy
>>> from resources.test_udf_dir.test_udf_file import mod5
>>> a = 1
>>> def f():
...     return 2
>>>
>>> from snowflake.snowpark.functions import udf
>>> session.add_import("tests/resources/test_udf_dir/test_udf_file.py", import_path="resources.test_udf_dir.test_udf_file")
>>> session.add_packages("numpy")
>>> @udf
... def g(x: int) -> int:
...     return mod5(numpy.square(x)) + a + f()
>>> df = session.create_dataframe([4], schema=["a"])
>>> df.select(g("a")).to_df("col1").show()
----------
|"COL1"  |
----------
|4       |
----------
Here the variable a and the function f are serialized into the bytecode, but only the names of numpy and mod5 are included in the bytecode. Therefore, to make these modules available on the server side, use add_import() and add_packages() to add your first-party and third-party libraries.

After deserialization, this function is executed and applied to every row of your dataframe or table during UDF execution. This approach is very flexible because you can either create a UDF from a function in your current file or notebook, or import the function from elsewhere. However, the limitations of this approach are:
- All code inside the function is executed on every row, so you cannot perform initialization once before the function runs. For example, if you read a file from a stage in a UDF, the file is read on every row. A workaround for this scenario is shown in Example 8.
- If the runtime function references very large global variables (e.g., a machine learning model with a large number of parameters), they are also serialized, so the bytecode can become very large and take longer to upload. Also, UDF creation fails if the referenced global variables cannot be pickled (e.g., a weakref object). In this case, you usually have to save such objects in the local environment first, add them to the UDF using add_import(), and read them from within the UDF (see Example 8).
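The by-name versus by-value behavior, and the pickling failure for objects such as weak references, can be observed locally. The following is a minimal sketch that uses the standard-library pickle module as a stand-in for cloudpickle (an assumption made so that the snippet runs without extra packages; cloudpickle differs in that it also serializes functions defined in an interactive session by value). The names small_global, large_global and Model are hypothetical.

```python
import math
import pickle
import weakref

# A function that lives in an importable module is pickled by reference:
# the payload holds the module and attribute names, not the implementation.
by_reference = pickle.dumps(math.sqrt)
names_only = b"math" in by_reference and b"sqrt" in by_reference
print(names_only)  # True

# Plain data is pickled by value, so the payload grows with the object.
small_global = list(range(10))
large_global = list(range(100_000))
small_size = len(pickle.dumps(small_global))
large_size = len(pickle.dumps(large_global))
print(small_size < large_size)  # True


class Model:
    """Hypothetical placeholder for an object referenced by a UDF."""


# Some objects cannot be pickled at all; a weak reference is one of them.
model = Model()
try:
    pickle.dumps(weakref.ref(model))
    weakref_pickled = True
except TypeError:
    weakref_pickled = False
print(weakref_pickled)  # False
```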
Use register_from_file(). When you point to a Python file, or a zip file containing Python source code, and give the target function name, Snowpark uploads this file to a stage (which can also be customized) and loads the corresponding function from this file into the Python runtime on the Snowflake server during UDF creation. This function is then executed and applied to every row of your dataframe or table when the UDF runs. This approach addresses the deficiency of the previous, cloudpickle-based approach, because the source code in this file other than the target function is loaded during UDF creation and is not executed on every row during UDF execution. Therefore, this approach is useful and efficient when all your Python code is already in source files.
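The difference between load-time code and per-row code can be simulated without a Snowflake session. The sketch below is only an illustration under stated assumptions: HANDLER_SOURCE is a hypothetical handler file, and load_handler is a hypothetical helper that mimics loading a module once and then calling the target function for each row. It is not how the server is implemented.

```python
import types

# Hypothetical handler source, standing in for a file passed to register_from_file().
HANDLER_SOURCE = '''
LOAD_COUNT = 0

def _load_lookup():
    global LOAD_COUNT
    LOAD_COUNT += 1
    return {n: n % 5 for n in range(10)}

# Module-level code: runs once, when the module is loaded.
LOOKUP = _load_lookup()

def mod5(x: int) -> int:
    # Per-row code: only this body runs for each input value.
    return LOOKUP[x]
'''


def load_handler(source: str, func_name: str):
    """Execute the source once as a module and return the module and target function."""
    module = types.ModuleType("handler_module")
    exec(source, module.__dict__)
    return module, getattr(module, func_name)


module, handler = load_handler(HANDLER_SOURCE, "mod5")
results = [handler(x) for x in [1, 3, 5, 7]]
print(results)            # [1, 3, 0, 2]
print(module.LOAD_COUNT)  # 1
```

The lookup table is built once even though the handler is called four times, which is the property that makes file-based registration attractive for expensive initialization.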
- For a vectorized UDF:
You can use udf(), register() or pandas_udf() to create a vectorized UDF by providing appropriate return and input types. If you would like to use register_from_file() to create a vectorized UDF, you need to explicitly mark the handler function as vectorized using either the vectorized decorator or a function attribute.
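Snowflake supports the following data types for the parameters for a UDF:

Python Type                                   Snowpark Type        SQL Type
--------------------------------------------  -------------------  -----------
int                                           LongType             NUMBER
decimal.Decimal                               DecimalType          NUMBER
float                                         FloatType            FLOAT
str                                           StringType           STRING
bool                                          BooleanType          BOOL
datetime.time                                 TimeType             TIME
datetime.date                                 DateType             DATE
datetime.datetime                             TimestampType        TIMESTAMP
bytes or bytearray                            BinaryType           BINARY
list                                          ArrayType            ARRAY
dict                                          MapType              OBJECT
Dynamically mapped to the native Python type  VariantType          VARIANT
dict                                          GeographyType        GEOGRAPHY
pandas.Series                                 PandasSeriesType     No SQL type
pandas.DataFrame                              PandasDataFrameType  No SQL type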
Note
1. Data with the VARIANT SQL type will be converted to a Python type dynamically inside a UDF. The following SQL types are converted to str in UDFs rather than native Python types: TIME, DATE, TIMESTAMP and BINARY.
2. Data returned as ArrayType (list), MapType (dict) or VariantType (Variant) by a UDF will be represented as a JSON string. You can call eval() or json.loads() to convert the result to a native Python object. Data returned as GeographyType (Geography) by a UDF will be represented as a GeoJSON string.
3. PandasSeriesType and PandasDataFrameType are used when creating a pandas (vectorized) UDF, so they are not mapped to any SQL types. element_type in PandasSeriesType and col_types in PandasDataFrameType indicate the SQL types in a pandas Series and a pandas DataFrame.
4. To annotate the Snowflake-specific Timestamp types (TIMESTAMP_NTZ, TIMESTAMP_LTZ and TIMESTAMP_TZ), use Timestamp with NTZ, LTZ or TZ (e.g., Timestamp[NTZ]).
- Example 1
Create a temporary UDF from a lambda and apply it to a dataframe:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one_udf = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
>>> session.range(1, 8, 2).select(add_one_udf("id")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=4), Row(COL1=6), Row(COL1=8)]
- Example 2
Create a UDF with type hints and the @udf decorator and apply it to a dataframe:
>>> from snowflake.snowpark.functions import udf
>>> @udf
... def add_udf(x: int, y: int) -> int:
...     return x + y
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["x", "y"])
>>> df.select(add_udf("x", "y")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
- Example 3
Create a permanent UDF with a name and call it in SQL:
>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.udf.register(
...     lambda x, y: x * y, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", replace=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=30)]
>>> # skip udf creation if it already exists
>>> _ = session.udf.register(
...     lambda x, y: x * y + 1, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", if_not_exists=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=30)]
>>> # overwrite udf definition when it already exists
>>> _ = session.udf.register(
...     lambda x, y: x * y + 1, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", replace=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=31)]
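The effect of replace and if_not_exists on an existing name can be pictured with a toy in-memory registry. This is only a sketch, not the Snowpark implementation: registry and this standalone register function are hypothetical, and raising an error when neither flag is set is an assumption modeled on a plain SQL CREATE FUNCTION failing for an existing name.

```python
from typing import Callable, Dict

# Hypothetical in-memory stand-in for the functions stored in a database.
registry: Dict[str, Callable[[int, int], int]] = {}


def register(name, func, replace=False, if_not_exists=False):
    """Toy model of name-collision handling; not the Snowpark implementation."""
    if name in registry:
        if if_not_exists:
            return registry[name]  # keep the existing definition untouched
        if not replace:
            # Assumption: mirrors CREATE FUNCTION failing on an existing name.
            raise ValueError(f"function {name!r} already exists")
    registry[name] = func  # create, or overwrite when replace=True
    return func


register("mul", lambda x, y: x * y, replace=True)
first = registry["mul"](5, 6)

# The new body is ignored because "mul" already exists.
register("mul", lambda x, y: x * y + 1, if_not_exists=True)
second = registry["mul"](5, 6)

# The new body replaces the old one.
register("mul", lambda x, y: x * y + 1, replace=True)
third = registry["mul"](5, 6)

print(first, second, third)  # 30 30 31
```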
- Example 4
Create a UDF with UDF-level imports and apply it to a dataframe:
>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from snowflake.snowpark.functions import udf
>>> @udf(imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... def mod5_and_plus1_udf(x: int) -> int:
...     return mod5(x) + 1
>>> session.range(1, 8, 2).select(mod5_and_plus1_udf("id")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=4), Row(COL1=1), Row(COL1=3)]
- Example 5
Create a UDF with UDF-level packages and apply it to a dataframe:
>>> from snowflake.snowpark.functions import udf
>>> import numpy as np
>>> import math
>>> @udf(packages=["numpy"])
... def sin_udf(x: float) -> float:
...     return np.sin(x)
>>> df = session.create_dataframe([0.0, 0.5 * math.pi], schema=["d"])
>>> df.select(sin_udf("d")).to_df("col1").collect()
[Row(COL1=0.0), Row(COL1=1.0)]
- Example 6
Creating a UDF from a local Python file:
>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
- Example 7
Creating a UDF from a Python file on an internal stage:
>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udf_dir/test_udf_file.py", "@mystage", auto_compress=False)
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
- Example 8
Use cache to read a file once from a stage in a UDF:
>>> import sys
>>> import os
>>> import cachetools
>>> from snowflake.snowpark.types import StringType
>>> @cachetools.cached(cache={})
... def read_file(filename):
...     import_dir = sys._xoptions.get("snowflake_import_directory")
...     if import_dir:
...         with open(os.path.join(import_dir, filename), "r") as f:
...             return f.read()
>>>
>>> # create a temporary text file for test
>>> temp_file_name = "/tmp/temp.txt"
>>> with open(temp_file_name, "w") as t:
...     _ = t.write("snowpark")
>>> session.add_import(temp_file_name)
>>> session.add_packages("cachetools")
>>> concat_file_content_with_str_udf = session.udf.register(
...     lambda s: f"{read_file(os.path.basename(temp_file_name))}-{s}",
...     return_type=StringType(),
...     input_types=[StringType()]
... )
>>>
>>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
>>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
[Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
>>> os.remove(temp_file_name)
>>> session.clear_imports()
In this example, the file will only be read once during UDF creation, and will not be read again during UDF execution. This is achieved with the third-party library cachetools. You can also use LRUCache and TTLCache in this package to keep the cache from growing too large. Note that Python's built-in cache decorators do not work when registering UDFs using Snowpark, due to a limitation of cloudpickle.
- Example 9
Create a vectorized UDF from a lambda with a max batch size and apply it to a dataframe:
>>> from snowflake.snowpark.functions import udf
>>> from snowflake.snowpark.types import IntegerType, PandasSeriesType, PandasDataFrameType
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> add_udf1 = udf(lambda series1, series2: series1 + series2, return_type=PandasSeriesType(IntegerType()),
...                input_types=[PandasSeriesType(IntegerType()), PandasSeriesType(IntegerType())],
...                max_batch_size=20)
>>> df.select(add_udf1("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
>>> add_udf2 = udf(lambda df: df[0] + df[1], return_type=PandasSeriesType(IntegerType()),
...                input_types=[PandasDataFrameType([IntegerType(), IntegerType()])],
...                max_batch_size=20)
>>> df.select(add_udf2("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
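To picture what a maximum batch size means for a vectorized handler, the following pure-Python sketch splits rows into batches and calls a handler once per batch instead of once per row. It is a simplified model under stated assumptions: plain lists stand in for pandas Series, iter_batches and add_columns are hypothetical helpers, and every batch is filled up to the limit, whereas the actual batching is decided by Snowflake and may differ.

```python
from typing import Iterator, List, Sequence, Tuple

Pair = Tuple[int, int]


def iter_batches(rows: Sequence[Pair], max_batch_size: int) -> Iterator[Sequence[Pair]]:
    """Yield consecutive slices of at most max_batch_size rows."""
    for start in range(0, len(rows), max_batch_size):
        yield rows[start:start + max_batch_size]


def add_columns(col_a: List[int], col_b: List[int]) -> List[int]:
    """Stand-in for `lambda series1, series2: series1 + series2`, using lists."""
    return [a + b for a, b in zip(col_a, col_b)]


rows: List[Pair] = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]
handler_calls = 0
results: List[int] = []
for batch in iter_batches(rows, max_batch_size=2):
    handler_calls += 1
    # Each argument arrives as a whole column of the batch, not a single value.
    col_a = [pair[0] for pair in batch]
    col_b = [pair[1] for pair in batch]
    results.extend(add_columns(col_a, col_b))

print(results)        # [3, 7, 11, 15, 19]
print(handler_calls)  # 3
```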
- Example 10
Create a vectorized UDF with type hints and apply it to a dataframe:
>>> from snowflake.snowpark.functions import udf
>>> from snowflake.snowpark.types import PandasSeries, PandasDataFrame
>>> @udf
... def apply_mod5_udf(series: PandasSeries[int]) -> PandasSeries[int]:
...     return series.apply(lambda x: x % 5)
>>> session.range(1, 8, 2).select(apply_mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
>>> @udf
... def mul_udf(df: PandasDataFrame[int, int]) -> PandasSeries[int]:
...     return df[0] * df[1]
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(mul_udf("a", "b")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=12)]
- Example 11
Create a vectorized UDF with original pandas types and Snowpark types and apply it to a dataframe:
>>> # `pandas_udf` is an alias of `udf`, but it can only be used to create a vectorized UDF
>>> from snowflake.snowpark.functions import pandas_udf
>>> from snowflake.snowpark.types import IntegerType
>>> import pandas as pd
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> def add1(series1: pd.Series, series2: pd.Series) -> pd.Series:
...     return series1 + series2
>>> add_udf1 = pandas_udf(add1, return_type=IntegerType(),
...                       input_types=[IntegerType(), IntegerType()])
>>> df.select(add_udf1("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
>>> def add2(df: pd.DataFrame) -> pd.Series:
...     return df[0] + df[1]
>>> add_udf2 = pandas_udf(add2, return_type=IntegerType(),
...                       input_types=[IntegerType(), IntegerType()])
>>> df.select(add_udf2("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
Methods
describe(udf_obj)
    Returns a DataFrame that describes the properties of a UDF.
register(func[, return_type, input_types, ...])
    Registers a Python function as a Snowflake Python UDF and returns the UDF.
register_from_file(file_path, func_name[, ...])
    Registers a Python function as a Snowflake Python UDF from a Python or zip file, and returns the UDF.