You are viewing documentation about an older version (1.4.0). View latest version

snowflake.snowpark.udtf.UDTFRegistration

class snowflake.snowpark.udtf.UDTFRegistration(session: Session)[source]

Bases: object

Provides methods to register classes as UDTFs in the Snowflake database. For more information about Snowflake Python UDTFs, see Python UDTFs.

session.udtf returns an object of this class. You can use this object to register UDTFs that you plan to use in the current session or permanently. The methods that register a UDTF return a UserDefinedTableFunction object, which you can also use to call the UDTF.

Registering a UDTF is like registering a scalar UDF, you can use register() or snowflake.snowpark.functions.udtf() to explicitly register it. You can also use the decorator @udtf. They all use cloudpickle to transfer the code from the client to the server. Another way is to use register_from_file(). Refer to module snowflake.snowpark.udtf.UDTFRegistration for when to use them.

To query a registered UDTF is the same as to query other table functions. Refer to table_function() and join_table_function(). If you want to query a UDTF right after it’s created, you can call the created UserDefinedTableFunction instance like in Example 1 below.

Example 1

Create a temporary UDTF and call it:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> # Or you can lateral-join a UDTF like any other table functions
>>> df = session.create_dataframe([2, 3], schema=["c"])
>>> df.join_table_function(generator_udtf(df["c"])).sort("c", "number").show()
------------------
|"C"  |"NUMBER"  |
------------------
|2    |0         |
|2    |1         |
|3    |0         |
|3    |1         |
|3    |2         |
------------------
Copy
Example 2

Create a UDTF with type hints and @udtf decorator and query it:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["number"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in range(n):
...             yield (i, )
>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy
Example 3

Create a permanent UDTF with a name and call it in SQL:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(
...     GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()],
...     is_permanent=True, name="generator_udtf", replace=True, stage_location="@mystage"
... )
>>> session.sql("select * from table(generator_udtf(3))").collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy
Example 4

Create a UDTF with type hints:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["n1", "n2"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int, int]]:
...         for i in range(n):
...             yield (i, i+1)
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(N1=0, N2=1), Row(N1=1, N2=2), Row(N1=2, N2=3)]
Copy
Example 5

Create a UDTF with type hints by using ... for multiple columns of the same type:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["n1", "n2"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int, ...]]:
...         for i in range(n):
...             yield (i, i+1)
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(N1=0, N2=1), Row(N1=1, N2=2), Row(N1=2, N2=3)]
Copy
Example 6

Create a UDTF with UDF-level imports and type hints:

>>> from resources.test_udf_dir.test_udf_file import mod5
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> @udtf(output_schema=["number"], imports=[("tests/resources/test_udf_dir/test_udf_file.py", "resources.test_udf_dir.test_udf_file")])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in range(n):
...             yield (mod5(i), )
>>> session.table_function(generator_udtf(lit(6))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2), Row(NUMBER=3), Row(NUMBER=4), Row(NUMBER=0)]
Copy
Example 7

Create a UDTF with UDF-level packages and type hints:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> import numpy as np
>>> @udtf(output_schema=["number"], packages=["numpy"])
... class generator_udtf:
...     def process(self, n: int) -> Iterable[Tuple[int]]:
...         for i in np.arange(n):
...             yield (i, )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy
Example 8

Creating a UDTF with the constructor and end_partition method.

>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
...     def __init__(self) -> None:
...         self._total_per_partition = 0
...
...     def process(self, s1: str) -> Iterable[Tuple[str, int]]:
...         words = s1.split()
...         self._total_per_partition = len(words)
...         counter = Counter(words)
...         yield from counter.items()
...
...     def end_partition(self):
...         yield ("partition_total", self._total_per_partition)
Copy
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
...     MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
Copy
>>> # Call it by the returned callable instance
>>> df2 = session.table_function(word_count_udtf(lit("w1 w2 w2 w3 w3 w3")))
>>> df2.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
Copy
Example 9

Creating a UDTF from a local Python file:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="tests/resources/test_udtf_dir/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy
Example 10

Creating a UDTF from a Python file on an internal stage:

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="@mystage/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy

Methods

register(handler, output_schema[, ...])

Registers a Python class as a Snowflake Python UDTF and returns the UDTF.

register_from_file(file_path, handler_name, ...)

Registers a Python class as a Snowflake Python UDTF from a Python or zip file, and returns the UDTF.