You are viewing documentation about an older version (1.16.0).

snowflake.snowpark.udtf.UDTFRegistration

class snowflake.snowpark.udtf.UDTFRegistration(session: Optional[Session])

Bases: object

Provides methods to register classes as UDTFs in the Snowflake database. For more information about Snowflake Python UDTFs, see Python UDTFs.

session.udtf returns an object of this class. You can use this object to register UDTFs that you plan to use in the current session or permanently. The methods that register a UDTF return a UserDefinedTableFunction object, which you can also use to call the UDTF.

Registering a UDTF is like registering a scalar UDF: you can call register() or snowflake.snowpark.functions.udtf() to register it explicitly, or you can use the @udtf decorator. All three use cloudpickle to transfer the code from the client to the server. Alternatively, you can use register_from_file(). Refer to module snowflake.snowpark.udtf.UDTFRegistration for when to use each approach.

Querying a registered UDTF works the same way as querying any other table function; see table_function() and join_table_function(). To query a UDTF right after creating it, call the returned UserDefinedTableFunction instance, as in Example 1 below.

Example 1

Create a temporary UDTF and call it:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> # Or you can lateral-join a UDTF like any other table functions
>>> df = session.create_dataframe([2, 3], schema=["c"])
>>> df.join_table_function(generator_udtf(df["c"])).sort("c", "number").show()
------------------
|"C"  |"NUMBER"  |
------------------
|2    |0         |
|2    |1         |
|3    |0         |
|3    |1         |
|3    |2         |
------------------
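Conceptually, join_table_function performs a lateral join: the UDTF's process method runs for each input row, and every tuple it yields becomes an output row paired with that input row's columns. The plain-Python sketch below reproduces the table above without a Snowflake session. The helper lateral_join is hypothetical and only illustrates the semantics; it is not how Snowflake executes the query.

```python
from typing import Iterable, Iterator, List, Tuple


class GeneratorUDTF:
    # Same handler as Example 1: one single-column row per integer in range(n).
    def process(self, n: int) -> Iterable[Tuple[int]]:
        for i in range(n):
            yield (i,)


def lateral_join(values: List[int], handler_cls) -> Iterator[Tuple[int, ...]]:
    """Hypothetical helper: pair each input value with every row the UDTF yields for it."""
    # A single instance is enough here because GeneratorUDTF keeps no state.
    handler = handler_cls()
    for c in values:
        for out in handler.process(c):
            yield (c, *out)


result = sorted(lateral_join([2, 3], GeneratorUDTF))
print(result)  # [(2, 0), (2, 1), (3, 0), (3, 1), (3, 2)]
```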
Example 2

Create a UDTF with type hints and the @udtf decorator, and query it:

>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["number"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in range(n):
...             yield (i, )
>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Example 3

Create a permanent UDTF with a name and call it in SQL:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(
...     GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()],
...     is_permanent=True, name="generator_udtf", replace=True, stage_location="@mystage"
... )
>>> session.sql("select * from table(generator_udtf(3))").collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Example 4

Create a UDTF with type hints:

>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["n1", "n2"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int, int]]:
...         for i in range(n):
...             yield (i, i+1)
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(N1=0, N2=1), Row(N1=1, N2=2), Row(N1=2, N2=3)]
Example 5

Create a UDTF with type hints, using the ellipsis in Tuple[int, ...] to declare multiple columns of the same type:

>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["n1", "n2"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int, ...]]:
...         for i in range(n):
...             yield (i, i+1)
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(N1=0, N2=1), Row(N1=1, N2=2), Row(N1=2, N2=3)]
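In standard Python typing, Tuple[int, ...] means a tuple of any length whose elements are all int, so with this hint the number of output columns is determined by output_schema rather than by the hint itself. The snippet below uses only the standard typing module to show what the hints from Example 4 and Example 5 contain; it is an illustration of the typing constructs, not of how Snowpark parses them internally.

```python
from typing import Iterable, Tuple, get_args, get_type_hints


class FixedWidth:
    # Hint style from Example 4: one type per output column.
    def process(self, n: int) -> Iterable[Tuple[int, int]]:
        for i in range(n):
            yield (i, i + 1)


class Variadic:
    # Hint style from Example 5: one type repeated for every column.
    def process(self, n: int) -> Iterable[Tuple[int, ...]]:
        for i in range(n):
            yield (i, i + 1)


# Unwrap Iterable[...] to get the per-row Tuple hint.
fixed_row = get_args(get_type_hints(FixedWidth.process)["return"])[0]
variadic_row = get_args(get_type_hints(Variadic.process)["return"])[0]

print(get_args(fixed_row))     # (<class 'int'>, <class 'int'>)
print(get_args(variadic_row))  # (<class 'int'>, Ellipsis)
```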
Example 6

Create a UDTF with UDF-level imports and type hints:

>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["number"], imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in range(n):
...             yield (mod5(i), )
>>> session.table_function(generator_udtf(lit(6))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2), Row(NUMBER=3), Row(NUMBER=4), Row(NUMBER=0)]
Example 7

Create a UDTF with UDF-level packages and type hints:

>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import udtf, lit
>>> import numpy as np
>>> @udtf(output_schema=["number"], packages=["numpy"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in np.arange(n):
...             yield (i, )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Example 8

Creating a UDTF with a constructor (__init__) and an end_partition method:

>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
...     def __init__(self) -> None:
...         self._total_per_partition = 0
...
...     def process(self, s1: str) -> Iterable[Tuple[str, int]]:
...         words = s1.split()
...         self._total_per_partition += len(words)
...         counter = Counter(words)
...         yield from counter.items()
...
...     def end_partition(self):
...         yield ("partition_total", self._total_per_partition)
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
...     MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
>>> # Call it by the returned callable instance
>>> df2 = session.table_function(word_count_udtf(lit("w1 w2 w2 w3 w3 w3")))
>>> df2.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
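Under the Snowflake Python UDTF model, a handler instance is created for each partition, process is called once per input row in that partition, and end_partition is called once after the last row. The sketch below drives the handler from this example through that sequence in plain Python, feeding two rows into one partition. run_partition is a hypothetical helper, not part of Snowpark. The sketch accumulates the total with +=, since plain assignment would keep only the last row's word count.

```python
from collections import Counter
from typing import Iterable, Iterator, List, Tuple


class MyWordCount:
    def __init__(self) -> None:
        self._total_per_partition = 0

    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        words = s1.split()
        self._total_per_partition += len(words)  # accumulate across rows
        yield from Counter(words).items()

    def end_partition(self) -> Iterable[Tuple[str, int]]:
        yield ("partition_total", self._total_per_partition)


def run_partition(handler_cls, rows: List[str]) -> Iterator[Tuple[str, int]]:
    """Hypothetical driver mimicking the per-partition call order."""
    handler = handler_cls()              # one instance per partition
    for row in rows:
        yield from handler.process(row)  # once per input row
    yield from handler.end_partition()   # once, after the last row


output = list(run_partition(MyWordCount, ["w1 w2 w2", "w3 w3 w3"]))
print(output)  # [('w1', 1), ('w2', 2), ('w3', 3), ('partition_total', 6)]
```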
Example 9

Creating a UDTF from a local Python file:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="tests/resources/test_udtf_dir/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
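The contents of tests/resources/test_udtf_dir/test_udtf_file.py are not shown here. At a minimum, the file must define a top-level class whose name matches handler_name. Before registering, you can check that locally with the standard importlib machinery, as in this sketch. The file contents below are a hypothetical stand-in that mirrors GeneratorUDTF from Example 1, and the loading code is an illustration, not Snowpark's implementation.

```python
import importlib.util
import pathlib
import tempfile

# Hypothetical contents of a handler file such as test_udtf_file.py.
HANDLER_SOURCE = """
class GeneratorUDTF:
    def process(self, n):
        for i in range(n):
            yield (i,)
"""

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "test_udtf_file.py"
    path.write_text(HANDLER_SOURCE)

    # Load the file as a module, then look up the class named by handler_name.
    spec = importlib.util.spec_from_file_location("test_udtf_file", str(path))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    handler_cls = getattr(module, "GeneratorUDTF")

rows = list(handler_cls().process(3))
print(rows)  # [(0,), (1,), (2,)]
```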
Example 10

Creating a UDTF from a Python file on an internal stage:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="@mystage/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]

You can use udtf(), register(), or pandas_udtf() to create a vectorized UDTF by providing appropriate return and input types. To create a vectorized UDTF with register_from_file(), you must explicitly mark the handler method as vectorized, either with the decorator @vectorized(input=pandas.DataFrame) or by setting <class>.end_partition._sf_vectorized_input = pandas.DataFrame.
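A minimal sketch of a handler file that uses the attribute-based marker, so that register_from_file() can treat end_partition as vectorized. RowCounter is a hypothetical handler for illustration; the output_schema and input_types would still be passed at registration time and are not shown.

```python
import pandas as pd


class RowCounter:
    """Illustrative vectorized handler: emits one row holding the partition size."""

    def end_partition(self, df: pd.DataFrame):
        # The entire partition is delivered as a single DataFrame.
        yield pd.DataFrame({"row_count": [len(df)]})


# Attribute-based marker, set at module level in the handler file so the
# method is recognized as vectorized.
RowCounter.end_partition._sf_vectorized_input = pd.DataFrame
```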

Example 11

Creating a vectorized UDTF by specifying a PandasDataFrameType as input_types and a PandasDataFrameType with column names as output_schema.

>>> from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df):
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=PandasDataFrameType([StringType(), IntegerType(), FloatType()], ["id_", "col1_", "col2_"]),
...     input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
...     input_names=['"id"', '"col1"', '"col2"'],
... )
>>> df = session.create_dataframe([['x', 3, 35.9],['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
Example 12

Creating a vectorized UDTF by specifying PandasDataFrame with nested types as type hints.

>>> from snowflake.snowpark.types import PandasDataFrame
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df: PandasDataFrame[str, int, float]) -> PandasDataFrame[str, int, float]:
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=["id_", "col1_", "col2_"],
...     input_names=['"id"', '"col1"', '"col2"'],
... )
>>> df = session.create_dataframe([['x', 3, 35.9],['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
Example 13

Creating a vectorized UDTF by specifying a pandas.DataFrame as type hints and a StructType with type information and column names as output_schema.

>>> import pandas as pd
>>> from snowflake.snowpark.types import IntegerType, StringType, FloatType, StructType, StructField
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df: pd.DataFrame) -> pd.DataFrame:
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=StructType([StructField("id_", StringType()), StructField("col1_", IntegerType()), StructField("col2_", FloatType())]),
...     input_types=[StringType(), IntegerType(), FloatType()],
...     input_names=['"id"', '"col1"', '"col2"'],
... )
>>> df = session.create_dataframe([['x', 3, 35.9],['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
Example 14

Same as Example 11, but without input_names; the column names are set inside end_partition instead.

>>> from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df):
...         df.columns = ["id", "col1", "col2"]
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=PandasDataFrameType([StringType(), IntegerType(), FloatType()], ["id_", "col1_", "col2_"]),
...     input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
... )
>>> df = session.create_dataframe([['x', 3, 35.9],['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------
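Because the handler is plain Python, the renaming step can be checked locally before registering. This sketch assumes, as the example implies, that without input_names the DataFrame arrives without the original column names, so it builds the test input with pandas' default integer column labels. The column access uses bracket syntax, which behaves the same as the attribute access in the example for existing columns.

```python
import pandas as pd


class multiply:
    def __init__(self):
        self.multiplier = 10

    def end_partition(self, df):
        # Assign names first so the columns can be referenced below.
        df.columns = ["id", "col1", "col2"]
        df["col1"] = df["col1"] * self.multiplier
        df["col2"] = df["col2"] * self.multiplier
        yield df


# Assumption: the columns arrive unnamed, so pandas labels them 0, 1, 2.
partition = pd.DataFrame([["x", 3, 35.9], ["x", 9, 20.5]])
result = next(multiply().end_partition(partition))
print(result)
```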

[Preview Feature] The syntax for declaring a UDTF with a vectorized process method is similar to the above. Defining the __init__ and end_partition methods is optional. The process method accepts only one argument, the pandas DataFrame, and returns the same number of rows as the given input. Neither __init__ nor end_partition takes any additional arguments.
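The call sequence for a vectorized process method can be simulated locally to check a handler against these rules before registering it. In this sketch, run_vectorized is a hypothetical driver: it creates one handler per partition, splits the partition into batches (assuming each batch arrives as its own DataFrame with a 0-based index), checks that process returns one value per input row in every output column, and calls end_partition only if it is defined. Snowflake's real batching may differ. The handler mirrors the mean-computing handler used in Example 16.

```python
import pandas as pd


class mean:
    def __init__(self) -> None:
        self.sum = 0
        self.len = 0

    def process(self, df: pd.DataFrame):
        self.sum += df["value"].sum()
        self.len += len(df)
        return ([None] * len(df),)

    def end_partition(self):
        return ([self.sum / self.len],)


def run_vectorized(handler_cls, partition: pd.DataFrame, batch_size: int):
    """Hypothetical local driver for a vectorized process() handler."""
    handler = handler_cls()  # __init__ takes no extra arguments
    for start in range(0, len(partition), batch_size):
        # Assumption: each batch is a fresh DataFrame with a 0-based index.
        batch = partition.iloc[start:start + batch_size].reset_index(drop=True)
        columns = handler.process(batch)  # process receives only the DataFrame
        # Rule from the text: one output row per input row.
        assert all(len(col) == len(batch) for col in columns)
    end_partition = getattr(handler, "end_partition", None)  # optional method
    return end_partition() if end_partition is not None else None


x_rows = pd.DataFrame({"name": ["x", "x", "x"], "value": [10, 20, 33]})
x_result = run_vectorized(mean, x_rows, batch_size=2)
print(x_result)  # x_result[0][0] is the partition mean, (10 + 20 + 33) / 3
```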

Example 15

Vectorized UDTF process method without end_partition

>>> from snowflake.snowpark.types import PandasDataFrame
>>> class multiply:
...     def process(self, df: PandasDataFrame[str, int, float]) -> PandasDataFrame[int]:
...         return (df['col1'] * 10, )
>>> multiply_udtf = session.udtf.register(
...     multiply,
...     output_schema=["col1x10"],
...     input_names=['"id"', '"col1"', '"col2"']
... )
>>> df = session.create_dataframe([['x', 3, 35.9],['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select("id", "col1", "col2", multiply_udtf("id", "col1", "col2")).order_by("col1").show()
--------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL1X10"  |
--------------------------------------
|x     |3       |35.9    |30         |
|x     |9       |20.5    |90         |
--------------------------------------
Example 16

Vectorized UDTF process method with end_partition

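>>> import pandas as pd
>>> from snowflake.snowpark.types import FloatType, IntegerType, StringType, StructField, StructType
>>> class mean: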
...     def __init__(self) -> None:
...         self.sum = 0
...         self.len = 0
...     def process(self, df: pd.DataFrame) -> pd.DataFrame:
...         self.sum += df['value'].sum()
...         self.len += len(df)
...         return ([None] * len(df),)
...     def end_partition(self):
...         return ([self.sum / self.len],)
>>> mean_udtf = session.udtf.register(mean,
...                       output_schema=StructType([StructField("mean", FloatType())]),
...                       input_types=[StringType(), IntegerType()],
...                       input_names=['"name"', '"value"'])
>>> df = session.create_dataframe([["x", 10], ["x", 20], ["x", 33], ["y", 10], ["y", 25], ], schema=["name", "value"])
>>> df.select("name", "value", mean_udtf("name", "value").over(partition_by="name")).order_by("name", "value").show()
-----------------------------
|"NAME"  |"VALUE"  |"MEAN"  |
-----------------------------
|x       |NULL     |21.0    |
|x       |10       |NULL    |
|x       |20       |NULL    |
|x       |33       |NULL    |
|y       |NULL     |17.5    |
|y       |10       |NULL    |
|y       |25       |NULL    |
-----------------------------
Example 17

Vectorized UDTF process method with end_partition and max_batch_size

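>>> from snowflake.snowpark.types import IntegerType, PandasDataFrameType, StringType
>>> class sum:
...     def __init__(self):
...         self.sum = None
...     def process(self, df):
...         if self.sum is None:
...             self.sum = df.copy()  # copy so later += does not modify the batch already returned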
...         else:
...             self.sum += df
...         return df
...     def end_partition(self):
...         return self.sum
>>> sum_udtf = session.udtf.register(sum,
...         output_schema=PandasDataFrameType([StringType(), IntegerType()], ["id_", "col1_"]),
...         input_types=[PandasDataFrameType([StringType(), IntegerType()])],
...         max_batch_size=1)
>>> df = session.create_dataframe([["x", 10], ["x", 20], ["x", 33], ["y", 10], ["y", 25], ], schema=["id", "col1"])
>>> df.select("id", "col1", sum_udtf("id", "col1").over(partition_by="id")).order_by("id", "col1").show()
-----------------------------------
|"ID"  |"COL1"  |"ID_"  |"COL1_"  |
-----------------------------------
|x     |NULL    |xxx    |63       |
|x     |10      |x      |10       |
|x     |20      |x      |20       |
|x     |33      |x      |33       |
|y     |NULL    |yy     |35       |
|y     |10      |y      |10       |
|y     |25      |y      |25       |
-----------------------------------
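The ID_ value xxx in each end_partition row comes from pandas arithmetic: + on an object (string) column concatenates element-wise, so accumulating three one-row batches of "x" yields "xxx" while col1 sums to 63. The sketch below reproduces the x partition locally with max_batch_size=1, assuming each batch arrives with its own 0-based index; otherwise pandas would align rows on the index and produce NaN instead of a running sum.

```python
import pandas as pd

partition = pd.DataFrame({"id": ["x", "x", "x"], "col1": [10, 20, 33]})

running = None
for start in range(len(partition)):  # max_batch_size=1: one row per batch
    # Assumption: each batch arrives with its own 0-based index.
    batch = partition.iloc[start:start + 1].reset_index(drop=True)
    # Copy the first batch so later additions never mutate a returned batch.
    running = batch.copy() if running is None else running + batch

print(running)  # one row: id "xxx", col1 63
```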

Methods

register(handler, output_schema[, ...])

Registers a Python class as a Snowflake Python UDTF and returns the UDTF.

register_from_file(file_path, handler_name, ...)

Registers a Python class as a Snowflake Python UDTF from a Python or zip file, and returns the UDTF.