snowflake.snowpark.udf.UDFRegistration¶
- class snowflake.snowpark.udf.UDFRegistration(session: Session)[source]¶
Bases: object

Provides methods to register lambdas and functions as UDFs in the Snowflake database. For more information about Snowflake Python UDFs, see Python UDFs.

session.udf returns an object of this class. You can use this object to register UDFs that you plan to use in the current session or permanently. The methods that register a UDF return a UserDefinedFunction object, which you can also use in Column expressions.

There are two ways to register a UDF with Snowpark:
Use udf() or register(). By pointing to a runtime Python function, Snowpark uses cloudpickle to serialize this function to bytecode, and deserializes the bytecode back to a Python function on the Snowflake server during UDF creation. During serialization, the global variables used in the Python function are serialized into the bytecode, but only the names of module objects, or of any objects from a module that are used in the function, are serialized. If the serialized bytecode is over 8K bytes, it is uploaded to a stage location as a Python file; otherwise it is added to the UDF's in-line code. During deserialization, Python looks up the corresponding modules and objects by name. For example:

>>> import numpy
>>> from resources.test_udf_dir.test_udf_file import mod5
>>> a = 1
>>> def f():
...     return 2
>>>
>>> from snowflake.snowpark.functions import udf
>>> session.add_import("tests/resources/test_udf_dir/test_udf_file.py", import_path="resources.test_udf_dir.test_udf_file")
>>> session.add_packages("numpy")
>>> @udf
... def g(x: int) -> int:
...     return mod5(numpy.square(x)) + a + f()
>>> df = session.create_dataframe([4], schema=["a"])
>>> df.select(g("a")).to_df("col1").show()
----------
|"COL1"  |
----------
|4       |
----------
Here the variable a and the function f will be serialized into the bytecode, but only the names of numpy and mod5 will be included. Therefore, to make these modules available on the server side, use add_import() and add_packages() to add your first-party and third-party libraries.

After deserialization, this function is executed and applied to every row of your dataframe or table during UDF execution. This approach is very flexible: you can create a UDF from a function defined in your current file/notebook, or import the function from elsewhere. However, this approach has limitations:
All code inside the function is executed on every row, so you cannot perform one-time initialization before the function runs. For example, if you read a file from a stage in a UDF, the file is read on every row. There is a workaround for this scenario, shown in Example 8 below.
If the runtime function references very large global variables (e.g., a machine learning model with many parameters), they are serialized too, and the resulting bytecode can be very large, which makes uploading slower. UDF creation also fails if a referenced global variable cannot be pickled (e.g., a weakref object). In that case, you usually have to save such objects to the local environment first, add them to the UDF using add_import(), and read them from within the UDF (see Example 8 below).
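The pickling failure is easy to reproduce locally with only the standard library; this minimal sketch (the `Model` class is a made-up stand-in, and cloudpickle fails on weakref objects in the same way as pickle does here) shows why a UDF closing over such an object fails at creation time:

```python
import pickle
import weakref

class Model:
    """Stand-in for a large object a UDF body might reference."""

m = Model()
ref = weakref.ref(m)

# The object itself may be picklable, but a weakref to it is not,
# so serializing a function that closes over `ref` raises an error.
try:
    pickle.dumps(ref)
except TypeError as exc:
    print(f"cannot pickle: {exc}")
```

This is the situation where saving the object to a file, adding it with add_import(), and loading it inside the UDF is the usual way out.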
Use register_from_file(). By pointing to a Python file, or a zip file containing Python source code, and the target function name, Snowpark uploads this file to a stage (which can also be customized) and loads the corresponding function into the Python runtime on the Snowflake server during UDF creation. This function is then executed and applied to every row of your dataframe or table when the UDF runs. This approach addresses a deficiency of the cloudpickle approach: the source code in the file other than the target function is loaded once during UDF creation and is not executed on every row during UDF execution. Therefore, this approach is useful and efficient when all your Python code is already in source files.
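Examples 6 and 7 below register a function named `mod5` from such a file. A plausible sketch of what that handler file might contain (the actual file is part of the Snowpark test resources; this body is an assumption for illustration) looks like this:

```python
# Hypothetical contents of a handler file (e.g., test_udf_file.py).
# register_from_file() loads this file on the server and binds the UDF
# to the function named by func_name.
def mod5(x: int) -> int:
    # Type hints let Snowpark infer return_type/input_types, so they
    # can be omitted from the register_from_file() call.
    return x % 5
```

Anything else in the file (imports, helper functions, module-level setup) is loaded once at UDF creation, not re-run per row.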
- For a vectorized UDF:

You can use udf(), register() or pandas_udf() to create a vectorized UDF by providing the appropriate return and input types. If you would like to use register_from_file() to create a vectorized UDF, you need to explicitly mark the handler function as vectorized, using either the vectorized decorator or a function attribute.
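For the function-attribute route, the handler module might look like the following minimal sketch. The module name and column logic are assumptions; `_sf_vectorized_input` is the attribute Snowflake checks to treat the handler as vectorized (the `vectorized` decorator alternative is imported from the server-side `_snowflake` module):

```python
import pandas as pd

# Hypothetical handler module to register with register_from_file().
def add_cols(df):
    # A vectorized handler receives a pandas DataFrame whose columns
    # (0, 1, ...) correspond to the UDF's positional arguments, and
    # returns a pandas Series of results for the whole batch.
    return df[0] + df[1]

# Mark the handler as vectorized via the function attribute.
add_cols._sf_vectorized_input = pd.DataFrame
```

With this marker in the file, register_from_file() creates a vectorized UDF instead of a row-at-a-time one.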
Snowflake supports the following data types for the parameters of a UDF:

| Python Type | Snowpark Type | SQL Type |
|---|---|---|
| int | LongType | NUMBER |
| decimal.Decimal | DecimalType | NUMBER |
| float | FloatType | FLOAT |
| str | StringType | STRING |
| bool | BooleanType | BOOL |
| datetime.time | TimeType | TIME |
| datetime.date | DateType | DATE |
| datetime.datetime | TimestampType | TIMESTAMP |
| bytes or bytearray | BinaryType | BINARY |
| list | ArrayType | ARRAY |
| dict | MapType | OBJECT |
| Dynamically mapped to the native Python type | VariantType | VARIANT |
| dict | GeographyType | GEOGRAPHY |
| pandas.Series | PandasSeriesType | No SQL type |
| pandas.DataFrame | PandasDataFrameType | No SQL type |
Note

1. Data with the VARIANT SQL type is converted to a Python type dynamically inside a UDF. The following SQL types are converted to str in UDFs rather than native Python types: TIME, DATE, TIMESTAMP and BINARY.

2. Data returned as ArrayType (list), MapType (dict) or VariantType (Variant) by a UDF is represented as a JSON string. You can call eval() or json.loads() to convert the result to a native Python object. Data returned as GeographyType (Geography) by a UDF is represented as a GeoJSON string.

3. PandasSeriesType and PandasDataFrameType are used when creating a Pandas (vectorized) UDF, so they are not mapped to any SQL types. element_type in PandasSeriesType and col_types in PandasDataFrameType indicate the SQL types in a Pandas Series and a Pandas DataFrame.

4. To annotate the Snowflake-specific timestamp types (TIMESTAMP_NTZ, TIMESTAMP_LTZ and TIMESTAMP_TZ), use Timestamp with NTZ, LTZ or TZ (e.g., Timestamp[NTZ]).

- Example 1
Create a temporary UDF from a lambda and apply it to a dataframe:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one_udf = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
>>> session.range(1, 8, 2).select(add_one_udf("id")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=4), Row(COL1=6), Row(COL1=8)]
- Example 2
Create a UDF with type hints and the @udf decorator, and apply it to a dataframe:

>>> from snowflake.snowpark.functions import udf
>>> @udf
... def add_udf(x: int, y: int) -> int:
...     return x + y
>>> df = session.create_dataframe([[1, 2], [3, 4]], schema=["x", "y"])
>>> df.select(add_udf("x", "y")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
- Example 3
Create a permanent UDF with a name and call it in SQL:
>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.udf.register(
...     lambda x, y: x * y, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", replace=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=30)]
>>> # skip udf creation if it already exists
>>> _ = session.udf.register(
...     lambda x, y: x * y + 1, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", if_not_exists=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=30)]
>>> # overwrite udf definition when it already exists
>>> _ = session.udf.register(
...     lambda x, y: x * y + 1, return_type=IntegerType(),
...     input_types=[IntegerType(), IntegerType()],
...     is_permanent=True, name="mul", replace=True,
...     stage_location="@mystage",
... )
>>> session.sql("select mul(5, 6) as mul").collect()
[Row(MUL=31)]
- Example 4
Create a UDF with UDF-level imports and apply it to a dataframe:
>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from snowflake.snowpark.functions import udf
>>> @udf(imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... def mod5_and_plus1_udf(x: int) -> int:
...     return mod5(x) + 1
>>> session.range(1, 8, 2).select(mod5_and_plus1_udf("id")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=4), Row(COL1=1), Row(COL1=3)]
- Example 5
Create a UDF with UDF-level packages and apply it to a dataframe:
>>> from snowflake.snowpark.functions import udf
>>> import numpy as np
>>> import math
>>> @udf(packages=["numpy"])
... def sin_udf(x: float) -> float:
...     return np.sin(x)
>>> df = session.create_dataframe([0.0, 0.5 * math.pi], schema=["d"])
>>> df.select(sin_udf("d")).to_df("col1").collect()
[Row(COL1=0.0), Row(COL1=1.0)]
- Example 6
Create a UDF from a local Python file:

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
- Example 7
Create a UDF from a Python file on an internal stage:

>>> from snowflake.snowpark.types import IntegerType
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udf_dir/test_udf_file.py", "@mystage", auto_compress=False)
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
- Example 8
Use cache to read a file once from a stage in a UDF:
>>> import sys
>>> import os
>>> import cachetools
>>> from snowflake.snowpark.types import StringType
>>> @cachetools.cached(cache={})
... def read_file(filename):
...     import_dir = sys._xoptions.get("snowflake_import_directory")
...     if import_dir:
...         with open(os.path.join(import_dir, filename), "r") as f:
...             return f.read()
>>>
>>> # create a temporary text file for test
>>> temp_file_name = "/tmp/temp.txt"
>>> with open(temp_file_name, "w") as t:
...     _ = t.write("snowpark")
>>> session.add_import(temp_file_name)
>>> session.add_packages("cachetools")
>>> concat_file_content_with_str_udf = session.udf.register(
...     lambda s: f"{read_file(os.path.basename(temp_file_name))}-{s}",
...     return_type=StringType(),
...     input_types=[StringType()]
... )
>>>
>>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
>>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
[Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
>>> os.remove(temp_file_name)
>>> session.clear_imports()
In this example, the file is read only once during UDF creation and is not read again during UDF execution. This is achieved with the third-party library cachetools. You can also use LRUCache and TTLCache from that package to keep the cache from growing too large. Note that Python's built-in cache decorators do not work when registering UDFs with Snowpark, due to a limitation of cloudpickle.

- Example 9
Create a vectorized UDF from a lambda with a max batch size and apply it to a dataframe:
>>> from snowflake.snowpark.functions import udf
>>> from snowflake.snowpark.types import IntegerType, PandasSeriesType, PandasDataFrameType
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> add_udf1 = udf(lambda series1, series2: series1 + series2, return_type=PandasSeriesType(IntegerType()),
...               input_types=[PandasSeriesType(IntegerType()), PandasSeriesType(IntegerType())],
...               max_batch_size=20)
>>> df.select(add_udf1("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
>>> add_udf2 = udf(lambda df: df[0] + df[1], return_type=PandasSeriesType(IntegerType()),
...               input_types=[PandasDataFrameType([IntegerType(), IntegerType()])],
...               max_batch_size=20)
>>> df.select(add_udf2("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
- Example 10
Create a vectorized UDF with type hints and apply it to a dataframe:
>>> from snowflake.snowpark.functions import udf
>>> from snowflake.snowpark.types import PandasSeries, PandasDataFrame
>>> @udf
... def apply_mod5_udf(series: PandasSeries[int]) -> PandasSeries[int]:
...     return series.apply(lambda x: x % 5)
>>> session.range(1, 8, 2).select(apply_mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
>>> @udf
... def mul_udf(df: PandasDataFrame[int, int]) -> PandasSeries[int]:
...     return df[0] * df[1]
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(mul_udf("a", "b")).to_df("col1").collect()
[Row(COL1=2), Row(COL1=12)]
- Example 11
Create a vectorized UDF with original pandas types and Snowpark types, and apply it to a dataframe:

>>> # `pandas_udf` is an alias of `udf`, but it can only be used to create a vectorized UDF
>>> from snowflake.snowpark.functions import pandas_udf
>>> from snowflake.snowpark.types import IntegerType
>>> import pandas as pd
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> def add1(series1: pd.Series, series2: pd.Series) -> pd.Series:
...     return series1 + series2
>>> add_udf1 = pandas_udf(add1, return_type=IntegerType(),
...                       input_types=[IntegerType(), IntegerType()])
>>> df.select(add_udf1("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
>>> def add2(df: pd.DataFrame) -> pd.Series:
...     return df[0] + df[1]
>>> add_udf2 = pandas_udf(add2, return_type=IntegerType(),
...                       input_types=[IntegerType(), IntegerType()])
>>> df.select(add_udf2("a", "b")).to_df("add_result").collect()
[Row(ADD_RESULT=3), Row(ADD_RESULT=7)]
Methods
describe(udf_obj): Returns a DataFrame that describes the properties of a UDF.

register(func[, return_type, input_types, ...]): Registers a Python function as a Snowflake Python UDF and returns the UDF.

register_from_file(file_path, func_name[, ...]): Registers a Python function as a Snowflake Python UDF from a Python or zip file, and returns the UDF.