Tabular Java UDFs (UDTFs)

This document explains how to write a UDTF (user-defined table function) in Java.


Introduction

A UDTF can be written in any of the following languages:

  • SQL

  • JavaScript

  • Java

Each UDTF is created by executing the CREATE FUNCTION statement, which specifies:

  • The data types of the arguments passed to the UDTF.

  • The data types of the columns in the rows returned from the UDTF.

  • The code to execute when the UDTF is called. For example, if the UDTF is implemented as a JavaScript UDTF, then the CREATE FUNCTION statement specifies the JavaScript function to call.

A UDTF developer creates a per-row function (or method), which is called once for each input row. The per-row method accepts zero or more parameters. For each input row, the per-row method returns a set of zero, one, or more output rows.

Currently, UDTFs written in all supported languages except SQL have the following additional characteristics:

  • Rows can optionally be grouped into partitions. All rows within a partition are processed together (i.e. they are passed sequentially to the same instance of the per-row method). The output can optionally contain one or more rows that are based on common characteristics of rows within the partition.

    • For partitioned data, UDTFs support an optional initializer method that is called once per partition, before the per-row method starts processing the first row of the partition. The initializer method can initialize variables to use for the entire partition.

    • For partitioned data, UDTFs support an optional finalizer method that is called once per partition, after the per-row method finishes processing the last row of the partition. The finalizer method allows the code to return zero, one, or multiple output rows that are not tied to a specific input row. The output rows can be:

      • Rows that are added to the output already generated for the partition.

      • One or more rows that summarize the partition. In this case, the per-row method might not have generated any output rows; it might merely have performed calculations that the finalizer method uses when it generates its output rows.

    The column on which to partition the rows is specified in the SQL statement that calls the UDTF.

    For more information about partitioning, see Table Functions and Partitions.

For Java UDTFs, the initializer method, per-row method, and finalizer method are implemented as described below:

  • The initializer is implemented by writing a zero-argument constructor.

  • The per-row method is named process().

  • The finalizer is implemented as a zero-argument method named endPartition().

Each Java UDTF requires a handler class, which defines the constructor, process(), and endPartition(). Details are included in Java Classes for UDTFs (in this topic).

Each Java UDTF also requires an output row class, which specifies the Java data types of the columns of the output row(s) that are generated by the handler class. Details are included in The Output Row Class (in this topic).
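The following sketch shows how the three methods and the output row class fit together. The class names WordRow, WordCounter, and LocalDriver are hypothetical, and the handler assumes a RETURNS TABLE(word VARCHAR, row_count INTEGER) clause. LocalDriver only replays the call order described above (constructor, then process() once per row, then endPartition()) for a single partition; it is not how Snowflake executes a UDTF.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Output row class: one public field per output column.
// Assumes RETURNS TABLE(word VARCHAR, row_count INTEGER).
class WordRow {
    public String word;
    public Integer row_count;

    public WordRow(String word, Integer rowCount) {
        this.word = word;
        this.row_count = rowCount;
    }
}

// Handler class (hypothetical name).
class WordCounter {
    private int rowsSeen; // per-partition state

    // Initializer: zero-argument constructor, called once per partition.
    public WordCounter() {
        rowsSeen = 0;
    }

    // Tells Snowflake which class describes the output columns.
    public static Class getOutputClass() {
        return WordRow.class;
    }

    // Per-row method: called once per input row; here it echoes the input.
    public Stream<WordRow> process(String word) {
        rowsSeen++;
        return Stream.of(new WordRow(word, null));
    }

    // Finalizer: called once per partition after the last process() call.
    public Stream<WordRow> endPartition() {
        return Stream.of(new WordRow(null, rowsSeen));
    }
}

// Hypothetical local driver that replays the documented call order for one
// partition. It is NOT Snowflake's execution engine.
class LocalDriver {
    static List<WordRow> runPartition(List<String> partitionRows) {
        WordCounter handler = new WordCounter();
        List<WordRow> out = new ArrayList<>();
        for (String row : partitionRows) {
            handler.process(row).forEach(out::add);
        }
        handler.endPartition().forEach(out::add);
        return out;
    }
}
```

For a partition containing "a" and "b", the driver returns the two echoed rows followed by one summary row whose row_count is 2.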

Note

Tabular functions (UDTFs) have a limit of 500 input arguments and 500 output columns.

Java Classes for UDTFs

The primary components of the UDTF are the handler class and the output row class.

The Handler Class

Snowflake interacts with the UDTF primarily by invoking the following methods of the handler class:

  • The initializer (the constructor).

  • The per-row method (process()).

  • The finalizer method (endPartition()).

The handler class can contain additional methods needed to support these three methods.

The handler class also contains a method getOutputClass(), which is described later.

Throwing an exception from any method in the handler class (or the output row class) causes processing to stop. The query that called the UDTF fails with an error message.

The Constructor

A handler class can have a constructor, which must take zero arguments.

The constructor is invoked once for each partition prior to any invocations of process().

The constructor cannot produce output rows.

Use the constructor to initialize state for the partition; this state can be used by the process() and endPartition() methods. The constructor is also the appropriate place to put any long-running initialization that needs to be done only once per partition rather than once per row.

The constructor is optional.

The process() Method

The process() method is invoked once for each row in the input partition.

The arguments passed to the UDTF are passed to process(). The values of the arguments are converted from SQL data types to Java data types. (For information about mapping SQL and Java data types, see SQL-Java Data Type Mappings for Parameters and Return Types.)

The parameter names of the process() method can be any valid Java identifiers; the names do not need to match the names specified in the CREATE FUNCTION statement.

Each time that process() is called, it can return zero, one, or multiple rows.

The data type returned by the process() method must be Stream<OutputRow>, where Stream is defined in java.util.stream.Stream, and OutputRow is the name of the output row class. The example below shows a simple process() method that merely returns its input via a Stream:

import java.util.stream.Stream;

...

public Stream<OutputRow> process(String v) {
  return Stream.of(new OutputRow(v));
}

...

If the process method does not keep or use any state in the object (e.g. if the method is designed to just exclude selected input rows from the output), you can declare the method static. If the process method is static and the handler class does not have a constructor or non-static endPartition method, Snowflake passes each row directly to the static process method without constructing an instance of the handler class.
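A minimal sketch of a stateless handler with a static process() method follows. It assumes a hypothetical UDTF that drops NULL or blank strings and a RETURNS TABLE(value VARCHAR) clause; NonEmptyRow and DropBlankStrings are illustrative names. Because the class declares no constructor and no endPartition() method, the paragraph above implies that Snowflake can call process() without creating an instance.

```java
import java.util.stream.Stream;

// Assumes RETURNS TABLE(value VARCHAR).
class NonEmptyRow {
    public String value;

    public NonEmptyRow(String value) {
        this.value = value;
    }
}

// Stateless handler: no explicit constructor, no endPartition(), static process().
class DropBlankStrings {
    public static Class getOutputClass() {
        return NonEmptyRow.class;
    }

    public static Stream<NonEmptyRow> process(String value) {
        if (value == null || value.trim().isEmpty()) {
            return Stream.empty(); // skip this input row
        }
        return Stream.of(new NonEmptyRow(value));
    }
}
```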

If you need to skip an input row and process the next row (e.g. if you are validating the input rows), return an empty Stream object. For example, the process method below only returns the rows for which number is a positive integer. If number is not positive, the method returns an empty Stream object to skip the current row and continue processing the next row.

public Stream<OutputRow> process(int number) {
  if (number < 1) {
    return Stream.empty();
  }
  return Stream.of(new OutputRow(number));
}

If process() returns a null Stream, then processing stops. (The endPartition() method is still called even if a null Stream is returned.)

This method is required.

The endPartition() Method

This method can be used to generate output rows that are based on any state information aggregated in process().

This method is invoked once for each partition, after all rows in that partition have been passed to process().

Note

If the user does not partition the data explicitly, then Snowflake partitions the data implicitly. For details, see Table Functions and Partitions (in this topic).

This method can output zero, one, or multiple rows.

This method is optional.
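The sketch below uses process() only to accumulate state and lets endPartition() emit a single summary row per partition, which is the "summarizes the partition" case described in the Introduction. PriceSummaryRow and PriceRange are hypothetical names, and the sketch assumes a RETURNS TABLE(min_price FLOAT, max_price FLOAT, row_count NUMBER) clause.

```java
import java.util.stream.Stream;

// Assumes RETURNS TABLE(min_price FLOAT, max_price FLOAT, row_count NUMBER).
class PriceSummaryRow {
    public double min_price;
    public double max_price;
    public long row_count;

    public PriceSummaryRow(double minPrice, double maxPrice, long rowCount) {
        this.min_price = minPrice;
        this.max_price = maxPrice;
        this.row_count = rowCount;
    }
}

class PriceRange {
    private double min;
    private double max;
    private long count;

    // Initializer: reset the per-partition accumulators.
    public PriceRange() {
        min = Double.POSITIVE_INFINITY;
        max = Double.NEGATIVE_INFINITY;
        count = 0;
    }

    public static Class getOutputClass() {
        return PriceSummaryRow.class;
    }

    // Accumulates only; returns no rows for individual inputs.
    public Stream<PriceSummaryRow> process(double price) {
        min = Math.min(min, price);
        max = Math.max(max, price);
        count++;
        return Stream.empty();
    }

    // Emits one summary row per partition (none if no rows were processed).
    public Stream<PriceSummaryRow> endPartition() {
        if (count == 0) {
            return Stream.empty();
        }
        return Stream.of(new PriceSummaryRow(min, max, count));
    }
}
```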

The getOutputClass() Method

This method returns information about the output row class. The output row class contains information about the data types of the returned row.

The Output Row Class

Snowflake uses the output row class to help specify conversions between Java data types and SQL data types.

When a Java UDTF returns a row, the value in each column of the row must be converted from the Java data type to the corresponding SQL data type. The SQL data types are specified in the RETURNS clause of the CREATE FUNCTION statement. However, the mapping between Java and SQL data types is not 1-to-1, so Snowflake needs to know the Java data type for each returned column. (For more information about mapping SQL and Java data types, see SQL-Java Data Type Mappings for Parameters and Return Types.)

A Java UDTF specifies the Java data types of the output columns by defining an output row class. Each row returned from the UDTF is returned as an instance of the output row class. Each instance of the output row class contains one public field for each output column. Snowflake reads the values of the public fields from each instance of the output row class, converts the Java values to SQL values, and constructs a SQL output row containing those values.

The values in each instance of the output row class are set by calling the output row class’s constructor. The constructor accepts parameters that correspond to the output columns and then sets the public fields to those parameters.

The code below defines a sample output row class:

class OutputRow {

  public String name;
  public int id;

  public OutputRow(String pName, int pId) {
    this.name = pName;
    this.id = pId;
  }

}

The public variables specified by this class must match the columns specified in the RETURNS TABLE (...) clause of the CREATE FUNCTION statement. For example, the OutputRow class above corresponds to the RETURNS clause below:

CREATE FUNCTION F(...)
    RETURNS TABLE(NAME VARCHAR, ID INTEGER)
    ...

Important

The matching between the SQL column names and the Java public field names in the output row class is case-insensitive. For example, in the Java and SQL code shown above, the Java field named id corresponds to the SQL column named ID.
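To make the public-field contract concrete, the sketch below uses Java reflection to read the public fields of an output row object and match them, case-insensitively, to a list of SQL column names. It illustrates the rules described above and is not Snowflake's implementation; FieldMatcher and NameIdRow are hypothetical names, and NameIdRow has the same fields as the OutputRow class shown earlier.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Same fields as the OutputRow example above.
class NameIdRow {
    public String name;
    public int id;

    public NameIdRow(String pName, int pId) {
        this.name = pName;
        this.id = pId;
    }
}

// Illustration only: NOT how Snowflake is implemented internally.
class FieldMatcher {
    // Returns column name -> Java value, matching field names case-insensitively.
    static Map<String, Object> toColumnValues(Object row, List<String> sqlColumns) {
        Map<String, Field> fieldsByUpperName = new LinkedHashMap<>();
        for (Field f : row.getClass().getFields()) { // public fields only
            if (!Modifier.isStatic(f.getModifiers())) {
                fieldsByUpperName.put(f.getName().toUpperCase(Locale.ROOT), f);
            }
        }
        Map<String, Object> values = new LinkedHashMap<>();
        for (String column : sqlColumns) {
            Field f = fieldsByUpperName.get(column.toUpperCase(Locale.ROOT));
            if (f == null) {
                throw new IllegalStateException("no public field for column " + column);
            }
            try {
                values.put(column, f.get(row)); // primitives are boxed, e.g. int -> Integer
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return values;
    }
}
```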

The output row class is used as follows:

  • The handler class uses the output row class to specify the return type of the process() method and the endPartition() method. The handler class also uses the output row class to construct returned values. For example:

    public Stream<OutputRow> process(String v) {
      ...
      return Stream.of(new OutputRow(...));
    }
    
    public Stream<OutputRow> endPartition() {
      ...
      return Stream.of(new OutputRow(...));
    }
    
  • The output row class is also used in the handler class’s getOutputClass() method, which is a static method that Snowflake calls in order to learn the Java data types of the outputs:

    public static Class getOutputClass() {
      return OutputRow.class;
    }
    

Throwing an exception from any method in the output row class (or the handler class) causes processing to stop. The query that called the UDTF fails with an error message.

Summary of Requirements

The UDTF’s Java code must meet the following requirements:

  • The code must define an output row class.

  • The UDTF handler class must include a public method named process() that returns a Stream of <output_row_class>, where Stream is defined in java.util.stream.Stream.

  • The UDTF handler class must define a public static method named getOutputClass(), which must return <output_row_class>.class.

If the Java code does not meet these requirements, then either creation or execution of the UDTF fails:

  • If the session has an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is created.

  • If the session does not have an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is called.
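These requirements can also be checked locally before running CREATE FUNCTION. The following sketch uses reflection to look for a public static getOutputClass() and a public process() whose generic return type is Stream of the class that getOutputClass() returns. It is a hypothetical pre-flight check (UdtfRequirementCheck, CheckedRow, and CheckedHandler are illustrative names); Snowflake's own validation may check more than this.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.stream.Stream;

class CheckedRow {
    public String value;

    public CheckedRow(String value) {
        this.value = value;
    }
}

class CheckedHandler {
    public static Class getOutputClass() {
        return CheckedRow.class;
    }

    public Stream<CheckedRow> process(String v) {
        return Stream.of(new CheckedRow(v));
    }
}

// Hypothetical local pre-flight check; not Snowflake's validator.
class UdtfRequirementCheck {
    static void check(Class<?> handler) {
        Method getOutputClass;
        try {
            getOutputClass = handler.getMethod("getOutputClass");
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("missing public getOutputClass()");
        }
        if (!Modifier.isStatic(getOutputClass.getModifiers())) {
            throw new IllegalStateException("getOutputClass() must be static");
        }
        Object result;
        try {
            result = getOutputClass.invoke(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("getOutputClass() could not be called", e);
        }
        if (!(result instanceof Class)) {
            throw new IllegalStateException("getOutputClass() must return a Class");
        }
        Class<?> outputClass = (Class<?>) result;

        // Look for a public process() returning Stream<outputClass>.
        for (Method m : handler.getMethods()) {
            Type returnType = m.getGenericReturnType();
            if (m.getName().equals("process") && returnType instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) returnType;
                if (p.getRawType() == Stream.class && p.getActualTypeArguments()[0] == outputClass) {
                    return; // all checks passed
                }
            }
        }
        throw new IllegalStateException(
                "no public process() returning Stream<" + outputClass.getSimpleName() + ">");
    }
}
```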

Calling a Java UDTF

Call a UDTF the way you would call any table function. When calling a UDTF in the FROM clause of a query, specify the UDTF’s name and arguments inside the parentheses that follow the TABLE keyword.

In other words, use a form such as the following for the TABLE keyword when calling a UDTF:

SELECT ...
  FROM TABLE ( udtf_name (udtf_arguments) )

For example, the following statement calls a table function and passes it a DATE literal:

select ...
  from table(my_java_udtf('2021-01-16'::DATE));

The argument to a table function can be an expression, not just a literal. For example, a table function can be called using a column from a table. Examples appear later in this topic, including in Examples of Calling Java UDTFs in Queries.

For more information about table functions in general, see table function.

Table Functions and Partitions

Before rows are passed to table functions, the rows can be grouped into partitions. Partitioning has two main benefits:

  • Partitioning allows Snowflake to divide up the workload to improve parallelization and thus performance.

  • Partitioning allows Snowflake to process all rows with a common characteristic as a group. You can return results that are based on all rows in the group, not just on individual rows.

For example, you might partition stock price data into one group per stock. All stock prices for an individual company can be analyzed together, while stock prices for each company can be analyzed independently of any other company.
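The effect of partitioning by stock symbol on a handler can be simulated locally: group the rows by the partition key, then give each group a fresh handler instance that sees every row of that group before endPartition() runs. The sketch below does this for a hypothetical per-symbol average price; StockTick, AvgRow, AveragePrice, and PartitionSimulator are illustrative names, and the real engine may run partitions in parallel and in any order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// One input row: (symbol, price).
class StockTick {
    final String symbol;
    final double price;

    StockTick(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }
}

class AvgRow {
    public String symbol;
    public double avg_price;

    public AvgRow(String symbol, double avgPrice) {
        this.symbol = symbol;
        this.avg_price = avgPrice;
    }
}

// Handler: accumulates in process(), emits one average per partition.
class AveragePrice {
    private String symbol;
    private double sum;
    private long count;

    public static Class getOutputClass() {
        return AvgRow.class;
    }

    public Stream<AvgRow> process(String symbol, double price) {
        this.symbol = symbol;
        sum += price;
        count++;
        return Stream.empty();
    }

    public Stream<AvgRow> endPartition() {
        if (count == 0) {
            return Stream.empty();
        }
        return Stream.of(new AvgRow(symbol, sum / count));
    }
}

// Hypothetical simulation of PARTITION BY symbol; NOT Snowflake's engine.
class PartitionSimulator {
    static List<AvgRow> run(List<StockTick> ticks) {
        Map<String, List<StockTick>> partitions = new LinkedHashMap<>();
        for (StockTick t : ticks) {
            partitions.computeIfAbsent(t.symbol, k -> new ArrayList<>()).add(t);
        }
        List<AvgRow> out = new ArrayList<>();
        for (List<StockTick> partition : partitions.values()) {
            AveragePrice handler = new AveragePrice(); // fresh state per partition
            for (StockTick t : partition) {
                handler.process(t.symbol, t.price).forEach(out::add);
            }
            handler.endPartition().forEach(out::add);
        }
        return out;
    }
}
```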

Data can be partitioned explicitly or implicitly.

Explicit Partitioning

Explicit Partitioning into Multiple Groups

The following statement calls the UDTF named my_udtf() on individual partitions. Each partition contains all rows for which the PARTITION BY expression evaluates to the same value (e.g. the same company or stock symbol).

SELECT *
    FROM stocks_table AS st,
         TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol))

Explicit Partitioning into a Single Group

The following statement calls the UDTF named my_udtf() on one partition. The PARTITION BY <constant> clause (in this case PARTITION BY 1) puts all rows in the same partition.

SELECT *
    FROM stocks_table AS st,
         TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY 1))

For a more complete and realistic example, see Examples of Calling Java UDTFs in Queries, in particular the subsection titled Single Partition.

Sorting Rows for Partitions

To process each partition’s rows in a specified order, include an ORDER BY clause. This tells Snowflake to pass the rows to the per-row handler method in the specified order.

For example, if you want to calculate the moving average of a stock price over time, then order the stock prices by timestamp (and partition by stock symbol). The following example shows how to do this:

SELECT *
     FROM stocks_table AS st,
          TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol ORDER BY st.transaction_date))
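A moving-average handler relies on this ordering: process() keeps a sliding window of the most recent prices and emits one row per input row. The sketch assumes a window of three rows, the argument order used in the query above (symbol, transaction_date, price), and a hypothetical RETURNS TABLE(symbol VARCHAR, transaction_date DATE, moving_avg FLOAT) clause; MovingAvgRow and MovingAverage are illustrative names.

```java
import java.sql.Date;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.stream.Stream;

class MovingAvgRow {
    public String symbol;
    public Date transaction_date;
    public double moving_avg;

    public MovingAvgRow(String symbol, Date transactionDate, double movingAvg) {
        this.symbol = symbol;
        this.transaction_date = transactionDate;
        this.moving_avg = movingAvg;
    }
}

class MovingAverage {
    private static final int WINDOW_SIZE = 3; // hypothetical window length
    private final Deque<Double> window = new ArrayDeque<>();
    private double windowSum = 0.0;

    public static Class getOutputClass() {
        return MovingAvgRow.class;
    }

    // Assumes rows arrive ordered by transaction_date within each symbol partition.
    public Stream<MovingAvgRow> process(String symbol, Date transactionDate, double price) {
        window.addLast(price);
        windowSum += price;
        if (window.size() > WINDOW_SIZE) {
            windowSum -= window.removeFirst(); // drop the oldest price
        }
        return Stream.of(new MovingAvgRow(symbol, transactionDate, windowSum / window.size()));
    }
}
```

For prices 1, 2, 3, 4 arriving in date order, the emitted averages are 1, 1.5, 2, and 3. Without the ORDER BY inside OVER (), the window would contain whichever rows happened to arrive last, so the result would not be a moving average over time.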

An OVER clause can contain an ORDER BY clause even without a PARTITION BY clause.

Remember that including an ORDER BY clause inside an OVER () clause is not the same as putting an ORDER BY clause at the outermost level of the query. If you want the entire query results to be ordered, you need a separate ORDER BY clause. For example:

SELECT *
    FROM stocks_table AS st,
         TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol ORDER BY st.transaction_date))
    ORDER BY st.symbol, st.transaction_date, st.transaction_time;

Usage Notes for Explicit Partitioning

When using a UDTF with a PARTITION BY clause, the PARTITION BY clause must use a column reference or a literal, not a general expression. For example, the following is not allowed:

SELECT * FROM udtf_table, TABLE(my_func(col1) OVER (PARTITION BY udtf_table.col2 * 2));   -- NO!

Implicit Partitioning

If a table function does not explicitly partition the rows by using a PARTITION BY clause, then Snowflake typically partitions the rows implicitly to use parallel processing to improve performance.

The number of partitions is typically based on factors such as the size of the warehouse processing the function and the cardinality of the input relation. The rows are typically assigned to specific partitions based on factors such as physical location of the rows (e.g. by micro-partition), so the grouping has no meaning.

When running with implicit partitioning, the user code can make no assumptions about partitions. Running with implicit partitioning is most useful when the UDTF only needs to look at rows in isolation to produce its output and no state is aggregated across rows. In this case, the code probably does not need a constructor or an endPartition() method.

If the UDTF includes a finalizer method (e.g. endPartition() for Java UDTFs), the finalizer is called on each partition, regardless of whether the data was partitioned explicitly or implicitly. If the data is not partitioned meaningfully, the output of the finalizer might not be meaningful.

Usage Notes for Partitioning

  • To improve performance, Snowflake usually executes multiple instances of the UDTF handler code in parallel. Each partition of rows is passed to a single instance of the UDTF.

    Although each partition is processed by only one UDTF instance, the converse is not necessarily true — a single UDTF instance can process multiple partitions sequentially. It is therefore important to use the initializer and finalizer to initialize and clean up for each partition to avoid carrying over accumulated values from the processing of one partition to the processing of another partition.
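One defensive pattern that follows from this note is to clear accumulated state at the end of endPartition(), after building the partition's output, so that nothing can carry over if the same instance later receives another partition. The sketch below (CountRow and CountPerPartition are hypothetical names) counts rows per partition and resets its state in both the constructor and endPartition().

```java
import java.util.stream.Stream;

class CountRow {
    public String key;
    public long row_count;

    public CountRow(String key, long rowCount) {
        this.key = key;
        this.row_count = rowCount;
    }
}

class CountPerPartition {
    private String key;
    private long count;

    public CountPerPartition() {
        reset(); // initialize state for the first partition
    }

    private void reset() {
        key = null;
        count = 0;
    }

    public static Class getOutputClass() {
        return CountRow.class;
    }

    public Stream<CountRow> process(String key) {
        this.key = key;
        count++;
        return Stream.empty();
    }

    public Stream<CountRow> endPartition() {
        CountRow summary = new CountRow(key, count); // build the output first...
        reset();                                     // ...then clear state for any later partition
        return Stream.of(summary);
    }
}
```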

Examples of Calling Java UDTFs in Queries

Calling Without Explicit Partitioning

This example shows how to create a UDTF that returns two copies of each input row and one additional row for each partition.

create function return_two_copies(v varchar)
returns table(output_value varchar)
language java
handler='TestFunction'
target_path='@~/TestFunction.jar'
as
$$

  import java.util.stream.Stream;

  class OutputRow {

    public String output_value;

    public OutputRow(String outputValue) {
      this.output_value = outputValue;
    }

  }


  class TestFunction {

    String myString;

    public TestFunction()  {
      myString = "Created in constructor and output from endPartition()";
    }

    public static Class getOutputClass() {
      return OutputRow.class;
    }

    public Stream<OutputRow> process(String inputValue) {
      // Return two rows with the same value.
      return Stream.of(new OutputRow(inputValue), new OutputRow(inputValue));
    }

    public Stream<OutputRow> endPartition() {
      // Returns the value we initialized in the constructor.
      return Stream.of(new OutputRow(myString));
    }

  }

$$;

This example shows how to call a UDTF. To keep this example simple, the statement passes a literal value rather than a column, and omits the OVER() clause.

SELECT output_value
   FROM TABLE(return_two_copies('Input string'));
+-------------------------------------------------------+
| OUTPUT_VALUE                                          |
|-------------------------------------------------------|
| Input string                                          |
| Input string                                          |
| Created in constructor and output from endPartition() |
+-------------------------------------------------------+

This example calls the UDTF with values read from another table. Each time that the process() method is called, it is passed a value from the city_name column of the current row of the cities_of_interest table. As above, the UDTF is called without an explicit OVER() clause.

Create a simple table to use as a source of inputs:

CREATE TABLE cities_of_interest (city_name VARCHAR);
INSERT INTO cities_of_interest (city_name) VALUES
    ('Toronto'),
    ('Warsaw'),
    ('Kyoto');

Call the Java UDTF:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+

Attention

In this example, the syntax used in the FROM clause is identical to the syntax of an inner join (i.e. FROM t1, t2); however, the operation performed is not a true inner join. The actual behavior is that the function is called with the values from each row in the table. In other words, given the following FROM clause:

from cities_of_interest, table(f(city_name))

the behavior would be equivalent to the following pseudocode:

for city_name in cities_of_interest:
    output_row = f(city_name)

The examples section in the documentation for JavaScript UDTFs contains more complex examples of queries that call UDTFs with values from tables.

If the statement does not explicitly specify partitioning, then the Snowflake execution engine uses implicit partitioning.

If there is only one partition, then the endPartition() method is called only once and the output of the query includes only one row that contains the value Created in constructor and output from endPartition(). If the data is grouped into different numbers of partitions during different executions of the statement, the endPartition() method is called different numbers of times, and the output contains different numbers of copies of this row.
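The following local simulation reuses the OutputRow and TestFunction classes from the CREATE FUNCTION example above and splits the same three city names into a chosen number of contiguous partitions. The splitting rule in ImplicitPartitionDemo (a hypothetical name) is only a stand-in for whatever the engine chooses; the point is that the number of endPartition() rows equals the number of partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// OutputRow and TestFunction as in the CREATE FUNCTION example above.
class OutputRow {
    public String output_value;

    public OutputRow(String outputValue) {
        this.output_value = outputValue;
    }
}

class TestFunction {
    String myString;

    public TestFunction() {
        myString = "Created in constructor and output from endPartition()";
    }

    public static Class getOutputClass() {
        return OutputRow.class;
    }

    public Stream<OutputRow> process(String inputValue) {
        return Stream.of(new OutputRow(inputValue), new OutputRow(inputValue));
    }

    public Stream<OutputRow> endPartition() {
        return Stream.of(new OutputRow(myString));
    }
}

// Hypothetical stand-in for implicit partitioning: split inputs into
// partitionCount contiguous chunks, with one fresh handler per chunk.
class ImplicitPartitionDemo {
    static List<String> run(List<String> inputs, int partitionCount) {
        List<String> out = new ArrayList<>();
        int n = inputs.size();
        for (int p = 0; p < partitionCount; p++) {
            List<String> partition = inputs.subList(p * n / partitionCount, (p + 1) * n / partitionCount);
            TestFunction handler = new TestFunction();
            for (String v : partition) {
                handler.process(v).forEach(r -> out.add(r.output_value));
            }
            handler.endPartition().forEach(r -> out.add(r.output_value));
        }
        return out;
    }
}
```

With one partition the output has seven rows (six copies plus one endPartition() row); with three partitions it has nine.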

For more information, see implicit partitioning.

Calling With Explicit Partitioning

Java UDTFs can also be called using explicit partitioning.

Multiple Partitions

The following example uses the same UDTF and table created earlier. The example partitions the data by city_name.

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Created in constructor and output from endPartition() |
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Created in constructor and output from endPartition() |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Created in constructor and output from endPartition() |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
+-----------+-------------------------------------------------------+

Single Partition

The following example uses the same UDTF and table created earlier and partitions the data by a constant, which forces Snowflake to use only a single partition:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY 1))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+

Note that only one copy of the message Created in constructor and output from endPartition() was included in the output, which indicates that endPartition() was called only once.

Processing Very Large Inputs (e.g. Large Files)

In some cases, a UDTF requires a very large amount of memory to process each input row. For example, a UDTF might read and process a file that is too large to fit into memory.

To process large files in a UDF or UDTF, use the SnowflakeFile or InputStream class. For more information, see Processing Unstructured Data Using Java UDFs or UDTFs.

Using a Table or UDTF as Input to a UDTF

The input to a table function can come from a table or from another UDTF, as documented in Using a Table as Input to a Table Function.

The example below shows how to use a table to provide input to the Java UDTF splitFileIntoWords():

create table file_names (file_name varchar);
insert into file_names (file_name) values ('sample.txt'),
                                          ('sample_2.txt');

select f.file_name, w.word
   from file_names as f, table(splitFileIntoWords(f.file_name)) as w;

The output looks similar to the following:

+--------------+------------+
| FILE_NAME    | WORD       |
|--------------+------------|
| sample.txt   | some       |
| sample.txt   | words      |
| sample_2.txt | additional |
| sample_2.txt | words      |
+--------------+------------+

The imports clause of the Java UDTF must specify the name and path of each file passed to the UDTF. For example:

create function splitFileIntoWords(inputFileName string)
    ...
    imports = ('@inline_jars/sample.txt', '@inline_jars/sample_2.txt')
    ...

Each file must already have been copied to a stage (in this case, the stage named @inline_jars) before the UDTF reads the file.
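The Java handler for splitFileIntoWords() is not shown above. A minimal sketch follows, assuming a RETURNS TABLE(word VARCHAR) clause and assuming the imported files can be found in the directory named by the system property com.snowflake.import_directory (treat this property name as an assumption to verify against Snowflake's Java UDF documentation on reading files from IMPORTS). Locally, the sketch falls back to the working directory. FileWordRow and SplitFileIntoWords are illustrative names. It reads the whole file before returning, which is fine for small files; for very large files, see Processing Unstructured Data Using Java UDFs or UDTFs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Assumes RETURNS TABLE(word VARCHAR).
class FileWordRow {
    public String word;

    public FileWordRow(String word) {
        this.word = word;
    }
}

class SplitFileIntoWords {
    // ASSUMPTION: imported files live in the directory named by this property.
    private static final String IMPORT_DIR_PROPERTY = "com.snowflake.import_directory";

    public static Class getOutputClass() {
        return FileWordRow.class;
    }

    public Stream<FileWordRow> process(String inputFileName) {
        String dir = System.getProperty(IMPORT_DIR_PROPERTY, System.getProperty("user.dir"));
        Path path = Paths.get(dir, inputFileName);
        List<FileWordRow> words = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Split on runs of whitespace; skip the empty token from blank lines.
                for (String w : line.trim().split("\\s+")) {
                    if (!w.isEmpty()) {
                        words.add(new FileWordRow(w));
                    }
                }
            }
        } catch (IOException e) {
            // An exception stops processing and fails the calling query.
            throw new UncheckedIOException(e);
        }
        return words.stream();
    }
}
```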

For an example of using a UDTF as an input to another UDTF, see Extended Examples Using Table Values and Other UDTFs as Input in the JavaScript UDTF documentation.
