Implementing User-Defined Table Functions (UDTFs) in Python
This topic explains how to write a handler in Python for a user-defined table function (UDTF) and how to create the UDTF. The handler code executes when the UDTF is called.
A UDTF is a user-defined function (UDF) that returns tabular results. For more about UDF handlers implemented in Python, see Creating Python UDFs. For more general information about UDFs, see UDFs (User-Defined Functions).
When you create a Python UDTF, you do the following:
Implement a class with methods that Snowflake will invoke when the UDTF is called.
For more details, see Implementing a Handler in this topic.
Create the UDTF in SQL with the CREATE FUNCTION command, specifying your class as the handler. When you create the UDTF, you specify:
Data types of UDTF input parameters.
Data types of columns returned by the UDTF.
Code to execute as a handler when the UDTF is called.
The language in which the handler is implemented.
For more about syntax, see Creating the UDTF with CREATE FUNCTION in this topic.
Table functions (UDTFs) have a limit of 500 input arguments and 500 output columns.
Implementing a Handler
You implement a handler class to process UDTF argument values into tabular results and handle partitioned input. For a handler class example, see Handler Class Example in this topic.
When you create the UDTF with CREATE FUNCTION, you specify this class as the UDTF’s handler. For more on the SQL to create the function, see Creating the UDTF with CREATE FUNCTION in this topic.
A handler class implements methods Snowflake will invoke when the UDTF is called. This class contains the UDTF’s logic. The methods are:
__init__ method (optional): Initializes state for stateful processing of input partitions. For more information, see Initializing the Handler in this topic.
process method (required): Processes each input row, returning a tabular value as tuples. Snowflake invokes this method, passing input from the UDTF’s arguments. For more information, see Defining a process Method in this topic.
end_partition method (optional): Finalizes processing of input partitions, returning a tabular value as tuples. For more information, see Finalizing Partition Processing in this topic.
Note that throwing an exception from any method in the handler class causes processing to stop. The query that called the UDTF fails with an error message.
If your code doesn’t meet the requirements described here, UDTF creation or execution may fail. Snowflake will detect violations when the CREATE FUNCTION statement executes.
Handler Class Example
Code in the following example creates a UDTF whose handler class processes rows in a partition. The process method processes each input row, returning a row with the total cost for a stock sale. After processing rows in the partition, it returns (from its end_partition method) the total for all sales included in the partition.
create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
        self._symbol = symbol
        cost = quantity * price
        self._cost_total += cost
        yield (symbol, cost)

    def end_partition(self):
        yield (self._symbol, self._cost_total)
$$;
Initializing the Handler
You can optionally implement an __init__ method in your handler class that Snowflake will invoke before the handler has begun processing rows. For example, you can use this method to establish some partition-scoped state for the handler. Your __init__ method may not produce output rows.
The method’s signature must be of the following form:
def __init__(self):
For example, you might want to:
Initialize state for a partition, then use this state in the process and end_partition methods.
Execute long-running initialization that needs to be done only once per partition rather than once per row.
You can also execute logic once before partition handling begins by including that code outside the handler class, such as before the class declaration.
For more about processing partitions, see Processing Partitions in this topic.
If you use an __init__ method, keep in mind that __init__:
Can take only self as an argument.
Cannot produce output rows. Use your process method implementation for that.
Is invoked once for each partition, and before the process method is invoked.
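The split between module-level code and __init__ can be sketched as follows. This is a minimal local illustration, not Snowflake's own code: the FEE_BY_SYMBOL table, the StockSaleWithFees class, and the hand-written driver loop at the end are all hypothetical names introduced for the example.

```python
# Module-level code: runs when the handler module is loaded, not once per row.
# FEE_BY_SYMBOL is a hypothetical lookup table used only for illustration.
FEE_BY_SYMBOL = {"AAA": 1.50, "BBB": 2.00}


class StockSaleWithFees:
    def __init__(self):
        # Takes only self and produces no rows; it only sets partition-scoped state.
        self._fee_total = 0.0

    def process(self, symbol, quantity, price):
        fee = FEE_BY_SYMBOL.get(symbol, 0.0)
        self._fee_total += fee
        yield (symbol, quantity * price + fee)

    def end_partition(self):
        yield ("TOTAL_FEES", self._fee_total)


# Local check only: drive the handler by hand for a single partition.
handler = StockSaleWithFees()
rows = []
for args in [("AAA", 10, 2.0), ("AAA", 5, 4.0)]:
    rows.extend(handler.process(*args))
rows.extend(handler.end_partition())
```

Keeping the lookup table outside the class means it is built once, while the running fee total starts fresh for every handler instance.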
Processing Rows
Implement a process method that Snowflake will invoke for each input row.
Defining a process Method
Define a process method that receives as values the UDTF arguments converted from SQL types, returning data that Snowflake will use to create the UDTF’s tabular return value.
The method’s signature must be of the following form:
def process(self, *args):
The process method must:
Declare method parameters corresponding to UDTF parameters.
Method parameter names needn’t match UDTF parameter names, but the method parameters must be declared in the same order as UDTF parameters are declared.
When passing UDTF argument values to your method, Snowflake will convert the values from SQL types to the Python types you use in the method. For information about how Snowflake maps between SQL and Python data types, see SQL-Python Data Type Mappings for Parameters and Return Types.
Yield one or more tuples (or return an iterable containing tuples), in which the sequence of tuples corresponds to the sequence of UDTF return value columns.
The tuple elements must appear in the same order as UDTF return value columns are declared. For more information, see Returning a Value in this topic.
Snowflake will convert values from Python types to SQL types required by the UDTF declaration. For information about how Snowflake maps between SQL and Python data types, see SQL-Python Data Type Mappings for Parameters and Return Types.
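The type conversion described above can be tried locally. The sketch below assumes that a NUMBER(10,2) argument reaches the method as decimal.Decimal and that a scale-0 NUMBER reaches it as int; confirm this against the data type mapping topic before relying on it. The input values are made up for the example.

```python
from decimal import Decimal


class StockSale:
    def process(self, symbol, quantity, price):
        # Parameters are positional: their order, not their names, must match the UDTF.
        cost = quantity * price      # int * Decimal stays an exact Decimal
        yield (symbol, cost)         # tuple order matches the declared output columns


# Local call with values shaped like the assumed converted arguments.
rows = list(StockSale().process("AAA", 3, Decimal("19.99")))
```

Staying in Decimal avoids the rounding surprises that can appear if the price is converted to float before multiplying.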
If a method in the handler class throws an exception, processing will stop. The query that called the UDTF will fail with an error message. If the process method returns None, processing stops. (The end_partition method is still invoked even if the process method returns None.)
process Method Example
Code in the following example shows a StockSale handler class with a process method that processes three UDTF arguments (symbol, quantity, and price), returning a single row with two columns (symbol and total). Note that process method parameters are declared in the same order as stock_sale parameters. Arguments in the yield statement are in the same order as columns declared in the stock_sale RETURNS TABLE clause.
create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
    def process(self, symbol, quantity, price):
        cost = quantity * price
        yield (symbol, cost)
$$;
Returning a Value
When returning output rows, you can use either yield or return (but not both) to return tuples with the tabular value. If the method returns or yields None, processing for the current row stops.
When using yield, execute a separate yield statement for each output row. This is the best practice because the lazy evaluation that comes with yield enables more efficient processing and can help avoid timeouts.
Each element in the tuple becomes a column value in the result returned by the UDTF. The order of yield arguments must match the order of columns declared for the return value in the RETURNS TABLE clause of CREATE FUNCTION.
Code in the following example returns values representing two rows.
def process(self, symbol, quantity, price):
    cost = quantity * price
    yield (symbol, cost)
    yield (symbol, cost)
Note that because the yield argument is a tuple, you must include a trailing comma when passing a single value in the tuple, for example yield (cost,).
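The trailing-comma rule is ordinary Python behavior and can be checked directly. In this short sketch, process_single is a hypothetical stand-in for a one-column process method, written as a plain function for brevity.

```python
cost = 42

not_a_tuple = (cost)     # parentheses alone do not create a tuple; this is just 42
one_tuple = (cost,)      # the trailing comma makes a one-element tuple


def process_single(cost):
    # Hypothetical one-column handler body.
    yield (cost,)


rows = list(process_single(cost))
```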
When using return, return an iterable with tuples.
Each value in a tuple becomes a column value in the result returned by the UDTF. The order of column values in a tuple must match the order of columns declared for the return value in the RETURNS TABLE clause of CREATE FUNCTION.
Code in the following example returns two rows, each with two columns: symbol and total.
def process(self, symbol, quantity, price):
    cost = quantity * price
    return [(symbol, cost), (symbol, cost)]
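To see that the two styles produce the same rows, the following sketch runs a yielding handler and a returning handler side by side in plain Python. The class names YieldingHandler and ReturningHandler and the sample arguments are hypothetical; the only difference shown is lazy versus all-at-once row production.

```python
class YieldingHandler:
    def process(self, symbol, quantity, price):
        cost = quantity * price
        yield (symbol, cost)    # first output row, produced lazily
        yield (symbol, cost)    # second output row


class ReturningHandler:
    def process(self, symbol, quantity, price):
        cost = quantity * price
        return [(symbol, cost), (symbol, cost)]    # both rows built up front


# Materialize both results locally so they can be compared.
yielded = list(YieldingHandler().process("AAA", 3, 10))
returned = list(ReturningHandler().process("AAA", 3, 10))
```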
To skip an input row and process the next row (such as when you’re validating the input rows), have the process method return one of the following:
When using return, return None, a list containing None, or an empty list to skip the row.
When using yield, yield None to skip a row.
Note that if you have multiple calls to yield, any calls after one that yields None will be ignored by Snowflake.
Code in the following example returns only the rows for which number is a positive integer. If number is not positive, the method yields None to skip the current row and continue processing the next row.
def process(self, number):
    if number < 1:
        yield None
    else:
        yield (number,)
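The skipping behavior can be approximated locally. In the sketch below, collect_rows is a hypothetical stand-in for the caller, written only to mirror the rules stated in this section (a None skips the row, and yields after a None are ignored); it is not how Snowflake is implemented.

```python
class PositiveOnly:
    def process(self, number):
        if number < 1:
            yield None          # skip this input row
        else:
            yield (number,)     # one-column output row


def collect_rows(handler, inputs):
    """Hypothetical local driver: keeps tuples, stops reading a row's output at None."""
    out = []
    for value in inputs:
        for row in handler.process(value):
            if row is None:
                break           # mirrors "yields after a None are ignored"
            out.append(row)
    return out


rows = collect_rows(PositiveOnly(), [3, 0, -2, 7])
```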
Stateful and Stateless Processing
You can implement the handler to process rows in a partition-aware manner or to process them simply row by row.
In partition-aware processing, the handler includes code to manage partition-scoped state. This includes an __init__ method that executes at the start of partition processing and an end_partition method that Snowflake invokes after processing the partition’s last row. For more information, see Processing Partitions in this topic.
In partition-unaware processing, the handler executes statelessly, ignoring partition boundaries.
To have the handler execute this way, do not include an __init__ or end_partition method.
Finalizing Partition Processing
You can optionally implement an end_partition method in your handler class that Snowflake will invoke after you have processed all rows in a partition. In this method, you can execute code for a partition after all of the partition’s rows have been processed. Your end_partition method may produce output rows, such as to return the results of a partition-scoped calculation. For more information, see Processing Partitions in this topic.
The method’s signature must be of the following form:
def end_partition(self):
Snowflake expects the following of your end_partition method implementation:
It must not be static.
It may not have any parameters other than self.
As an alternative to returning a tabular value, it may produce an empty list or None.
Processing Partitions
You can process partitions in input with code that executes per partition (such as to manage state) as well as code that executes for each row in the partition.
When a query includes partitions, it aggregates rows using a specified value, such as the value of a column. The aggregated rows your handler receives are said to be partitioned by that value. Your code can process these partitions and their rows so that the processing for each partition includes partition-scoped state.
Code in the following SQL example queries for stock sale information. It executes a stock_sale_sum UDTF whose input is partitioned by the value of the symbol column.
select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Keep in mind that even when incoming rows are partitioned, your code can ignore the partition separation and just process the rows. For example, you can omit code designed to handle partition-scoped state, such as a handler class __init__ method and end_partition method, and just implement the process method. For more information, see Stateful and Stateless Processing in this topic.
To process each partition as a unit, you would:
Implement a handler class __init__ method in which to initialize processing for the partition.
For more information, see Initializing the Handler in this topic.
Include partition-aware code when processing each row with the process method.
For more information on processing rows, see Processing Rows in this topic.
Implement an end_partition method to finalize partition processing.
For more information, see Finalizing Partition Processing in this topic.
The following describes the sequence of invocations to your handler when you’ve included code designed to execute per partition.
When processing for a partition starts, and before the first row has been processed, Snowflake uses the __init__ method of your handler class to create an instance of the class.
Here, you can establish partition-scoped state. For example, you might initialize an instance variable to hold a value calculated from rows in the partition.
For each row in the partition, Snowflake invokes the process method.
Each time the method executes, it can make changes to state values. For example, you might have the process method update the value of the instance variable.
After your code has processed the last row in the partition, Snowflake invokes your end_partition method.
From this method you can return output rows containing a partition-level value you want to return. For example, you might return the value of the instance variable you’ve been updating as you processed rows in the partition.
The end_partition method won’t receive any arguments from Snowflake, which simply invokes it after you process the last row in the partition.
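The invocation sequence can be sketched as a small local driver, which is handy for unit-testing a handler before creating the UDTF. The run_partitioned function and the sample sales data are hypothetical; the driver only imitates the documented order of calls (one instance per partition, process per row, then end_partition) and says nothing about how Snowflake schedules work internally.

```python
from itertools import groupby
from operator import itemgetter


class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
        self._symbol = symbol
        cost = quantity * price
        self._cost_total += cost
        yield (symbol, cost)

    def end_partition(self):
        yield (self._symbol, self._cost_total)


def run_partitioned(handler_cls, rows, key_index=0):
    """Hypothetical local driver that imitates the documented call order."""
    output = []
    ordered = sorted(rows, key=itemgetter(key_index))
    for _, partition in groupby(ordered, key=itemgetter(key_index)):
        handler = handler_cls()                     # 1. new instance per partition
        for row in partition:
            output.extend(handler.process(*row))    # 2. process once per row
        output.extend(handler.end_partition())      # 3. end_partition after the last row
    return output


sales = [("AAA", 2, 10), ("BBB", 1, 5), ("AAA", 1, 30)]
result = run_partitioned(StockSaleSum, sales)
```

Because each partition gets its own instance, the running total for AAA never leaks into the BBB partition.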
Partition Handling Example
Code in the following example calculates the total cost paid across purchases for a stock by first calculating the cost per purchase and adding purchases together (in the process method). The code returns the total in the end_partition method.
class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
        self._symbol = symbol
        cost = quantity * price
        self._cost_total += cost
        yield (symbol, cost)

    def end_partition(self):
        yield (self._symbol, self._cost_total)
When processing partitions, keep in mind the following:
Your code may handle partitions that aren’t explicitly specified in a call to the UDTF. Even when a call to the UDTF doesn’t include a PARTITION BY clause, Snowflake partitions the data implicitly.
Your process method will receive row data in the order specified by the partition’s ORDER BY clause, if any.
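Ordered delivery matters when a row's output depends on the rows before it. As a minimal sketch, the hypothetical RunningTotal handler below emits a cumulative cost per row; the hand-written loop assumes the rows already arrive in the intended order, as they would with an ORDER BY in the partition clause.

```python
class RunningTotal:
    def __init__(self):
        self._running = 0    # partition-scoped accumulator

    def process(self, symbol, quantity, price):
        self._running += quantity * price
        yield (symbol, self._running)    # depends on every earlier row in the partition


# Local check: feed one partition's rows in a fixed order.
handler = RunningTotal()
rows = []
for args in [("AAA", 1, 10), ("AAA", 2, 10), ("AAA", 3, 10)]:
    rows.extend(handler.process(*args))
```

Without a defined order, the intermediate values could differ between runs even though the final total would be the same.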
Using an Imported Package
You can use Python packages that are included in a curated list of third party packages from Anaconda available in Snowflake. To specify these packages as dependencies in the UDTF, use the PACKAGES clause in CREATE FUNCTION.
You can discover the list of included packages by executing the following SQL in Snowflake:
select * from information_schema.packages where language = 'python';
Code in the following example uses a function in the NumPy (Numerical Python) package to calculate the average price per share from an array of stock purchases, each with a different price per share.
create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np

class StockSaleAverage:
    def __init__(self):
        self._price_array = []
        self._quantity_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
        self._symbol = symbol
        self._price_array.append(float(price))
        cost = quantity * price
        yield (symbol, cost)

    def end_partition(self):
        np_array = np.array(self._price_array)
        avg = np.average(np_array)
        yield (self._symbol, avg)
$$;
Creating the UDTF with CREATE FUNCTION
You create a UDTF in SQL using the CREATE FUNCTION command, specifying the code you wrote as the handler. For the command reference, see CREATE FUNCTION.
Use the following syntax when creating a UDTF.
CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
  RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
  LANGUAGE PYTHON
  [ IMPORTS = ( '<imports>' ) ]
  RUNTIME_VERSION = 3.8
  [ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
  [ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
  HANDLER = '<handler_class>'
  [ AS '<python_code>' ]
To associate the handler code you’ve written with the UDTF, you do the following when executing CREATE FUNCTION:
In RETURNS TABLE, specify output columns in column name and type pairs.
Set LANGUAGE to PYTHON.
Set the IMPORTS clause value to the path and name of the handler class if the class is in an external location, such as on a stage.
For more information, see Creating Python UDFs.
Set RUNTIME_VERSION to the version of the Python runtime that your code requires. Currently, Snowflake supports only version 3.8.
Set the PACKAGES clause value to the name of one or more packages, if any, required by the handler class.
Set the HANDLER clause value to the name of the handler class.
When associating Python handler code with a UDTF, you can either include the code in-line or refer to it at a location on a Snowflake stage. The HANDLER value is case-sensitive and must match the name of the Python class.
For more information, see UDFs With In-line Code vs. UDFs With Code Uploaded from a Stage.
For a scalar Python UDF, the HANDLER clause value contains the method name.
For a Python UDTF, the HANDLER clause value contains the class name but not a method name.
The reason for the difference is that for a scalar Python UDF, the name of the handler method is chosen by the user and therefore not known in advance by Snowflake, but for a Python UDTF, the names of the methods (such as the end_partition method) are known because they must match the names specified by Snowflake.
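The naming difference can be seen in the Python code alone. In this sketch, sale_cost is a hypothetical scalar handler whose name the author chooses, while StockSale follows the fixed method names a UDTF handler class must use; the HANDLER values mentioned in the comments are illustrative.

```python
# Scalar UDF style: the author picks the function name, so HANDLER would
# name the function itself (here, 'sale_cost').
def sale_cost(quantity, price):
    return quantity * price


# UDTF style: HANDLER would name only the class (here, 'StockSale');
# the method name process is fixed by Snowflake.
class StockSale:
    def process(self, symbol, quantity, price):
        yield (symbol, quantity * price)


scalar_result = sale_cost(3, 10)
table_result = list(StockSale().process("AAA", 3, 10))
```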
The AS '<python_code>' clause is required if the handler code is specified in-line with CREATE FUNCTION.