Tabular Java UDFs (UDTFs)

This document explains how to write a UDTF (user-defined table function) in Java.


Introduction

A UDTF can be written in any of the following languages:

  • SQL

  • JavaScript

  • Java

Each UDTF is created by executing the CREATE FUNCTION statement, which specifies:

  • The data types of the arguments passed to the UDTF.

  • The data types of the columns in the rows returned from the UDTF.

  • The code to execute when the UDTF is called. For example, if the UDTF is implemented as a JavaScript UDTF, then the CREATE FUNCTION statement specifies the JavaScript function to call.

A UDTF developer creates a per-row function (or method), which is called once for each input row. The per-row method accepts zero or more parameters. For each input row, the per-row method returns a set of zero, one, or more output rows.

Currently, UDTFs written in all supported languages except SQL have the following additional characteristics:

  • Rows can optionally be grouped into partitions. All rows within a partition are processed together (i.e. they are passed sequentially to the same instance of the per-row method). The output can optionally contain one or more rows that are based on common characteristics of rows within the partition.

    • For partitioned data, UDTFs support an optional initializer method that is called once per partition, before the per-row method starts processing the first row of the partition. The initializer method can initialize variables to use for the entire partition.

    • For partitioned data, UDTFs support an optional finalizer method that is called once per partition, after the per-row method finishes processing the last row of the partition. The finalizer method allows the code to return zero, one, or multiple output rows that are not tied to a specific input row. The output rows can be:

      • Rows that are added to the output already generated for the partition.

      • One or more rows that summarize the partition. In this case, the per-row method might not have generated any output rows; it might merely have performed calculations that are used by the finalizer method when the finalizer generates its output rows.

    The column on which to partition the rows is specified in the SQL statement that calls the UDTF.

    For more information about partitioning, see Table Functions and Partitions.

For Java UDTFs, the initializer method, per-row method, and finalizer method are implemented as described below:

  • The initializer is implemented by writing a zero-argument constructor.

  • The per-row method is named process().

  • The finalizer is implemented as a zero-argument method named endPartition().

Each Java UDTF requires a handler class, which defines the constructor, process(), and endPartition(). Details are included in Java Classes for UDTFs (in this topic).

Each Java UDTF also requires an output row class, which specifies the Java data types of the columns of the output row(s) that are generated by the handler class. Details are included in The Output Row Class (in this topic).
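Putting the pieces together, a minimal handler class and output row class might look like the sketch below. The class names MyHandler and OutputRow, the single output column value, and the row-counting logic are illustrative assumptions; only the method names and their roles follow the rules described in this topic.

```java
import java.util.stream.Stream;

// Output row class: one public field per output column (here one VARCHAR column).
class OutputRow {
  public String value;

  public OutputRow(String value) {
    this.value = value;
  }
}

// Hypothetical handler class; its name would be given in the HANDLER clause.
class MyHandler {
  private int rowCount;

  // Initializer: zero-argument constructor, called once per partition.
  public MyHandler() {
    rowCount = 0;
  }

  // Tells Snowflake which class describes the output columns.
  public static Class getOutputClass() {
    return OutputRow.class;
  }

  // Per-row method: called once for each input row.
  public Stream<OutputRow> process(String input) {
    rowCount++;
    return Stream.of(new OutputRow(input));
  }

  // Finalizer: called once per partition after the last call to process().
  public Stream<OutputRow> endPartition() {
    return Stream.of(new OutputRow("rows seen: " + rowCount));
  }
}
```

If the constructor ran, then process("a") and process("b"), then endPartition(), this partition would produce the rows a, b, and rows seen: 2.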

Java Classes for UDTFs

The primary components of the UDTF are the handler class and the output row class.

The Handler Class

Snowflake interacts with the UDTF primarily by invoking the following methods of the handler class:

  • The initializer (the constructor).

  • The per-row method (process()).

  • The finalizer method (endPartition()).

The handler class can contain additional methods needed to support these three methods.

The handler class also contains a method getOutputClass(), which is described later.

Throwing an exception from any method in the handler class (or the output row class) causes processing to stop. The query that called the UDTF fails with an error message.

The Constructor

A handler class can have a constructor, which must take zero arguments.

The constructor is invoked once for each partition prior to any invocations of process().

The constructor cannot produce output rows.

Use the constructor to initialize state for the partition; this state can be used by the process() and endPartition() methods. The constructor is also the appropriate place to put any long-running initialization that needs to be done only once per partition rather than once per row.

The constructor is optional.
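As a sketch of per-partition initialization, the hypothetical handler below builds a lookup map once in its constructor and reuses it for every row in the partition. The class name CountryLookup, the hard-coded entries, and the two output columns are assumptions for illustration; a real handler might instead load its data from a resource shipped in the JAR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

class OutputRow {
  public String code;
  public String country;

  public OutputRow(String code, String country) {
    this.code = code;
    this.country = country;
  }
}

// Hypothetical handler: the map is built once per partition, not once per row.
class CountryLookup {
  private final Map<String, String> names = new HashMap<>();

  public CountryLookup() {
    // Stand-in for expensive setup work done once per partition.
    names.put("CA", "Canada");
    names.put("PL", "Poland");
    names.put("JP", "Japan");
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(String code) {
    // Map.get returns null for codes that are not in the map.
    return Stream.of(new OutputRow(code, names.get(code)));
  }
}
```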

The process() Method

The process() method is invoked once for each row in the input partition.

The arguments passed to the UDTF are passed to process(). The values of the arguments are converted from SQL data types to Java data types. (For information about mapping SQL and Java data types, see SQL-Java Data Type Mappings for Parameters and Return Types.)

The parameter names of the process() method can be any valid Java identifiers; the names do not need to match the names specified in the CREATE FUNCTION statement.

Each time that process() is called, it can return zero, one, or multiple rows.

The data type returned by the process() method must be Stream<OutputRow>, where Stream is defined in java.util.stream.Stream, and OutputRow is the name of the output row class. The example below shows a simple process() method that merely returns its input via a Stream:

import java.util.stream.Stream;

...

public Stream<OutputRow> process(String v) {
  return Stream.of(new OutputRow(v));
}

...
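Because process() returns a Stream, the number of output rows per input row can vary. The hypothetical handler below, which assumes a single VARCHAR output column named word, returns zero rows for null or blank input, one row for a single word, and one row per word otherwise.

```java
import java.util.Arrays;
import java.util.stream.Stream;

class OutputRow {
  public String word;

  public OutputRow(String word) {
    this.word = word;
  }
}

// Hypothetical handler that splits its input on whitespace.
class SplitWords {
  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(String text) {
    if (text == null || text.trim().isEmpty()) {
      return Stream.empty();  // zero output rows for this input row
    }
    // One output row per whitespace-separated word.
    return Arrays.stream(text.trim().split("\\s+")).map(OutputRow::new);
  }
}
```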

Although uncommon, you can declare the process() method static. A static process() method can be useful for purposes such as validating input rows. For example, if an input row is invalid, the UDTF can discard it. (A scalar UDF could replace invalid values with NULLs, but must return a value and therefore can’t discard a row.)

If process() is static, and the handler class does not contain a constructor or a non-static endPartition() method, then Snowflake does not create an instance of the handler class; Snowflake simply passes each row directly to process().
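A minimal sketch of the validation use case, assuming a VARCHAR input and a single integer output column named value: the hypothetical ValidIntegers handler declares no constructor and no endPartition(), so its process() method can be static, and it discards every row that does not parse as an integer.

```java
import java.util.stream.Stream;

class OutputRow {
  public int value;

  public OutputRow(int value) {
    this.value = value;
  }
}

// Hypothetical handler with no explicit constructor and no endPartition().
class ValidIntegers {
  public static Class getOutputClass() {
    return OutputRow.class;
  }

  // Keeps rows that parse as integers and drops the rest; a scalar UDF
  // could only map them to NULL because it must return one value per row.
  public static Stream<OutputRow> process(String raw) {
    if (raw == null) {
      return Stream.empty();
    }
    try {
      return Stream.of(new OutputRow(Integer.parseInt(raw.trim())));
    } catch (NumberFormatException e) {
      return Stream.empty();
    }
  }
}
```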

If process() returns a null Stream, then processing stops. (The endPartition() method is still called even if a null Stream is returned.)

This method is required.

The endPartition() Method

This method can be used to generate output rows that are based on any state information aggregated in process().

This method is invoked once for each partition, after all rows in that partition have been passed to process().

Note

If the user does not partition the data explicitly, then Snowflake partitions the data implicitly. For details, see Table Functions and Partitions.

This method can output zero, one, or multiple rows.

This method is optional.
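The sketch below shows the summary pattern: process() only accumulates state and returns an empty Stream, and endPartition() emits a single row for the partition. The handler name PartitionAverage and the output columns row_count and average are assumptions for illustration; the double parameter assumes a non-null FLOAT input.

```java
import java.util.stream.Stream;

class OutputRow {
  public long row_count;
  public double average;

  public OutputRow(long rowCount, double average) {
    this.row_count = rowCount;
    this.average = average;
  }
}

// Hypothetical handler that summarizes each partition in one row.
class PartitionAverage {
  private long count;
  private double sum;

  public PartitionAverage() {
    count = 0;
    sum = 0.0;
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  // Accumulates state only; no output rows per input row.
  public Stream<OutputRow> process(double price) {
    count++;
    sum += price;
    return Stream.empty();
  }

  // One summary row per partition, or none if the partition had no rows.
  public Stream<OutputRow> endPartition() {
    if (count == 0) {
      return Stream.empty();
    }
    return Stream.of(new OutputRow(count, sum / count));
  }
}
```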

The getOutputClass() Method

This method returns information about the output row class. The output row class contains information about the data types of the returned row.

The Output Row Class

Snowflake uses the output row class to help specify conversions between Java data types and SQL data types.

When a Java UDTF returns a row, the value in each column of the row must be converted from the Java data type to the corresponding SQL data type. The SQL data types are specified in the RETURNS clause of the CREATE FUNCTION statement. However, the mapping between Java and SQL data types is not 1-to-1, so Snowflake needs to know the Java data type for each returned column. (For more information about mapping SQL and Java data types, see SQL-Java Data Type Mappings for Parameters and Return Types.)

A Java UDTF specifies the Java data types of the output columns by defining an output row class. Each row returned from the UDTF is returned as an instance of the output row class. Each instance of the output row class contains one public field for each output column. Snowflake reads the values of the public fields from each instance of the output row class, converts the Java values to SQL values, and constructs a SQL output row containing those values.

The values in each instance of the output row class are set by calling the output row class’s constructor. The constructor accepts parameters that correspond to the output columns and then sets the public fields to those parameters.

The code below defines a sample output row class:

class OutputRow {

  public String name;
  public int id;

  public OutputRow(String pName, int pId) {
    this.name = pName;
    this.id = pId;
  }

}

The public variables specified by this class must match the columns specified in the RETURNS TABLE (...) clause of the CREATE FUNCTION statement. For example, the OutputRow class above corresponds to the RETURNS clause below:

CREATE FUNCTION F(...)
    RETURNS TABLE(NAME VARCHAR, ID INTEGER)
    ...

Important

The matching between the SQL column names and the Java public field names in the output row class is case-insensitive. For example, in the Java and SQL code shown above, the Java field named id corresponds to the SQL column named ID.

The output row class is used as follows:

  • The handler class uses the output row class to specify the return type of the process() method and the endPartition() method. The handler class also uses the output row class to construct returned values. For example:

    public Stream<OutputRow> process(String v) {
      ...
      return Stream.of(new OutputRow(...));
    }
    
    public Stream<OutputRow> endPartition() {
      ...
      return Stream.of(new OutputRow(...));
    }
    
  • The output row class is also used in the handler class’s getOutputClass() method, which is a static method that Snowflake calls in order to learn the Java data types of the outputs:

    public static Class getOutputClass() {
      return OutputRow.class;
    }
    

Throwing an exception from any method in the output row class (or the handler class) causes processing to stop. The query that called the UDTF fails with an error message.

Summary of Requirements

The UDTF’s Java code must meet the following requirements:

  • The code must define an output row class.

  • The UDTF handler class must include a public method named process() that returns a Stream of <output_row_class>, where Stream is defined in java.util.stream.Stream.

  • The UDTF handler class must define a public static method named getOutputClass(), which must return <output_row_class>.class.

If the Java code does not meet these requirements, then either creation or execution of the UDTF fails:

  • If the session has an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is created.

  • If the session does not have an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is called.
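To catch obvious mistakes before running CREATE FUNCTION, you could check a handler locally with Java reflection. The RequirementsCheck class below is a hypothetical helper, not part of Snowflake, and it checks only two of the listed requirements: a public static getOutputClass() that returns a Class object, and a public process() method whose return type is a Stream.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.stream.Stream;

class OutputRow {
  public String value;

  public OutputRow(String value) {
    this.value = value;
  }
}

// A handler that satisfies the listed requirements.
class EchoHandler {
  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(String v) {
    return Stream.of(new OutputRow(v));
  }
}

// Hypothetical local pre-check; Snowflake performs its own validation.
class RequirementsCheck {
  static boolean meetsRequirements(Class<?> handler) {
    try {
      // Requirement: public static getOutputClass() returning a Class object.
      Method getOutputClass = handler.getMethod("getOutputClass");
      if (!Modifier.isStatic(getOutputClass.getModifiers())) {
        return false;
      }
      if (!(getOutputClass.invoke(null) instanceof Class)) {
        return false;
      }
      // Requirement: a public process() method that returns a Stream.
      for (Method m : handler.getMethods()) {
        if (m.getName().equals("process") && Stream.class.isAssignableFrom(m.getReturnType())) {
          return true;
        }
      }
      return false;
    } catch (ReflectiveOperationException e) {
      // getOutputClass() is missing, inaccessible, or threw an exception.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(meetsRequirements(EchoHandler.class)); // true
    System.out.println(meetsRequirements(String.class));      // false
  }
}
```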

Table Functions and Partitions

Before rows are passed to table functions, the rows can be grouped into partitions. Partitioning has two main benefits:

  • Partitioning allows Snowflake to divide up the workload to improve parallelization and thus performance.

  • Partitioning allows Snowflake to process all rows with a common characteristic as a group. You can return results that are based on all rows in the group, not just on individual rows.

For example, you might partition stock price data into one group per stock. All stock prices for an individual company can be analyzed together, while stock prices for each company can be analyzed independently of any other company.

Data can be partitioned explicitly or implicitly.

Explicit Partitioning

Explicit Partitioning into Multiple Groups

The following statement calls the UDTF named my_udtf() on individual partitions. Each partition contains all rows for which the PARTITION BY expression evaluates to the same value (e.g. the same company or stock symbol).

SELECT *
    FROM stocks_table AS st,
         TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol))
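To make the partitioning semantics concrete, the sketch below imitates PARTITION BY st.symbol locally: it groups rows by symbol and runs each group through a fresh handler (constructor, then process() once per row, then endPartition()). The TradeCounter handler and the PartitionByDemo harness are hypothetical; Snowflake's actual scheduling, parallelism, and partition order are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

class OutputRow {
  public String symbol;
  public long trades;

  public OutputRow(String symbol, long trades) {
    this.symbol = symbol;
    this.trades = trades;
  }
}

// Hypothetical handler: counts the rows in its partition and reports the total once.
class TradeCounter {
  private String symbol;
  private long trades;

  public TradeCounter() {
    trades = 0;  // fresh state for each partition
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(String symbol, double price) {
    this.symbol = symbol;
    trades++;
    return Stream.empty();  // no per-row output
  }

  public Stream<OutputRow> endPartition() {
    return Stream.of(new OutputRow(symbol, trades));
  }
}

// Hypothetical harness imitating PARTITION BY symbol, sequentially, in first-seen order.
class PartitionByDemo {
  static List<OutputRow> run(List<String> symbols, List<Double> prices) {
    // Group row indexes by the partition key.
    Map<String, List<Integer>> partitions = new LinkedHashMap<>();
    for (int i = 0; i < symbols.size(); i++) {
      partitions.computeIfAbsent(symbols.get(i), k -> new ArrayList<>()).add(i);
    }
    List<OutputRow> out = new ArrayList<>();
    for (List<Integer> rows : partitions.values()) {
      TradeCounter handler = new TradeCounter();  // constructor once per partition
      for (int i : rows) {
        handler.process(symbols.get(i), prices.get(i)).forEach(out::add);
      }
      handler.endPartition().forEach(out::add);  // finalizer once per partition
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> symbols = Arrays.asList("ABC", "XYZ", "ABC", "ABC", "XYZ");
    List<Double> prices = Arrays.asList(10.0, 50.0, 11.0, 12.0, 49.5);
    for (OutputRow r : run(symbols, prices)) {
      System.out.println(r.symbol + " " + r.trades);  // ABC 3, then XYZ 2
    }
  }
}
```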

Explicit Partitioning into a Single Group

The following statement calls the UDTF named my_udtf() on one partition. The PARTITION BY <constant> clause (in this case PARTITION BY 1) puts all rows in the same partition.

SELECT *
    FROM stocks_table AS st,
         TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY 1))

For a more complete and realistic example, see Examples of Calling Java UDTFs in Queries, in particular the subsection titled Single Partition.

Sorting Rows for Partitions

To process each partition’s rows in a specified order, include an ORDER BY clause. This tells Snowflake to pass the rows to the per-row handler method in the specified order.

For example, if you want to calculate the moving average of a stock price over time, then order the stock prices by timestamp (and partition by stock symbol). The following example shows how to do this:

SELECT *
     FROM stocks_table AS st,
          TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol ORDER BY st.transaction_date))
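A handler for the moving-average case might look like the following sketch. It assumes a window of the three most recent prices, output columns named symbol and moving_avg, and that the SQL DATE argument arrives as java.sql.Date. Because the handler trusts the order in which rows arrive, its output is meaningful only when the query includes ORDER BY st.transaction_date.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.stream.Stream;

class OutputRow {
  public String symbol;
  public double moving_avg;

  public OutputRow(String symbol, double movingAvg) {
    this.symbol = symbol;
    this.moving_avg = movingAvg;
  }
}

// Hypothetical handler: one output row per input row, holding the average of
// the most recent WINDOW prices seen so far in this partition.
class MovingAverage {
  private static final int WINDOW = 3;  // assumed window size
  // Field initializers run as part of the implicit constructor,
  // so the window starts empty for every partition.
  private final Deque<Double> window = new ArrayDeque<>();
  private double windowSum = 0.0;

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  // transactionDate is unused: the handler relies on Snowflake having
  // already delivered the rows in transaction_date order.
  public Stream<OutputRow> process(String symbol, java.sql.Date transactionDate, double price) {
    window.addLast(price);
    windowSum += price;
    if (window.size() > WINDOW) {
      windowSum -= window.removeFirst();  // drop the oldest price
    }
    return Stream.of(new OutputRow(symbol, windowSum / window.size()));
  }
}
```

Feeding the prices 10, 20, 30, 40 in date order yields the moving averages 10, 15, 20, 30; the same prices in a different order would give different results.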

An OVER clause can contain an ORDER BY clause even without a PARTITION BY clause.

Remember that including an ORDER BY clause inside an OVER () clause is not the same as putting an ORDER BY clause at the outermost level of the query. If you want the entire query results to be ordered, you need a separate ORDER BY clause. For example:

SELECT *
    FROM stocks_table AS st,
         TABLE(my_udtf(st.symbol, st.transaction_date, st.price) OVER (PARTITION BY st.symbol ORDER BY st.transaction_date))
    ORDER BY st.symbol, st.transaction_date;

Usage Notes for Explicit Partitioning

  • When using a UDTF with a PARTITION BY clause, the PARTITION BY clause must use a column reference or a literal, not a general expression. For example, the following is not allowed:

    SELECT * FROM udtf_table, TABLE(my_func(col1) OVER (PARTITION BY udtf_table.col2 * 2));   -- NO!
    

Implicit Partitioning

If a table function does not explicitly partition the rows by using a PARTITION BY clause, then Snowflake typically partitions the rows implicitly to use parallel processing to improve performance. The partitioning is typically done based on factors such as physical location of the rows (e.g. by micropartition), so the grouping has no meaning.

If the UDTF includes a finalizer method (e.g. endPartition() for Java UDTFs), the finalizer is called on each partition, regardless of whether the data was partitioned explicitly or implicitly. If the data is not partitioned meaningfully, the output of the finalizer might not be meaningful.

Usage Notes for Partitioning

  • To improve performance, Snowflake usually executes multiple instances of the UDTF handler code in parallel. Each partition of rows is passed to a single instance of the UDTF.

    Although each partition is processed by only one UDTF instance, the converse is not necessarily true — a single UDTF instance can process multiple partitions sequentially. It is therefore important to use the initializer and finalizer to initialize and clean up for each partition to avoid carrying over accumulated values from the processing of one partition to the processing of another partition.
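The sketch below contrasts a handler that keeps its running total in a static field, which survives from one partition to the next, with one that keeps it in an instance field initialized in the constructor and reset in endPartition(). Both class names are hypothetical, and the two-partition sequence in main() only mimics how one JVM might process partitions one after another.

```java
import java.util.stream.Stream;

class OutputRow {
  public long total;

  public OutputRow(long total) {
    this.total = total;
  }
}

// Anti-pattern: static state is shared by all instances, so it leaks across partitions.
class LeakyTotal {
  private static long total = 0;

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(long amount) {
    total += amount;
    return Stream.empty();
  }

  public Stream<OutputRow> endPartition() {
    return Stream.of(new OutputRow(total));
  }
}

// Safer: per-partition instance state, initialized in the constructor
// and cleared in endPartition() in case the instance is reused.
class PartitionTotal {
  private long total;

  public PartitionTotal() {
    total = 0;
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(long amount) {
    total += amount;
    return Stream.empty();
  }

  public Stream<OutputRow> endPartition() {
    long result = total;
    total = 0;  // nothing carries over into a later partition
    return Stream.of(new OutputRow(result));
  }
}

class TotalsDemo {
  public static void main(String[] args) {
    // Two partitions processed one after the other: [1, 2] and then [10].
    LeakyTotal leakyA = new LeakyTotal();
    leakyA.process(1);
    leakyA.process(2);
    System.out.println(leakyA.endPartition().findFirst().get().total);  // 3
    LeakyTotal leakyB = new LeakyTotal();
    leakyB.process(10);
    System.out.println(leakyB.endPartition().findFirst().get().total);  // 13, not 10

    PartitionTotal reused = new PartitionTotal();
    reused.process(1);
    reused.process(2);
    System.out.println(reused.endPartition().findFirst().get().total);  // 3
    reused.process(10);
    System.out.println(reused.endPartition().findFirst().get().total);  // 10
  }
}
```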

Examples of Calling Java UDTFs in Queries

No Explicit Partitioning

This example shows how to create a UDTF that returns two copies of each input row and one additional row for each partition.

create function return_two_copies(v varchar)
returns table(output_value varchar)
language java
handler='TestFunction'
target_path='@~/TestFunction.jar'
as
$$

  import java.util.stream.Stream;

  class OutputRow {

    public String output_value;

    public OutputRow(String output_value) {
      this.output_value = output_value;
    }

  }


  class TestFunction {

    String myString;

    public TestFunction()  {
      myString = "Created in constructor and output from endPartition()";
    }

    public static Class getOutputClass() {
      return OutputRow.class;
    }

    public Stream<OutputRow> process(String input_value) {
      // Return two rows with the same value.
      return Stream.of(new OutputRow(input_value), new OutputRow(input_value));
    }

    public Stream<OutputRow> endPartition() {
      // Returns the value we initialized in the constructor.
      return Stream.of(new OutputRow(myString));
    }

  }

$$;

This example shows how to call a UDTF. To keep this example simple, the statement passes a literal value rather than a column.

SELECT output_value
   FROM TABLE(return_two_copies('Input string'));
+-------------------------------------------------------+
| OUTPUT_VALUE                                          |
|-------------------------------------------------------|
| Input string                                          |
| Input string                                          |
| Created in constructor and output from endPartition() |
+-------------------------------------------------------+

This example calls the UDTF and passes it values from another table. Each time that the process() method is called, it is passed a value from the city_name column of the current row of the cities_of_interest table. As above, the UDTF is called without explicit partitioning.

Create a simple table to use as a source of inputs:

CREATE TABLE cities_of_interest (city_name VARCHAR);
INSERT INTO cities_of_interest (city_name) VALUES
    ('Toronto'),
    ('Warsaw'),
    ('Kyoto');

Call the Java UDTF:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+

Attention

In this example, the syntax used in the FROM clause is identical to the syntax of an inner join (i.e. FROM t1, t2); however, the operation performed is not a true inner join. The actual behavior is that the function is called with the values from each row in the table. In other words, given the following FROM clause:

from cities_of_interest, table(f(city_name))

the behavior would be equivalent to the following pseudocode:

for city_name in cities_of_interest:
    output_row = f(city_name)

If the statement does not explicitly specify partitioning, then the Snowflake execution engine partitions the input according to multiple factors, such as the size of the warehouse processing the function and the cardinality of the input relation. When running in this mode, the user code can make no assumptions about partitions. Running without an explicit partition is most useful when the UDTF only needs to look at rows in isolation to produce its output and no state is aggregated across rows. In this example, the cardinality of the input was small, so Snowflake created only one partition.
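The following sketch simulates the FROM cities_of_interest, TABLE(return_two_copies(city_name)) call for the single-implicit-partition case described above. It reuses the logic of the TestFunction handler from the earlier example and pairs each output row with the input row that produced it; rows from endPartition() have no input row, which is why CITY_NAME is NULL for them in the query output. LateralDemo is a hypothetical harness, not Snowflake's execution engine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class OutputRow {
  public String output_value;

  public OutputRow(String output_value) {
    this.output_value = output_value;
  }
}

// Same logic as the TestFunction handler in the return_two_copies example.
class TestFunction {
  String myString;

  public TestFunction() {
    myString = "Created in constructor and output from endPartition()";
  }

  public static Class getOutputClass() {
    return OutputRow.class;
  }

  public Stream<OutputRow> process(String input_value) {
    return Stream.of(new OutputRow(input_value), new OutputRow(input_value));
  }

  public Stream<OutputRow> endPartition() {
    return Stream.of(new OutputRow(myString));
  }
}

// Hypothetical simulation of one implicit partition: each output row is paired
// with the input row that produced it; endPartition() rows get a null city.
class LateralDemo {
  static List<String[]> run(List<String> cities) {
    List<String[]> result = new ArrayList<>();
    TestFunction f = new TestFunction();
    for (String city : cities) {
      f.process(city).forEach(r -> result.add(new String[] {city, r.output_value}));
    }
    f.endPartition().forEach(r -> result.add(new String[] {null, r.output_value}));
    return result;
  }

  public static void main(String[] args) {
    for (String[] row : run(Arrays.asList("Toronto", "Warsaw", "Kyoto"))) {
      System.out.println(row[0] + " | " + row[1]);
    }
  }
}
```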

The examples section in the documentation for JavaScript UDTFs contains more complex examples of queries that call UDTFs with values from tables.

Explicit Partitioning

Java UDTFs can also be called using explicit partitioning.

Multiple Partitions

The following example uses the same UDTF and table created earlier. The example partitions the data by city_name.

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Created in constructor and output from endPartition() |
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Created in constructor and output from endPartition() |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Created in constructor and output from endPartition() |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
+-----------+-------------------------------------------------------+

Single Partition

The following example uses the same UDTF and table created earlier and partitions the data by a constant, which forces Snowflake to use only a single partition:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY 1))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+

Note that only one copy of the message Created in constructor and output from endPartition() was included in the output, which indicates that endPartition() was called only once.

Using Multiple UDFs

The output from one UDTF can be used as the input to another UDTF. The documentation for JavaScript UDTFs contains examples.