Tabular Java UDFs (UDTFs)

This document explains how to write a UDTF (user-defined table function) in Java.

Introduction

Your Java UDTF handler class processes rows received in the UDTF call and returns a tabular result. The received rows are partitioned, either implicitly by Snowflake or explicitly in the syntax of the function call. You can use the methods you implement in the class to process individual rows as well as the partitions into which they’re grouped.

Your handler class can process partitions and rows with the following:

  • A zero-argument constructor as an initializer. You can use this to set up partition-scoped state.

  • A process method for processing each row.

  • A zero-argument endPartition method as a finalizer to complete partition processing, including returning a value scoped to the partition.

For more detail, see Java classes for UDTFs (in this topic).

Each Java UDTF also requires an output row class, which specifies the Java data types of the columns of the output row(s) that are generated by the handler class. Details are included in The output row class (in this topic).

Usage notes for partitioning

  • When it receives rows that are implicitly partitioned by Snowflake, your handler code can make no assumptions about partitions. Running with implicit partitioning is most useful when the UDTF only needs to look at rows in isolation to produce its output and no state is aggregated across rows. In this case, the code probably does not need a constructor or an endPartition method.

  • To improve performance, Snowflake usually executes multiple instances of the UDTF handler code in parallel. Each partition of rows is passed to a single instance of the UDTF.

  • Although each partition is processed by only one UDTF instance, the converse is not necessarily true: a single UDTF instance can process multiple partitions sequentially. It is therefore important to use the initializer and finalizer to set up and clean up state for each partition, so that values accumulated while processing one partition do not carry over into the next.
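
The per-partition lifecycle described in these notes can be pictured with a small local simulation. This is only a sketch: the names TotalRow, RunningTotal, and LifecycleDemo are hypothetical, and the driver loop is a stand-in for the engine, not a description of how Snowflake invokes handlers internally. It assumes one handler instance per partition and shows state being initialized in the constructor, updated in process, and emitted in endPartition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical output row class with a single column.
class TotalRow {
  public long total;

  public TotalRow(long total) {
    this.total = total;
  }
}

// Hypothetical handler that sums the values of one partition.
class RunningTotal {
  private long sum;

  public RunningTotal() {
    // Partition-scoped state starts from zero for every partition.
    sum = 0;
  }

  public static Class getOutputClass() {
    return TotalRow.class;
  }

  public Stream<TotalRow> process(long value) {
    sum += value;
    return Stream.empty(); // No per-row output; only the total is emitted.
  }

  public Stream<TotalRow> endPartition() {
    return Stream.of(new TotalRow(sum));
  }
}

// Local stand-in for the engine (an assumption made for illustration).
class LifecycleDemo {
  static List<Long> run(List<List<Long>> partitions) {
    List<Long> totals = new ArrayList<>();
    for (List<Long> partition : partitions) {
      RunningTotal handler = new RunningTotal();  // initializer
      for (long value : partition) {
        handler.process(value);                   // per-row method
      }
      handler.endPartition()                      // finalizer
          .forEach(row -> totals.add(row.total));
    }
    return totals;
  }
}
```

Because the sum is reset in the constructor, the total of one partition never leaks into the total of another in this simulation.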

Note

Tabular functions (UDTFs) have a limit of 500 input arguments and 500 output columns.

Java classes for UDTFs

The primary components of the UDTF are the handler class and the output row class.

The handler class

Snowflake interacts with the UDTF primarily by invoking the following methods of the handler class:

  • The initializer (the constructor).

  • The per-row method (process).

  • The finalizer method (endPartition).

The handler class can contain additional methods needed to support these three methods.

The handler class also contains a method getOutputClass, which is described later.

Throwing an exception from any method in the handler class (or the output row class) causes processing to stop. The query that called the UDTF fails with an error message.

The constructor

A handler class can have a constructor, which must take zero arguments.

The constructor is invoked once for each partition prior to any invocations of process.

The constructor cannot produce output rows.

Use the constructor to initialize state for the partition; this state can be used by the process and endPartition methods. The constructor is also the appropriate place to put any long-running initialization that needs to be done only once per partition rather than once per row.

The constructor is optional.
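
As a rough illustration of once-per-partition initialization, the sketch below compiles a regular expression in the constructor and reuses it for every row. The class names MatchRow and DigitRunFinder and the pattern itself are hypothetical, chosen only for this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical output row class with a single column.
class MatchRow {
  public String match;

  public MatchRow(String match) {
    this.match = match;
  }
}

// Hypothetical handler that emits every run of digits found in a row.
class DigitRunFinder {
  private final Pattern digits;

  public DigitRunFinder() {
    // Compiled once here instead of once per call to process.
    digits = Pattern.compile("\\d+");
  }

  public static Class getOutputClass() {
    return MatchRow.class;
  }

  public Stream<MatchRow> process(String text) {
    // Assumes text is not null; zero, one, or many rows can result.
    Matcher m = digits.matcher(text);
    Stream.Builder<MatchRow> out = Stream.builder();
    while (m.find()) {
      out.add(new MatchRow(m.group()));
    }
    return out.build();
  }
}
```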

The process method

The process method is invoked once for each row in the input partition.

The arguments passed to the UDTF are passed to process. The values of the arguments are converted from SQL data types to Java data types. (For information about mapping SQL and Java data types, see SQL-Java Data Type Mappings.)

The parameter names of the process method can be any valid Java identifiers; the names do not need to match the names specified in the CREATE FUNCTION statement.

Each time that process is called, it can return zero, one, or multiple rows.

The data type returned by the process method must be Stream<OutputRow>, where Stream is java.util.stream.Stream and OutputRow is the name of the output row class. The example below shows a simple process method that merely returns its input via a Stream:

import java.util.stream.Stream;

...

public Stream<OutputRow> process(String v) {
  return Stream.of(new OutputRow(v));
}

...

If the process method does not keep or use any state in the object (e.g. if the method is designed to just exclude selected input rows from the output), you can declare the method static. If the process method is static and the handler class does not have a constructor or non-static endPartition method, Snowflake passes each row directly to the static process method without constructing an instance of the handler class.
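
A stateless handler of this kind might look like the following sketch. The names NonBlankRow and NonBlankFilter are hypothetical; the class has no constructor, no fields, and no endPartition method, so nothing in it depends on an instance.

```java
import java.util.stream.Stream;

// Hypothetical output row class with a single column.
class NonBlankRow {
  public String value;

  public NonBlankRow(String value) {
    this.value = value;
  }
}

// Hypothetical handler that drops null or blank input values.
class NonBlankFilter {
  public static Class getOutputClass() {
    return NonBlankRow.class;
  }

  // Static because the method reads and writes no instance state.
  public static Stream<NonBlankRow> process(String v) {
    if (v == null || v.trim().isEmpty()) {
      // An empty Stream produces no output row for this input.
      return Stream.empty();
    }
    return Stream.of(new NonBlankRow(v));
  }
}
```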

If you need to skip an input row and process the next row (e.g. if you are validating the input rows), return an empty Stream object. For example, the process method below only returns the rows for which number is a positive integer. If number is not positive, the method returns an empty Stream object to skip the current row and continue processing the next row.

public Stream<OutputRow> process(int number) {
  if (number < 1) {
    return Stream.empty();
  }
  return Stream.of(new OutputRow(number));
}

If process returns a null Stream, then processing stops. (The endPartition method is still called even if a null Stream is returned.)

This method is required.

The endPartition method

This optional method can be used to generate output rows that are based on any state information aggregated in process. This method is invoked once for each partition, after all rows in that partition have been passed to process.

If you include this method, it is called on each partition, regardless of whether the data was partitioned explicitly or implicitly. If the data is not partitioned meaningfully, the output of the finalizer might not be meaningful.

Note

If the user does not partition the data explicitly, then Snowflake partitions the data implicitly. For details, see partitions.

This method can output zero, one, or multiple rows.
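
To make the zero, one, or multiple rows case concrete, here is a minimal sketch of a handler that collects values in process and emits one row per distinct value from endPartition. The names DistinctRow and DistinctValues are hypothetical. Note that this approach holds every distinct value of the partition in memory, so it is only reasonable when partitions are of moderate size.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical output row class with a single column.
class DistinctRow {
  public String value;

  public DistinctRow(String value) {
    this.value = value;
  }
}

// Hypothetical handler that emits each distinct value once per partition.
class DistinctValues {
  // Values seen in the current partition, in first-seen order.
  private final Set<String> seen = new LinkedHashSet<>();

  public static Class getOutputClass() {
    return DistinctRow.class;
  }

  public Stream<DistinctRow> process(String v) {
    seen.add(v);
    return Stream.empty(); // All output is deferred to endPartition.
  }

  public Stream<DistinctRow> endPartition() {
    // No rows if nothing was seen; otherwise one row per distinct value.
    return seen.stream().map(DistinctRow::new);
  }
}
```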

Note

While Snowflake supports large partitions with timeouts tuned to process them successfully, especially large partitions can cause processing to time out (such as when endPartition takes too long to complete). Please contact Snowflake Support if you need the timeout threshold adjusted for specific usage scenarios.

The getOutputClass method

This method returns information about the output row class. The output row class contains information about the data types of the returned row.

The output row class

Snowflake uses the output row class to help specify conversions between Java data types and SQL data types.

When a Java UDTF returns a row, the value in each column of the row must be converted from the Java data type to the corresponding SQL data type. The SQL data types are specified in the RETURNS clause of the CREATE FUNCTION statement. However, the mapping between Java and SQL data types is not 1-to-1, so Snowflake needs to know the Java data type for each returned column. (For more information about mapping SQL and Java data types, see SQL-Java Data Type Mappings.)

A Java UDTF specifies the Java data types of the output columns by defining an output row class. Each row returned from the UDTF is returned as an instance of the output row class. Each instance of the output row class contains one public field for each output column. Snowflake reads the values of the public fields from each instance of the output row class, converts the Java values to SQL values, and constructs a SQL output row containing those values.

The values in each instance of the output row class are set by calling the output row class’s constructor. The constructor accepts parameters that correspond to the output columns and then sets the public fields to those parameters.

The code below defines a sample output row class:

class OutputRow {

  public String name;
  public int id;

  public OutputRow(String pName, int pId) {
    this.name = pName;
    this.id = pId;
  }

}

The public fields declared by this class must match the columns specified in the RETURNS TABLE (...) clause of the CREATE FUNCTION statement. For example, the OutputRow class above corresponds to the RETURNS clause below:

CREATE FUNCTION F(...)
    RETURNS TABLE(NAME VARCHAR, ID INTEGER)
    ...

Important

The matching between the SQL column names and the Java public field names in the output row class is case-insensitive. For example, in the Java and SQL code shown above, the Java field named id corresponds to the SQL column named ID.
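
If you want to check locally which column names and Java types an output row class exposes, standard Java reflection is enough. The helper below is a hypothetical sanity check, not Snowflake's own matching logic: it lists the public fields of a class and upper-cases their names so that they can be compared with the column names in the RETURNS TABLE (...) clause without regard to case.

```java
import java.lang.reflect.Field;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// The sample output row class from this topic.
class OutputRow {
  public String name;
  public int id;

  public OutputRow(String pName, int pId) {
    this.name = pName;
    this.id = pId;
  }
}

// Hypothetical local helper; not part of any Snowflake API.
class OutputRowInspector {
  // Maps upper-cased public field names to their Java type names.
  static Map<String, String> publicColumns(Class<?> rowClass) {
    Map<String, String> columns = new TreeMap<>();
    for (Field f : rowClass.getFields()) {
      columns.put(f.getName().toUpperCase(Locale.ROOT),
                  f.getType().getSimpleName());
    }
    return columns;
  }
}
```

For the OutputRow class above, the helper reports a NAME column of type String and an ID column of type int.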

The output row class is used as follows:

  • The handler class uses the output row class to specify the return type of the process method and the endPartition method. The handler class also uses the output row class to construct returned values. For example:

    public Stream<OutputRow> process(String v) {
      ...
      return Stream.of(new OutputRow(...));
    }
    
    public Stream<OutputRow> endPartition() {
      ...
      return Stream.of(new OutputRow(...));
    }
    
  • The output row class is also used in the handler class’s getOutputClass method, which is a static method that Snowflake calls in order to learn the Java data types of the outputs:

    public static Class getOutputClass() {
      return OutputRow.class;
    }
    

Throwing an exception from any method in the output row class (or the handler class) causes processing to stop. The query that called the UDTF fails with an error message.

Summary of requirements

The UDTF’s Java code must meet the following requirements:

  • The code must define an output row class.

  • The UDTF handler class must include a public method named process that returns a Stream of <output_row_class>, where Stream is defined in java.util.stream.Stream.

  • The UDTF handler class must define a public static method named getOutputClass, which must return <output_row_class>.class.

If the Java code does not meet these requirements, then either creation or execution of the UDTF fails:

  • If the session has an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is created.

  • If the session does not have an active warehouse at the time the CREATE FUNCTION statement executes, then Snowflake detects violations when the function is called.

Examples of calling Java UDTFs in queries

For general information about calling UDFs and UDTFs, see Calling a UDF.

Calling without explicit partitioning

This example shows how to create a UDTF that returns two copies of each input row and one additional row for each partition.

create function return_two_copies(v varchar)
returns table(output_value varchar)
language java
handler='TestFunction'
target_path='@~/TestFunction.jar'
as
$$

  import java.util.stream.Stream;

  class OutputRow {

    public String output_value;

    public OutputRow(String outputValue) {
      this.output_value = outputValue;
    }

  }


  class TestFunction {

    String myString;

    public TestFunction()  {
      myString = "Created in constructor and output from endPartition()";
    }

    public static Class getOutputClass() {
      return OutputRow.class;
    }

    public Stream<OutputRow> process(String inputValue) {
      // Return two rows with the same value.
      return Stream.of(new OutputRow(inputValue), new OutputRow(inputValue));
    }

    public Stream<OutputRow> endPartition() {
      // Returns the value we initialized in the constructor.
      return Stream.of(new OutputRow(myString));
    }

  }

$$;

This example shows how to call a UDTF. To keep this example simple, the statement passes a literal value rather than a column, and omits the OVER() clause.

SELECT output_value
   FROM TABLE(return_two_copies('Input string'));
+-------------------------------------------------------+
| OUTPUT_VALUE                                          |
|-------------------------------------------------------|
| Input string                                          |
| Input string                                          |
| Created in constructor and output from endPartition() |
+-------------------------------------------------------+

This example calls the UDTF with values read from another table. Each time that the process method is called, it is passed a value from the city_name column of the current row of the cities_of_interest table. As above, the UDTF is called without an explicit OVER() clause.

Create a simple table to use as a source of inputs:

CREATE TABLE cities_of_interest (city_name VARCHAR);
INSERT INTO cities_of_interest (city_name) VALUES
    ('Toronto'),
    ('Warsaw'),
    ('Kyoto');

Call the Java UDTF:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+

Attention

In this example, the syntax used in the FROM clause is identical to the syntax of an inner join (i.e. FROM t1, t2); however, the operation performed is not a true inner join. The actual behavior is that the function is called with the values from each row in the table. In other words, given the following FROM clause:

from cities_of_interest, table(f(city_name))

the behavior would be equivalent to the following pseudocode:

for city_name in cities_of_interest:
    output_row = f(city_name)

The examples section in the documentation for JavaScript UDTFs contains more complex examples of queries that call UDTFs with values from tables.

If the statement does not explicitly specify partitioning, then the Snowflake execution engine uses implicit partitioning.

If there is only one partition, then the endPartition method is called only once and the output of the query includes only one row that contains the value Created in constructor and output from endPartition(). If the data is grouped into different numbers of partitions during different executions of the statement, the endPartition method is called different numbers of times, and the output contains different numbers of copies of this row.

For more information, see implicit partitioning.

Calling with explicit partitioning

Java UDTFs can also be called using explicit partitioning.

Multiple partitions

The following example uses the same UDTF and table created earlier. The example partitions the data by city_name.

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Created in constructor and output from endPartition() |
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Created in constructor and output from endPartition() |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Created in constructor and output from endPartition() |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
+-----------+-------------------------------------------------------+

Single partition

The following example uses the same UDTF and table created earlier and partitions the data by a constant, which forces Snowflake to use only a single partition:

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY 1))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+

Note that only one copy of the message Created in constructor and output from endPartition() was included in the output, which indicates that endPartition was called only once.

Processing very large inputs (e.g. large files)

In some cases, a UDTF requires a very large amount of memory to process each input row. For example, a UDTF might read and process a file that is too large to fit into memory.

To process large files in a UDF or UDTF, use the SnowflakeFile or InputStream class. For more information, see Processing unstructured data with UDF and procedure handlers.
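
The general idea behind stream-based processing can be sketched with plain java.io classes. The example below is only an illustration under stated assumptions: the LineCounter class is hypothetical, and how the InputStream is obtained inside a handler (for example, from a SnowflakeFile) is not shown here and is covered by the topic referenced above. The point is that the input is consumed incrementally rather than being loaded into memory as a whole.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that counts lines without buffering the whole input.
class LineCounter {
  static long countLines(InputStream in) {
    try (BufferedReader reader =
        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      long lines = 0;
      // Each iteration reads one line; earlier lines are not retained.
      while (reader.readLine() != null) {
        lines++;
      }
      return lines;
    } catch (IOException e) {
      // Rethrown unchecked so that callers need no throws clause.
      throw new UncheckedIOException(e);
    }
  }
}
```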