You are viewing documentation about an older version (1.15.0).

snowflake.snowpark.functions.pandas_udtf

snowflake.snowpark.functions.pandas_udtf(handler: Optional[Callable] = None, *, output_schema: Union[StructType, List[str], PandasDataFrameType], input_types: Optional[List[DataType]] = None, input_names: Optional[List[str]] = None, name: Optional[Union[str, Iterable[str]]] = None, is_permanent: bool = False, stage_location: Optional[str] = None, imports: Optional[List[Union[str, Tuple[str, str]]]] = None, packages: Optional[List[Union[str, module]]] = None, replace: bool = False, if_not_exists: bool = False, session: Optional[Session] = None, parallel: int = 4, statement_params: Optional[Dict[str, str]] = None, strict: bool = False, secure: bool = False, external_access_integrations: Optional[List[str]] = None, secrets: Optional[Dict[str, str]] = None, immutable: bool = False, max_batch_size: Optional[int] = None, comment: Optional[str] = None, **kwargs) → Union[UserDefinedTableFunction, partial]

Registers a Python class as a vectorized Python UDTF and returns the UDTF.

The arguments, return value and usage of this function are exactly the same as udtf(), but this function can only be used for registering vectorized UDTFs. See examples in UDTFRegistration.

See also

  • udtf()

  • UDTFRegistration.register()

Compared to the default row-by-row processing of a regular UDTF, which can be inefficient, vectorized Python UDTFs (user-defined table functions) process data partition by partition: they operate on each partition as a pandas DataFrame and return results as pandas DataFrames or as lists of pandas arrays or pandas Series.

In addition, vectorized Python UDTFs allow for easy integration with libraries that operate on pandas DataFrames or pandas arrays.

A vectorized UDTF handler class:

  • defines an end_partition method that takes a pandas DataFrame argument and returns a pandas.DataFrame, or a tuple of pandas.Series or pandas arrays in which each array is a column.

  • does NOT define a process method.

  • optionally defines an __init__ method, which is invoked before processing each partition.
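The handler contract described above can be tried out locally before registering anything. The following is only a rough sketch under stated assumptions: it imitates partition-by-partition invocation with pandas groupby and does not reflect how Snowflake actually executes the UDTF. The helper run_locally and the sample data are hypothetical and exist only for illustration.

```python
import pandas as pd


class Multiply:
    """Handler shaped like a vectorized UDTF: end_partition only, no process."""

    def __init__(self):
        # Per the description above, __init__ runs before each partition.
        self.multiplier = 10

    def end_partition(self, df: pd.DataFrame) -> pd.DataFrame:
        # Receives one whole partition as a DataFrame and returns a DataFrame.
        out = df.copy()
        out["col1"] = out["col1"] * self.multiplier
        out["col2"] = out["col2"] * self.multiplier
        return out


def run_locally(handler_cls, data: pd.DataFrame, partition_col: str) -> pd.DataFrame:
    """Hypothetical helper: feed each group to a fresh handler instance."""
    results = []
    for _, partition in data.groupby(partition_col, sort=True):
        handler = handler_cls()  # new instance for every partition
        results.append(handler.end_partition(partition.reset_index(drop=True)))
    return pd.concat(results, ignore_index=True)


data = pd.DataFrame(
    {"id": ["x", "x", "y"], "col1": [3, 9, 1], "col2": [35.9, 20.5, 2.0]}
)
result = run_locally(Multiply, data, "id")
```

A local check like this only exercises the handler logic; column naming, type mapping, and partitioning behavior in Snowflake are governed by output_schema, input_types, and the partition_by clause.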

You can use udtf(), register() or pandas_udtf() to create a vectorized UDTF by providing appropriate return and input types. To create a vectorized UDTF with register_from_file(), you must explicitly mark the handler method as vectorized, either with the decorator @vectorized(input=pandas.DataFrame) or by setting <class>.end_partition._sf_vectorized_input = pandas.DataFrame.
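As a minimal sketch of the attribute-based marking mentioned above, a handler file intended for register_from_file() might look like the following. The class name Multiply and its body are illustrative assumptions; only the _sf_vectorized_input attribute name is taken from the text above.

```python
import pandas as pd


class Multiply:
    def __init__(self):
        self.multiplier = 10

    def end_partition(self, df: pd.DataFrame) -> pd.DataFrame:
        # Scale the numeric columns of the partition and return it.
        out = df.copy()
        out["col1"] = out["col1"] * self.multiplier
        out["col2"] = out["col2"] * self.multiplier
        return out


# Mark end_partition as vectorized by setting the attribute on the function.
# This is the alternative to the @vectorized(input=pandas.DataFrame) decorator.
Multiply.end_partition._sf_vectorized_input = pd.DataFrame
```

Setting the attribute on the class-level function means it is also visible through bound methods on instances, so the marker does not depend on how the handler is later instantiated.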

Note: A vectorized UDTF must be called with snowflake.snowpark.Window.partition_by to build the partitions.

Example:

>>> from snowflake.snowpark.functions import pandas_udtf
>>> from snowflake.snowpark.types import PandasDataFrameType, StringType, IntegerType, FloatType
>>> class multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df):
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> multiply_udtf = pandas_udtf(
...     multiply,
...     output_schema=PandasDataFrameType([StringType(), IntegerType(), FloatType()], ["id_", "col1_", "col2_"]),
...     input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
...     input_names=['"id"', '"col1"', '"col2"']
... )
>>> df = session.create_dataframe([['x', 3, 35.9],['x', 9, 20.5]], schema=["id", "col1", "col2"])
>>> df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------

Example:

>>> @pandas_udtf(
... output_schema=PandasDataFrameType([StringType(), IntegerType(), FloatType()], ["id_", "col1_", "col2_"]),
... input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
... input_names=['"id"', '"col1"', '"col2"']
... )
... class _multiply:
...     def __init__(self):
...         self.multiplier = 10
...     def end_partition(self, df):
...         df.col1 = df.col1*self.multiplier
...         df.col2 = df.col2*self.multiplier
...         yield df
>>> df.select(_multiply("id", "col1", "col2").over(partition_by=["id"])).sort("col1_").show()
-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|x      |90       |205.0    |
-----------------------------