Vectorized Python UDTFs¶
This topic introduces vectorized Python UDTFs.
Overview¶
Vectorized Python UDTFs (user-defined table functions) enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. Vectorized Python UDTFs allow for easy integration with libraries that operate on pandas DataFrames or pandas arrays.
Use a vectorized UDTF when:
You need to process your data on a partition-by-partition basis instead of on a row-by-row basis.
You need to return multiple rows or columns for each partition.
You want to use libraries that operate on pandas DataFrames for data analysis.
Getting Started with Vectorized Python UDTFs¶
To create a vectorized Python UDTF:
Optionally, define a handler class with an
__init__
method which will be invoked before processing each partition.Do not define a
process
method.Define an
end_partition
method that takes in a DataFrame argument and returns or yields apandas.DataFrame
or a tuple ofpandas.Series
orpandas.arrays
where each array is a column. The column types of the result must match the column types in the UDTF definition.Mark the
end_partition
method as vectorized using the@vectorized
decorator or the_sf_vectorized_input
function attribute. For more information, refer to Vectorized Python UDFs. The@vectorized
decorator can only be used when the Python UDTF is executed within Snowflake, for example, when using a SQL worksheet. When you are executing using the client or a Python worksheet, you must use the function attribute.
Note
To access the columns of the DataFrame corresponding to the input arguments,
use indices 0, 1, 2, etc. For example, use df[0]
to access the first column.
Here is an example of creating a vectorized Python UDTF using the @vectorized
decorator.
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Here is an example of creating a vectorized Python UDTF using the function attribute.
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
Note
A vectorized UDTF must be called with PARTITION BY clause to build the partitions.
To call the vectorized UDTF with all the data in the same partition:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
To call the vectorized UDTF with the data partitionioned by column x:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Example: Row collection using a regular UDTF vs. using a vectorized UDTF¶
Here is an example of how to do row collection using a regular UDTF.
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
Here is an example of how to do row collection using a vectorized UDTF.
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Example: Calculate the summary statistic for each column in the partition¶
Here is an example of how to calculate the summary statistic for each column in the partition using
the pandas describe()
method.
First, create a table and generate 3 partitions of 5 rows each.
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Take a look at the data.
select * from test_values;
-----------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |
-----------------------------------------------------
|x |8.0 |99.4 |714.6 |168.7 |397.2 |
|x |106.4 |237.1 |971.7 |828.4 |988.2 |
|x |741.3 |207.9 |32.6 |640.6 |63.2 |
|x |541.3 |828.6 |844.9 |77.3 |403.1 |
|x |4.3 |723.3 |924.3 |282.5 |158.1 |
|y |976.1 |562.4 |968.7 |934.3 |977.3 |
|y |390.0 |244.3 |952.6 |101.7 |24.9 |
|y |599.7 |191.8 |90.2 |788.2 |761.2 |
|y |589.5 |201.0 |863.4 |415.1 |696.1 |
|y |46.7 |659.7 |571.1 |938.0 |513.7 |
|z |313.9 |188.5 |964.6 |435.4 |519.6 |
|z |328.3 |643.1 |766.4 |148.1 |596.4 |
|z |929.0 |255.4 |915.9 |857.2 |425.5 |
|z |612.8 |816.4 |220.2 |879.5 |331.4 |
|z |487.1 |704.5 |471.5 |378.9 |481.2 |
-----------------------------------------------------
Next, create the function.
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas
class handler:
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# using describe function to get the summary statistics
result = df.describe().transpose()
# add a column at the beginning for column ids
result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
return result
$$;
Call the function and partition by id
.
-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 |
|x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 |
|x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 |
|x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 |
|x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 |
|y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 |
|y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 |
|y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 |
|y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 |
|y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 |
|z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 |
|z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 |
|z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 |
|z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 |
|z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Alternatively, call the function and treat the whole table as one partition.
-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 |
|NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 |
|NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 |
|NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 |
|NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------