Tabular Java UDFs (UDTFs)¶
This document explains how to write a UDTF (user-defined table function) in Java.
Introduction¶
A UDTF can be written in any of the following languages:
SQL
JavaScript
Java
Each UDTF is created by executing the CREATE FUNCTION statement, which specifies:
The data types of the arguments passed to the UDTF.
The data types of the columns in the rows returned from the UDTF.
The code to execute when the UDTF is called. For example, if the UDTF is implemented as a JavaScript UDTF, then the CREATE FUNCTION statement specifies the JavaScript function to call.
A UDTF developer creates a per-row function (or method), which is called once for each input row. The per-row method accepts zero or more parameters. For each input row, the per-row method returns a set of zero, one, or more output rows.
Currently, UDTFs written in all supported languages except SQL have the following additional characteristics:
Rows can optionally be grouped into partitions. All rows within a partition are processed together (i.e. they are passed sequentially to the same instance of the per-row method). The output can optionally contain one or more rows that are based on common characteristics of rows within the partition.
For partitioned data, UDTFs support an optional initializer method that is called once per partition, before the per-row method starts processing the first row of the partition. The initializer method can initialize variables to use for the entire partition.
For partitioned data, UDTFs support an optional finalizer method that is called once per partition, after the per-row method finishes processing the last row of the partition. The finalizer method allows the code to return zero, one, or multiple output rows that are not tied to a specific input row. The output rows can be:
Rows that are added to the output already generated for the partition.
One or more rows that summarize the partition. In this case, the per-row method might not have generated any output rows; it might merely have performed calculations that are used by the finalizer method when the finalizer generates its output rows.
The column on which to partition the rows is specified in the SQL statement that calls the UDTF.
For more information about partitioning, see Table Functions and Partitions.
For Java UDTFs, the initializer method, per-row method, and finalizer method are implemented as described below:
The initializer is implemented by writing a zero-argument constructor.
The per-row method is named process.
The finalizer is implemented as a zero-argument method named endPartition.
Each Java UDTF requires a handler class, which defines the constructor, process, and endPartition. Details are included in Java Classes for UDTFs (in this topic).
Each Java UDTF also requires an output row class, which specifies the Java data types of the columns of the output row(s) that are generated by the handler class. Details are included in The Output Row Class (in this topic).
Note
Tabular functions (UDTFs) have a limit of 500 input arguments and 500 output columns.
Java Classes for UDTFs¶
The primary components of the UDTF are the handler class and the output row class.
The Handler Class¶
Snowflake interacts with the UDTF primarily by invoking the following methods of the handler class:
The initializer (the constructor).
The per-row method (process).
The finalizer method (endPartition).
The handler class can contain additional methods needed to support these three methods.
The handler class also contains a method getOutputClass, which is described later.
Throwing an exception from any method in the handler class (or the output row class) causes processing to stop. The query that called the UDTF fails with an error message.
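For example, a process method might deliberately throw an exception to abort the query when it receives input that it cannot handle. The fragment below is a hypothetical sketch (the null check and message text are illustrative, not part of any required API):

public Stream<OutputRow> process(String v) {
  if (v == null) {
    // Throwing here stops processing; the calling query fails
    // with an error message that includes this text.
    throw new IllegalArgumentException("Input value must not be NULL");
  }
  return Stream.of(new OutputRow(v));
}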
The Constructor¶
A handler class can have a constructor, which must take zero arguments.
The constructor is invoked once for each partition prior to any invocations of process.
The constructor cannot produce output rows.
Use the constructor to initialize state for the partition; this state can be used by the process and endPartition methods. The constructor is also the appropriate place to put any long-running initialization that needs to be done only once per partition rather than once per row.
The constructor is optional.
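For example, a handler might compile a regular expression in the constructor so that the compilation cost is paid once per partition rather than once per row. The sketch below is hypothetical (the class name, field name, and pattern are illustrative):

import java.util.regex.Pattern;
import java.util.stream.Stream;

class OutputRow {
  public String match;
  public OutputRow(String match) { this.match = match; }
}

class EmailFilter {
  // Long-running initialization: done once per partition, in the
  // constructor, rather than once per row in process().
  private final Pattern emailPattern;

  public EmailFilter() {
    emailPattern = Pattern.compile("[\\w.+-]+@[\\w-]+\\.[\\w.]+");
  }

  public Stream<OutputRow> process(String text) {
    // Reuse the pre-compiled pattern for every row in the partition.
    return emailPattern.matcher(text).matches()
        ? Stream.of(new OutputRow(text))
        : Stream.empty();
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }
}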
The process Method¶
The process method is invoked once for each row in the input partition.
The arguments passed to the UDTF are passed to process. The values of the arguments are converted from SQL data types to Java data types. (For information about mapping SQL and Java data types, see SQL-Java Data Type Mappings for Parameters and Return Types.)
The parameter names of the process method can be any valid Java identifiers; the names do not need to match the names specified in the CREATE FUNCTION statement.
Each time that process is called, it can return zero, one, or multiple rows.
The data type returned by the process method must be Stream<OutputRow>, where Stream is defined in java.util.stream.Stream, and OutputRow is the name of the output row class. The example below shows a simple process method that merely returns its input via a Stream:
import java.util.stream.Stream;
...
public Stream<OutputRow> process(String v) {
  return Stream.of(new OutputRow(v));
}
...
If the process method does not keep or use any state in the object (e.g. if the method is designed to just exclude selected input rows from the output), you can declare the method static. If the process method is static and the handler class does not have a constructor or non-static endPartition method, Snowflake passes each row directly to the static process method without constructing an instance of the handler class.
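For example, the following hypothetical handler excludes empty rows from the output and keeps no state, so its process method can be static (the class and field names are illustrative):

import java.util.stream.Stream;

class OutputRow {
  public String value;
  public OutputRow(String value) { this.value = value; }
}

class NonEmptyFilter {
  // Stateless handler: no constructor and no non-static endPartition(),
  // so Snowflake can invoke process() without constructing an instance.
  public static Stream<OutputRow> process(String v) {
    if (v == null || v.isEmpty()) {
      return Stream.empty();  // Exclude this row from the output.
    }
    return Stream.of(new OutputRow(v));
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }
}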
If you need to skip an input row and process the next row (e.g. if you are validating the input rows), return an empty Stream object. For example, the process method below only returns the rows for which number is a positive integer. If number is not positive, the method returns an empty Stream object to skip the current row and continue processing the next row.
public Stream<OutputRow> process(int number) {
  if (number < 1) {
    return Stream.empty();
  }
  return Stream.of(new OutputRow(number));
}
If process returns a null Stream, then processing stops. (The endPartition method is still called even if a null Stream is returned.)
This method is required.
The endPartition Method¶
This method can be used to generate output rows that are based on any state information aggregated in process.
This method is invoked once for each partition, after all rows in that partition have been passed to process.
Note
If the user does not partition the data explicitly, then Snowflake partitions the data implicitly. For details, see Implicit Partitioning (in this topic).
This method can output zero, one, or multiple rows.
This method is optional.
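For example, the following hypothetical handler accumulates a count in process and emits a single summary row from endPartition (the class and column names are illustrative):

import java.util.stream.Stream;

class OutputRow {
  public int rowCount;
  public OutputRow(int rowCount) { this.rowCount = rowCount; }
}

class RowCounter {
  // Per-partition state, initialized by the constructor.
  private int count;

  public RowCounter() {
    count = 0;
  }

  public Stream<OutputRow> process(String v) {
    // Accumulate state only; emit no per-row output.
    count++;
    return Stream.empty();
  }

  public Stream<OutputRow> endPartition() {
    // Emit one summary row for the entire partition.
    return Stream.of(new OutputRow(count));
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }
}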
The getOutputClass Method¶
This method returns information about the output row class. The output row class contains information about the data types of the returned row.
The Output Row Class¶
Snowflake uses the output row class to help specify conversions between Java data types and SQL data types.
When a Java UDTF returns a row, the value in each column of the row must be converted from the Java data type to the corresponding SQL data type. The SQL data types are specified in the RETURNS clause of the CREATE FUNCTION statement. However, the mapping between Java and SQL data types is not 1-to-1, so Snowflake needs to know the Java data type for each returned column. (For more information about mapping SQL and Java data types, see SQL-Java Data Type Mappings for Parameters and Return Types.)
A Java UDTF specifies the Java data types of the output columns by defining an output row class. Each row returned from the UDTF is returned as an instance of the output row class. Each instance of the output row class contains one public field for each output column. Snowflake reads the values of the public fields from each instance of the output row class, converts the Java values to SQL values, and constructs a SQL output row containing those values.
The values in each instance of the output row class are set by calling the output row class’s constructor. The constructor accepts parameters that correspond to the output columns and then sets the public fields to those parameters.
The code below defines a sample output row class:
class OutputRow {
  public String name;
  public int id;
  public OutputRow(String pName, int pId) {
    this.name = pName;
    this.id = pId;
  }
}
The public variables specified by this class must match the columns specified in the RETURNS TABLE (...) clause of the CREATE FUNCTION statement. For example, the OutputRow class above corresponds to the RETURNS clause below:
CREATE FUNCTION F(...)
RETURNS TABLE(NAME VARCHAR, ID INTEGER)
...
Important
The matching between the SQL column names and the Java public field names in the output row class is case-insensitive.
For example, in the Java and SQL code shown above, the Java field named id corresponds to the SQL column named ID.
The output row class is used as follows:
The handler class uses the output row class to specify the return type of the process method and the endPartition method. The handler class also uses the output row class to construct returned values. For example:

public Stream<OutputRow> process(String v) {
  ...
  return Stream.of(new OutputRow(...));
}

public Stream<OutputRow> endPartition() {
  ...
  return Stream.of(new OutputRow(...));
}

The output row class is also used in the handler class’s getOutputClass method, which is a static method that Snowflake calls in order to learn the Java data types of the outputs:

public static Class getOutputClass() {
  return OutputRow.class;
}
Throwing an exception from any method in the output row class (or the handler class) causes processing to stop. The query that called the UDTF fails with an error message.
Summary of Requirements¶
The UDTF’s Java code must meet the following requirements:
The code must define an output row class.
The UDTF handler class must include a public method named process that returns a Stream of <output_row_class>, where Stream is defined in java.util.stream.Stream.
The UDTF handler class must define a public static method named getOutputClass, which must return <output_row_class>.class.
If the Java code does not meet these requirements, then either creation or execution of the UDTF fails:
If the session has an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is created.
If the session does not have an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is called.
Calling a Java UDTF¶
Call a UDTF the way you would call any table function. When calling a UDTF in the FROM clause of a query, specify the UDTF’s name and arguments inside the parentheses that follow the TABLE keyword.
In other words, use a form such as the following for the TABLE keyword when calling a UDTF:
SELECT ...
FROM TABLE ( udtf_name (udtf_arguments) )
For example, the following statement calls a table function and passes it a DATE literal:
select ...
from table(my_java_udtf('2021-01-16'::DATE));
The argument to a table function can be an expression, not just a literal. For example, a table function can be called using a column from a table. Examples are shown below, including in Examples of Calling Java UDTFs in Queries.
For more information about table functions in general, see table function.
Table Functions and Partitions¶
Before rows are passed to table functions, the rows can be grouped into partitions. Partitioning has two main benefits:
Partitioning allows Snowflake to divide up the workload to improve parallelization and thus performance.
Partitioning allows Snowflake to process all rows with a common characteristic as a group. You can return results that are based on all rows in the group, not just on individual rows.
For example, you might partition stock price data into one group per stock. All stock prices for an individual company can be analyzed together, while stock prices for each company can be analyzed independently of any other company.
Data can be partitioned explicitly or implicitly.
Explicit Partitioning¶
Explicit Partitioning into Multiple Groups¶
The following statement calls the UDTF named my_udtf on individual partitions. Each partition contains all rows for which the PARTITION BY expression evaluates to the same value (e.g. the same company or stock symbol).
SELECT *
FROM stocks_table AS st,
TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol))
Explicit Partitioning into a Single Group¶
The following statement calls the UDTF named my_udtf on one partition. The PARTITION BY <constant> clause (in this case PARTITION BY 1) puts all rows in the same partition.
SELECT *
FROM stocks_table AS st,
TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY 1))
For a more complete and realistic example, see Examples of Calling Java UDTFs in Queries, in particular the subsection titled Single Partition.
Sorting Rows for Partitions¶
To process each partition’s rows in a specified order, include an ORDER BY clause. This tells Snowflake to pass the rows to the per-row handler method in the specified order.
For example, if you want to calculate the moving average of a stock price over time, then order the stock prices by timestamp (and partition by stock symbol). The following example shows how to do this:
SELECT *
FROM stocks_table AS st,
TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol ORDER BY st.transaction_date))
An OVER clause can contain an ORDER BY clause even without a PARTITION BY clause.
Remember that including an ORDER BY clause inside an OVER () clause is not the same as putting an ORDER BY clause at the outermost level of the query. If you want the entire query results to be ordered, you need a separate ORDER BY clause. For example:
SELECT *
FROM stocks_table AS st,
TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol ORDER BY st.transaction_date))
ORDER BY st.symbol, st.transaction_date, st.transaction_time;
Usage Notes for Explicit Partitioning¶
When using a UDTF with a PARTITION BY clause, the PARTITION BY clause must use a column reference or a literal, not a general expression. For example, the following is not allowed:
SELECT * FROM udtf_table, TABLE(my_func(col1) OVER (PARTITION BY udtf_table.col2 * 2)); -- NO!
Implicit Partitioning¶
If a table function does not explicitly partition the rows by using a PARTITION BY clause, then Snowflake typically partitions the rows implicitly to use parallel processing to improve performance.
The number of partitions is typically based on factors such as the size of the warehouse processing the function and the cardinality of the input relation. The rows are typically assigned to specific partitions based on factors such as physical location of the rows (e.g. by micro-partition), so the grouping has no meaning.
When running with implicit partitioning, the user code can make no assumptions about partitions. Running with implicit partitioning is most useful when the UDTF only needs to look at rows in isolation to produce its output and no state is aggregated across rows. In this case, the code probably does not need a constructor or an endPartition method.
If the UDTF includes a finalizer method (e.g. endPartition for Java UDTFs), the finalizer is called on each partition, regardless of whether the data was partitioned explicitly or implicitly. If the data is not partitioned meaningfully, the output of the finalizer might not be meaningful.
Usage Notes for Partitioning¶
To improve performance, Snowflake usually executes multiple instances of the UDTF handler code in parallel. Each partition of rows is passed to a single instance of the UDTF.
Although each partition is processed by only one UDTF instance, the converse is not necessarily true: a single UDTF instance can process multiple partitions sequentially. It is therefore important to use the initializer and finalizer to initialize and clean up for each partition, to avoid carrying over accumulated values from the processing of one partition to the processing of another partition.
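For example, in the hypothetical handler below, the per-partition maximum is reset in the constructor and emitted in endPartition, so a value accumulated for one partition cannot leak into the next (the class and column names are illustrative):

import java.util.stream.Stream;

class OutputRow {
  public double maxPrice;
  public OutputRow(double maxPrice) { this.maxPrice = maxPrice; }
}

class MaxPriceFinder {
  private double maxSoFar;

  public MaxPriceFinder() {
    // Initialize per-partition state here, not in static or leftover
    // instance state, so each partition starts clean.
    maxSoFar = Double.NEGATIVE_INFINITY;
  }

  public Stream<OutputRow> process(double price) {
    maxSoFar = Math.max(maxSoFar, price);
    return Stream.empty();
  }

  public Stream<OutputRow> endPartition() {
    // Emit the result for this partition only.
    return Stream.of(new OutputRow(maxSoFar));
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }
}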
Examples of Calling Java UDTFs in Queries¶
Calling Without Explicit Partitioning¶
This example shows how to create a UDTF. The UDTF returns two copies of each input value and one additional row for each partition.
create function return_two_copies(v varchar)
returns table(output_value varchar)
language java
handler='TestFunction'
target_path='@~/TestFunction.jar'
as
$$
import java.util.stream.Stream;
class OutputRow {
  public String output_value;

  public OutputRow(String outputValue) {
    this.output_value = outputValue;
  }
}

class TestFunction {
  String myString;

  public TestFunction() {
    myString = "Created in constructor and output from endPartition()";
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(String inputValue) {
    // Return two rows with the same value.
    return Stream.of(new OutputRow(inputValue), new OutputRow(inputValue));
  }

  public Stream<OutputRow> endPartition() {
    // Returns the value we initialized in the constructor.
    return Stream.of(new OutputRow(myString));
  }
}
$$;
This example shows how to call a UDTF. To keep this example simple, the statement passes a literal value rather than a column, and omits the OVER() clause.
SELECT output_value
FROM TABLE(return_two_copies('Input string'));
+-------------------------------------------------------+
| OUTPUT_VALUE |
|-------------------------------------------------------|
| Input string |
| Input string |
| Created in constructor and output from endPartition() |
+-------------------------------------------------------+
This example calls the UDTF with values read from another table. Each time that the process method is called, it is passed a value from the city_name column of the current row of the cities_of_interest table. As above, the UDTF is called without an explicit OVER() clause.
Create a simple table to use as a source of inputs:
CREATE TABLE cities_of_interest (city_name VARCHAR);
INSERT INTO cities_of_interest (city_name) VALUES
('Toronto'),
('Warsaw'),
('Kyoto');
Call the Java UDTF:
SELECT city_name, output_value
FROM cities_of_interest,
TABLE(return_two_copies(city_name))
ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE |
|-----------+-------------------------------------------------------|
| Kyoto | Kyoto |
| Kyoto | Kyoto |
| Toronto | Toronto |
| Toronto | Toronto |
| Warsaw | Warsaw |
| Warsaw | Warsaw |
| NULL | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+
Attention
In this example, the syntax used in the FROM clause is identical to the syntax of an inner join (i.e. FROM t1, t2); however, the operation performed is not a true inner join. The actual behavior is that the function is called with the values from each row in the table. In other words, given the following FROM clause:
from cities_of_interest, table(f(city_name))
the behavior would be equivalent to the following pseudocode:
for city_name in cities_of_interest:
  output_row = f(city_name)
The examples section in the documentation for JavaScript UDTFs contains more complex examples of queries that call UDTFs with values from tables.
If the statement does not explicitly specify partitioning, then the Snowflake execution engine uses implicit partitioning. If there is only one partition, then the endPartition method is called only once, and the output of the query includes only one row that contains the value Created in constructor and output from endPartition(). If the data is grouped into different numbers of partitions during different executions of the statement, the endPartition method is called different numbers of times, and the output contains different numbers of copies of this row. For more information, see Implicit Partitioning (in this topic).
Calling With Explicit Partitioning¶
Java UDTFs can also be called using explicit partitioning.
Multiple Partitions¶
The following example uses the same UDTF and table created earlier. The example partitions the data by city_name.
SELECT city_name, output_value
FROM cities_of_interest,
TABLE(return_two_copies(city_name) OVER (PARTITION BY city_name))
ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE |
|-----------+-------------------------------------------------------|
| Kyoto | Created in constructor and output from endPartition() |
| Kyoto | Kyoto |
| Kyoto | Kyoto |
| Toronto | Created in constructor and output from endPartition() |
| Toronto | Toronto |
| Toronto | Toronto |
| Warsaw | Created in constructor and output from endPartition() |
| Warsaw | Warsaw |
| Warsaw | Warsaw |
+-----------+-------------------------------------------------------+
Single Partition¶
The following example uses the same UDTF and table created earlier and partitions the data by a constant, which forces Snowflake to use only a single partition:
SELECT city_name, output_value
FROM cities_of_interest,
TABLE(return_two_copies(city_name) OVER (PARTITION BY 1))
ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE |
|-----------+-------------------------------------------------------|
| Kyoto | Kyoto |
| Kyoto | Kyoto |
| Toronto | Toronto |
| Toronto | Toronto |
| Warsaw | Warsaw |
| Warsaw | Warsaw |
| NULL | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+
Note that only one copy of the message Created in constructor and output from endPartition() was included in the output, which indicates that endPartition was called only once.
Processing Very Large Inputs (e.g. Large Files)¶
In some cases, a UDTF requires a very large amount of memory to process each input row. For example, a UDTF might read and process a file that is too large to fit into memory.
To process large files in a UDF or UDTF, use the SnowflakeFile or InputStream class. For more information, see Processing Unstructured Data Using Java UDFs or UDTFs.
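The following is a minimal sketch of that approach, assuming the function is passed a staged file’s URL (for example, one generated with BUILD_SCOPED_FILE_URL) and assuming the SnowflakeFile class from the com.snowflake.snowpark_java.types package described in that topic; the class and column names are illustrative:

import com.snowflake.snowpark_java.types.SnowflakeFile;
import java.io.IOException;
import java.io.InputStream;
import java.util.Scanner;
import java.util.stream.Stream;

class OutputRow {
  public String word;
  public OutputRow(String word) { this.word = word; }
}

class FileTokenizer {
  public Stream<OutputRow> process(String fileUrl) throws IOException {
    // Open the staged file as a stream rather than loading all of its
    // bytes into memory at once.
    SnowflakeFile file = SnowflakeFile.newInstance(fileUrl);
    InputStream stream = file.getInputStream();
    Scanner scanner = new Scanner(stream, "UTF-8");
    Stream.Builder<OutputRow> rows = Stream.builder();
    while (scanner.hasNext()) {
      // Emit one output row per whitespace-delimited token.
      rows.add(new OutputRow(scanner.next()));
    }
    scanner.close();
    return rows.build();
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }
}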
Using a Table or UDTF as Input to a UDTF¶
The input to a table function can come from a table or from another UDTF, as documented in Using a Table as Input to a Table Function.
The example below shows how to use a table to provide input to the Java UDTF split_file_into_words:
create table file_names (file_name varchar);
insert into file_names (file_name) values ('sample.txt'),
('sample_2.txt');
select f.file_name, w.word
from file_names as f, table(split_file_into_words(f.file_name)) as w;
The output looks similar to the following:
+--------------+------------+
| FILE_NAME    | WORD       |
+--------------+------------+
| sample.txt   | some       |
| sample.txt   | words      |
| sample_2.txt | additional |
| sample_2.txt | words      |
+--------------+------------+
The imports clause of the Java UDTF must specify the name and path of each file passed to the UDTF. For example:
create function split_file_into_words(inputFileName string)
...
imports = ('@inline_jars/sample.txt', '@inline_jars/sample_2.txt')
...
Each file must already have been copied to a stage (in this case, the stage named @inline_jars) before the UDTF reads the file.
For an example of using a UDTF as an input to another UDTF, see Extended Examples Using Table Values and Other UDTFs as Input in the JavaScript UDTF documentation.