Vectorized Python UDTFs
This topic introduces vectorized Python UDTFs.
Overview
Vectorized Python UDTFs (user-defined table functions) provide a way to operate over rows in batches.
Snowflake supports two kinds of vectorized UDTFs:
UDTFs with a vectorized end_partition method
UDTFs with a vectorized process method
You must choose one kind because a UDTF can’t have both a vectorized process method and a vectorized end_partition method.
UDTFs with a vectorized end_partition method
UDTFs with a vectorized end_partition method enable partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames, or as lists of pandas arrays or pandas Series.
This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.
Use a vectorized end_partition method when you want to:
Process your data on a partition-by-partition basis instead of on a row-by-row basis.
Return multiple rows or columns for each partition.
Use libraries that operate on pandas DataFrames for data analysis.
UDTFs with a vectorized process method
UDTFs with a vectorized process method provide a way to operate over rows in batches, assuming the operation performs a 1-to-1 mapping.
In other words, the method returns one output row for each input row. The number of columns is not restricted.
Use a vectorized process method when you want to:
Apply a 1-to-1 transformation with a multi-columnar result in batches.
Use a library that requires pandas.DataFrame.
Process rows in batches, without explicit partitioning.
Leverage the to_pandas() API to transform the query result directly to a pandas DataFrame.
Prerequisites
The Snowpark Library for Python version 1.14.0 or later is required.
Getting started with UDTFs with a vectorized end_partition method
To create a UDTF with a vectorized end_partition method:
Define a handler class, optionally with an __init__ method, which is invoked before each partition is processed.
Do not define a process method.
Define an end_partition method that takes a DataFrame argument and returns or yields a pandas.DataFrame, or a tuple of pandas.Series or pandas.arrays where each array is a column. The column types of the result must match the column types in the UDTF definition.
Mark the end_partition method as vectorized using the @vectorized decorator or the _sf_vectorized_input function attribute. For more information, refer to Vectorized Python UDFs. The @vectorized decorator can only be used when the Python UDTF is executed within Snowflake, for example, when using a SQL worksheet. When you are executing using the client or a Python worksheet, you must use the function attribute.
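The steps above mention that end_partition can return a tuple of pandas.Series instead of a DataFrame. The following is a minimal local sketch of that shape, not an excerpt from a real UDTF: the class name TupleHandler and the per-column mean computation are hypothetical, and the handler is exercised directly with pandas rather than registered in Snowflake.

```python
import pandas as pd


class TupleHandler:
    """Hypothetical handler: emits one (column name, mean) row per input column."""

    def end_partition(self, df):
        # Each element of the returned tuple is one output column.
        names = pd.Series(list(df.columns))
        means = pd.Series([df[c].mean() for c in df.columns])
        return names, means


# Mark the method as vectorized with the function attribute,
# which is the option described above for client-side code.
TupleHandler.end_partition._sf_vectorized_input = pd.DataFrame

# Local dry run with a made-up partition.
partition = pd.DataFrame({"A": [1.0, 2.0, 3.0], "B": [10.0, 20.0, 30.0]})
names, means = TupleHandler().end_partition(partition)
```

For a handler like this, the UDTF definition would need two output columns whose types match the two Series (a string column and a float column).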
Note
The default column names for the input DataFrame to a UDTF with a vectorized end_partition method match the signature of the SQL function.
The column names follow the SQL identifier requirements: an unquoted identifier is converted to uppercase, and a double-quoted identifier is preserved as written.
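To illustrate the naming rule in this note, here is a small pandas-only sketch. It assumes a hypothetical SQL signature with an unquoted argument id and a double-quoted argument "mixedCase"; the DataFrame is built by hand to mimic the column names such a handler would receive, and normalize_columns is an illustrative helper, not part of any Snowflake API.

```python
import pandas as pd


def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of df with lowercase column names (hypothetical helper)."""
    return df.rename(columns=str.lower)


# Simulated input for the assumed signature: f(id varchar, "mixedCase" float).
# The unquoted name arrives in uppercase; the quoted name keeps its case.
batch = pd.DataFrame({"ID": ["x", "y"], "mixedCase": [1.5, 2.5]})

normalized = normalize_columns(batch)
```

Normalizing names at the top of the handler is one way to keep the rest of the code independent of how the SQL signature was quoted.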
Here is an example of creating a UDTF with a vectorized end_partition method, using the @vectorized decorator.
from _snowflake import vectorized
import pandas

class handler:
    def __init__(self):
        # initialize a state
        pass

    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
        # process the DataFrame
        result_df = df  # placeholder: replace with the real processing
        return result_df
Here is an example of creating a UDTF with a vectorized end_partition method, using the function attribute.
import pandas

class handler:
    def __init__(self):
        # initialize a state
        pass

    def end_partition(self, df):
        # process the DataFrame
        result_df = df  # placeholder: replace with the real processing
        return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Note
A UDTF with a vectorized end_partition method must be called with a PARTITION BY clause to build the partitions.
To call the UDTF with all the data in the same partition:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
To call the UDTF with the data partitioned by column x:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
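The two calls above differ only in how rows are grouped before end_partition runs. The following sketch imitates that grouping locally with pandas so a handler can be tried out before it is registered. It is an approximation under stated assumptions: RowCounter and run_partitioned are hypothetical names, and a fresh handler instance per group is used to mirror __init__ being invoked before each partition.

```python
import pandas as pd


class RowCounter:
    """Hypothetical handler: emits the number of rows in each partition."""

    def end_partition(self, df):
        return pd.DataFrame({"N_ROWS": [len(df)]})


def run_partitioned(handler_cls, data, partition_col=None):
    """Call end_partition once per group, with a new handler per group."""
    if partition_col is None:
        groups = [data]  # roughly what PARTITION BY 1 does: a single partition
    else:
        groups = [g for _, g in data.groupby(partition_col, sort=True)]
    return [handler_cls().end_partition(g) for g in groups]


data = pd.DataFrame({"X": ["a", "a", "b"], "Y": [1, 2, 3]})

single = run_partitioned(RowCounter, data)    # one partition of 3 rows
by_x = run_partitioned(RowCounter, data, "X")  # partitions "a" and "b"
```

This only checks handler logic; it says nothing about how Snowflake distributes or orders partitions.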
Example: Row collection using a regular UDTF vs. using a UDTF with a vectorized end_partition method
Here is an example of how to do row collection using a regular UDTF.
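import pandas

class handler:
    def __init__(self):
        self.rows = []

    def process(self, *row):
        # collect every input row of the partition
        self.rows.append(row)

    def end_partition(self):
        df = pandas.DataFrame(self.rows)
        # process the DataFrame
        result_df = df  # placeholder: replace with the real processing
        return result_df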
Here is an example of how to do row collection using a UDTF with a vectorized end_partition method.
from _snowflake import vectorized
import pandas

class handler:
    def __init__(self):
        self.rows = []

    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
        # the partition already arrives as a DataFrame; process it
        result_df = df  # placeholder: replace with the real processing
        return result_df
Example: Calculate the summary statistic for each column in the partition
Here is an example of how to calculate the summary statistic for each column in the partition using the pandas describe() method.
First, create a table and generate 3 partitions of 5 rows each.
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Take a look at the data.
select * from test_values;
-----------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |
-----------------------------------------------------
|x |8.0 |99.4 |714.6 |168.7 |397.2 |
|x |106.4 |237.1 |971.7 |828.4 |988.2 |
|x |741.3 |207.9 |32.6 |640.6 |63.2 |
|x |541.3 |828.6 |844.9 |77.3 |403.1 |
|x |4.3 |723.3 |924.3 |282.5 |158.1 |
|y |976.1 |562.4 |968.7 |934.3 |977.3 |
|y |390.0 |244.3 |952.6 |101.7 |24.9 |
|y |599.7 |191.8 |90.2 |788.2 |761.2 |
|y |589.5 |201.0 |863.4 |415.1 |696.1 |
|y |46.7 |659.7 |571.1 |938.0 |513.7 |
|z |313.9 |188.5 |964.6 |435.4 |519.6 |
|z |328.3 |643.1 |766.4 |148.1 |596.4 |
|z |929.0 |255.4 |915.9 |857.2 |425.5 |
|z |612.8 |816.4 |220.2 |879.5 |331.4 |
|z |487.1 |704.5 |471.5 |378.9 |481.2 |
-----------------------------------------------------
Next, create the function.
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas

class handler:
    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
        # using describe function to get the summary statistics
        result = df.describe().transpose()
        # add a column at the beginning for column ids
        result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
        return result
$$;
Call the function and partition by id.
-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 |
|x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 |
|x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 |
|x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 |
|x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 |
|y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 |
|y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 |
|y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 |
|y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 |
|y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 |
|z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 |
|z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 |
|z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 |
|z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 |
|z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Alternatively, call the function and treat the whole table as one partition.
-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 |
|NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 |
|NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 |
|NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 |
|NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
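The handler logic in this example can be checked locally with pandas before the function is created. The sketch below is a variation, not the function defined above: summarize is a hypothetical name, the labels are taken from the describe() index instead of being hard-coded, and the sample values are the first two columns of the x partition shown earlier.

```python
import pandas as pd


def summarize(df: pd.DataFrame) -> pd.DataFrame:
    """Summary statistics per numeric column, one output row per column."""
    # describe() skips the non-numeric ID column by default.
    result = df.describe().transpose()
    # Label each row with the input column it describes.
    result.insert(loc=0, column="column_name", value=list(result.index))
    return result


partition = pd.DataFrame({
    "ID": ["x"] * 5,
    "COL1": [8.0, 106.4, 741.3, 541.3, 4.3],
    "COL2": [99.4, 237.1, 207.9, 828.6, 723.3],
})

stats = summarize(partition)
```

The columns that describe() produces (count, mean, std, min, 25%, 50%, 75%, max) come out in the same order as the count, mean, std, min, q1, median, q3, and max columns declared in the RETURNS TABLE clause.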
Getting started with UDTFs with a vectorized process method
To create a UDTF with a vectorized process method:
Define a handler class, similar to regular UDTFs, with optional __init__ and end_partition methods.
Define a process method that takes a DataFrame argument and returns either a pandas.DataFrame or a tuple of pandas.Series or pandas.arrays where each array is a column. The column types of the result must match the column types in the UDTF definition. The returned result must be exactly one DataFrame or tuple. This is different from a vectorized end_partition method, where you can yield or return a list.
Mark the process method as vectorized using the @vectorized decorator or the _sf_vectorized_input function attribute. For more information, refer to Vectorized Python UDFs. The @vectorized decorator can only be used when the Python UDTF is executed within Snowflake, for example, when using a SQL worksheet. When you are executing using the client or a Python worksheet, you must use the function attribute.
Optionally, you can set a target batch size if your Python handler function exceeds the execution time limit.
Note
The default column names for the input DataFrame to a UDTF with a vectorized process method match the signature of the SQL function.
The column names follow the SQL identifier requirements: an unquoted identifier is converted to uppercase, and a double-quoted identifier is preserved as written.
The handler for a UDTF with a vectorized process method can be implemented to process batches in a partition-aware manner or to process them simply batch by batch.
For more information, see Stateful and Stateless Processing.
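As a rough illustration of the partition-aware style, the sketch below keeps a counter on the handler so that row numbering continues from one batch to the next. It is a local simulation only. The class name RunningIndex is hypothetical, and the assumption that a single handler instance receives all batches of a partition should be checked against the Stateful and Stateless Processing topic before relying on it.

```python
import pandas as pd


class RunningIndex:
    """Hypothetical handler: numbers rows consecutively across batches."""

    def __init__(self):
        # Rows seen so far by this handler instance.
        self.seen = 0

    def process(self, df):
        start = self.seen
        self.seen += len(df)
        # Vectorized process must return exactly one output row per input row.
        return pd.DataFrame({"ROW_INDEX": range(start, start + len(df))})


RunningIndex.process._sf_vectorized_input = pd.DataFrame

# Simulate two batches arriving at the same handler instance.
handler = RunningIndex()
out1 = handler.process(pd.DataFrame({"V": [10, 20]}))
out2 = handler.process(pd.DataFrame({"V": [30, 40, 50]}))
```

A stateless handler would omit the counter and compute each output batch from its input batch alone.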
Example: Use a UDTF with a vectorized process method to apply one hot encoding
Here is an example of how to use a UDTF with a vectorized process method to apply one hot encoding on a table with ten categories.
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

# `session` is assumed to be an existing snowflake.snowpark.Session

class one_hot_encode:
    def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
        return pd.get_dummies(df)
    process._sf_vectorized_input = pd.DataFrame

one_hot_encode_udtf = session.udtf.register(
    one_hot_encode,
    output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
    input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Here is the result.
-----------
|"CATEG" |
-----------
|categ1 |
|categ6 |
|categ8 |
|categ5 |
|categ7 |
|categ5 |
|categ1 |
|categ2 |
|categ2 |
|categ4 |
-----------
Prepare to print the table.
res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Here is the result.
CATEG CATEG0 CATEG1 CATEG2 CATEG3 CATEG4 CATEG5 CATEG6 CATEG7 CATEG8 CATEG9
0 categ0 1 0 0 0 0 0 0 0 0 0
1 categ0 1 0 0 0 0 0 0 0 0 0
2 categ5 0 0 0 0 0 1 0 0 0 0
3 categ3 0 0 0 1 0 0 0 0 0 0
4 categ8 0 0 0 0 0 0 0 0 1 0
It is less convenient, but alternatively, you can obtain the same result with a vectorized UDF. You need to package the results into one column and then unpack the column to restore the results to a usable pandas DataFrame. Here is an example of using a vectorized UDF.
from snowflake.snowpark.types import PandasSeries, Variant

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
    return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
    one_hot_encode,
    output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0 {\n "categ0": false,\n "categ1": false,\n "...
1 {\n "categ0": false,\n "categ1": true,\n "c...
2 {\n "categ0": false,\n "categ1": false,\n "...
3 {\n "categ0": false,\n "categ1": false,\n "...
4 {\n "categ0": true,\n "categ1": false,\n "c...
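The packed column shown above still has to be unpacked before it is usable. One way to do that with pandas is sketched below, assuming the column arrives as JSON object strings as in the printed output. The function name unpack_encoding and the two-category sample data are illustrative only.

```python
import json

import pandas as pd


def unpack_encoding(packed: pd.Series) -> pd.DataFrame:
    """Expand a column of JSON objects into one column per key."""
    # Parse JSON strings; pass already-parsed dicts through unchanged.
    records = [json.loads(v) if isinstance(v, str) else v for v in packed]
    return pd.DataFrame(records, index=packed.index)


# Made-up sample in the same shape as the packed output.
packed = pd.Series(
    ['{"categ0": false, "categ1": true}', '{"categ0": true, "categ1": false}'],
    name="ENCODING",
)

# Convert the booleans to 0/1 to match the UDTF result.
unpacked = unpack_encoding(packed).astype(int)
```

This extra parsing step is the inconvenience that the vectorized UDTF version avoids.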
Type support
Vectorized UDTFs support the same SQL types as vectorized UDFs. However, for vectorized UDTFs, SQL NUMBER arguments with a scale of 0 that all fit in a 64-bit or smaller integer type will always be mapped to Int16, Int32, or Int64.
Unlike scalar UDFs, if the argument of a UDTF is not nullable, it will not be converted to int16, int32, or int64.
To view a table showing how SQL types are mapped to pandas dtypes, see the type support table in the vectorized Python UDFs topic.
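The difference between the nullable Int64 dtype and the primitive int64 dtype matters when a library expects NumPy-backed data. The sketch below shows a guarded cast using pandas only; to_primitive is a hypothetical helper, and the Series are built by hand to stand in for UDTF input columns.

```python
import pandas as pd


def to_primitive(col: pd.Series, dtype: str = "int64") -> pd.Series:
    """Cast a nullable column to a NumPy-backed dtype if it has no NULLs."""
    if col.isna().any():
        raise ValueError("column contains NULLs; fill or drop them before casting")
    return col.astype(dtype)


# A column as it might arrive in a vectorized UDTF: nullable Int64.
clean = pd.Series([1, 2, 3], dtype="Int64")
primitive = to_primitive(clean)

# A column with a missing value cannot be represented as plain int64.
with_null = pd.Series([1, None, 3], dtype="Int64")
has_null = bool(with_null.isna().any())
```

Checking for NULLs explicitly gives a clearer error than letting the cast fail inside a downstream library.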
Best practices
This section describes best practices.
If a scalar must be returned with each row, build a list of repeated values instead of unpacking the numpy array to create tuples. For example, for a 2-column result, instead of:
return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
Do this:
return ([scalar_value] * len(results), results[:, 0], results[:, 1])
To improve performance, unpack semi-structured data into columns. For example, if you have a variant column, obj, with elements x (int), y (float), and z (string), then instead of defining a UDTF with a signature like this:
create function vec_udtf(obj variant)
and calling it using vec_udtf(obj), define the UDTF with this signature:
create function vec_udtf(x int, y float, z string)
and call it using vec_udtf(obj:x, obj:y, obj:z).
By default, Snowflake encodes the inputs into pandas dtypes that support NULL values (for example, Int64). If you are using a library that requires a primitive type (such as numpy) and your input has no NULL values, you should cast the column to a primitive type before using the library. For example:
input_df['y'] = input_df['y'].astype("int64")
For more information, see Type Support.
When using UDTFs with a vectorized end_partition method, to improve performance and prevent timeouts, avoid using pandas.concat to accumulate partial results. Instead, yield each partial result as soon as it is ready. For example, instead of:
results = []
while(...):
    partial_result = pd.DataFrame(...)
    results.append(partial_result)
return pd.concat(results)
Do this:
while(...):
    partial_result = pd.DataFrame(...)
    yield partial_result
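Two of the practices above, repeating a scalar as a list and yielding partial results, can be combined in one handler. The following is a minimal local sketch under stated assumptions: ChunkedHandler, CHUNK_ROWS, the label value, and the doubling computation are all hypothetical stand-ins, and the handler is run directly with pandas rather than inside Snowflake.

```python
import pandas as pd


class ChunkedHandler:
    """Hypothetical handler that emits its result chunk by chunk."""

    CHUNK_ROWS = 2  # illustrative chunk size, not a Snowflake setting

    def end_partition(self, df):
        label = "doubled"  # scalar to attach to every output row
        for start in range(0, len(df), self.CHUNK_ROWS):
            chunk = df.iloc[start:start + self.CHUNK_ROWS]
            # Stand-in computation that produces a 2-column numpy array.
            results = chunk.to_numpy(dtype="float64") * 2
            # Yield each partial result instead of collecting and concatenating.
            yield pd.DataFrame({
                "LABEL": [label] * len(results),  # repeated scalar column
                "A2": results[:, 0],
                "B2": results[:, 1],
            })


ChunkedHandler.end_partition._sf_vectorized_input = pd.DataFrame

df = pd.DataFrame({"A": [1.0, 2.0, 3.0], "B": [10.0, 20.0, 30.0]})
parts = list(ChunkedHandler().end_partition(df))
```

Because each chunk is handed back as soon as it is computed, the handler never holds the full result in memory at once.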