表形式のJava UDFs (UDTFs)

このドキュメントでは、Javaで UDTF (ユーザー定義の テーブル関数)を作成する方法について説明します。

このトピックの内容:

概要

Java UDTF ハンドラークラスは、 UDTF 呼び出しで受け取った行を処理し、表形式の結果を返します。受け取った行は、Snowflakeによって暗黙的に、または関数呼び出しの構文で明示的に分割されます。クラスに実装するメソッドを使用して、個別の行とグループ化されたパーティションを処理できます。

ハンドラークラスは、次のようにパーティションと行を処理できます。

  • 初期化子としての引数なしのコンストラクター。これを使用して、パーティションスコープの状態を設定できます。

  • 各行を処理する process メソッド。

  • パーティションに範囲指定された値を返すことを含め、パーティション処理を完了するファイナライザーとしての引数なしの endPartition メソッド。

詳細については、 UDTFs のJavaクラス (このトピック内)をご参照ください。

各Java UDTF には、ハンドラークラスによって生成される、出力行の列のJavaデータ型を指定する 出力行クラス も必要です。詳細は、 出力行クラス (このトピック内)に含まれています。

パーティション分割の使用上の注意

  • Snowflakeによって暗黙的にパーティション分割された行を受け取ると、ハンドラーコードはパーティションについて何も仮定できません。明示的なパーティション分割の実行は、 UDTF が出力を生成するために行を分離して見るだけで、行間でステージを集約しない場合に最も役立ちます。この場合、コードには大抵、コンストラクターや endPartition メソッドを必要としません。

  • パフォーマンスを向上させるために、Snowflakeは通常、 UDTF ハンドラーコードの複数のインスタンスを並行して実行します。行の各パーティションは、 UDTF の単一のインスタンスに渡されます。

  • 各パーティションは1つの UDTF インスタンスのみで処理されますが、逆も当てはまるとは限りません。単一の UDTF インスタンスは、複数のパーティションを順に処理できます。したがって、初期化子とファイナライザーを使用して各パーティションの初期化とクリーンアップを行い、累積値が、1つのパーティションの処理から別のパーティションの処理に持ち越されないようにすることが重要です。

注釈

表形式の関数(UDTFs)には、入力引数に500個、出力列に500個の制限があります。

UDTFs のJavaクラス

UDTF の主要なコンポーネントは、ハンドラークラスと出力行クラスです。

ハンドラークラス

Snowflakeは、主にハンドラークラスの次のメソッドを呼び出すことによって UDTF と対話します。

  • 初期化子(コンストラクター)。

  • 行ごとの方法(process)。

  • ファイナライザーメソッド(endPartition)。

ハンドラークラスには、これら3つのメソッドをサポートするために必要な追加のメソッドを含めることができます。

ハンドラークラスには、後で説明するメソッド getOutputClass も含まれています。

ハンドラークラス(または 出力行クラス)の任意のメソッドから例外をスローすると、処理が停止します。UDTF を呼び出したクエリは、エラーメッセージを表示して失敗します。

コンストラクター

ハンドラークラスはコンストラクターを持つことができ、コンストラクターはゼロの引数を受ける必要があります。

コンストラクターは、 process の呼び出しの前に、 パーティション ごとに1回呼び出されます。

コンストラクターは出力行を生成できません。

コンストラクターを使用して、パーティションの状態を初期化します。この状態は、 process メソッドおよび endPartition メソッドで使用できます。コンストラクターは、行ごとに1回ではなく、パーティションごとに1回だけ実行する必要がある、長時間実行の初期化を配置するための適切な場所でもあります。

コンストラクターはオプションです。

process メソッド

process メソッドは、入力パーティションの行ごとに1回呼び出されます。

UDTF に渡された引数は、 process に渡されます。引数の値は、 SQL データ型からJavaデータ型に変換されます。(SQL データ型およびJavaデータ型のマッピングについては、 SQL-Javaデータ型マッピング をご参照ください。)

process メソッドのパラメーター名は、任意の有効なJava識別子にすることができます。名前は、 CREATE FUNCTION ステートメントで指定された名前と一致する必要はありません。

process が呼び出されるたびに、0、1、または複数の行を返すことができます。

process メソッドによって返されるデータ型は、 Stream<OutputRow> である必要があります。ここで、Streamはjava.util.stream.Streamで定義され、 OutputRow は出力行クラスの名前です。以下の例は、ストリームを介して入力を返すだけの単純な process メソッドを示しています。

import java.util.stream.Stream;

...

public Stream<OutputRow> process(String v) {
  return Stream.of(new OutputRow(v));
}

...
Copy

process メソッドがオブジェクトの状態を保持または使用しない場合(たとえば、メソッドが選択された入力行を出力から除外するように設計されている場合)、メソッド static を宣言できます。 process メソッドが static で、ハンドラークラスにコンストラクターまたは非静的 endPartition メソッドがない場合、Snowflakeは、ハンドラークラスのインスタンスを作成せずに、各行を静的 process メソッドに直接渡します。

入力行をスキップして次の行を処理する必要がある場合(たとえば、入力行を検証している場合)は、空の Stream オブジェクトを返します。たとえば、以下の process メソッドは、 number が正の整数である行のみを返します。 number が正ではない場合、メソッドは空の Stream オブジェクトを返し、現在の行をスキップして次の行の処理を続行します。

public Stream<OutputRow> process(int number) {
  if (inputNumber < 1) {
    return Stream.empty();
  }
  return Stream.of(new OutputRow(number));
}
Copy

process がnull Streamを返すと、処理は停止します。(null Streamが返された場合でも、 endPartition メソッドは引き続き呼び出されます。)

このメソッドは必須です。

endPartition メソッド

このオプションのメソッドを使用して、 process で集計された状態情報に基づく出力行を生成できます。このメソッドは、該当パーティションに含まれるすべての行が process に渡された後、各 パーティション ごとに1回呼び出されます。

このメソッドを含めると、データが明示的にパーティション分割されたか暗黙的にパーティション分割されたかに関係なく、各パーティションで呼び出されます。データが意味のある形で分割されていない場合、ファイナライザーの出力は意味のないものになる可能性があります。

注釈

ユーザーがデータを明示的にパーティションしない場合、Snowflakeはデータを暗黙的にパーティションします。詳細については、 パーティション をご参照ください。

このメソッドは、0行、1行、または複数行を出力できます。

注釈

Snowflakeは、正常に処理するためにタイムアウトが調整された大型のパーティションをサポートしていますが、特に大型のパーティションでは、処理中にタイムアウトする可能性があります(endPartition の完了に時間がかかりすぎる場合など)。特定の使用シナリオに合わせてタイムアウトのしきい値を調整する必要がある場合は、 Snowflakeサポート にお問い合わせください。

getOutputClass メソッド

このメソッドは、 出力行クラス に関する情報を返します。出力行クラスには、返された行のデータ型に関する情報が含まれています。

出力行クラス

Snowflakeは、出力行クラスを使用して、Javaデータ型と SQL データ型の間の変換を指定します。

Java UDTF が行を返す場合、行の各列の値をJavaデータ型から対応する SQL データ型に変換する必要があります。SQL データ型は、 CREATE FUNCTION ステートメントの RETURNS 句で指定されます。ただし、Javaと SQL のデータ型間のマッピングは1対1ではないため、Snowflakeは返された各列のJavaデータ型を知る必要があります。(マッピング SQL およびJavaデータ型の詳細については、 SQL-Javaデータ型マッピング をご参照ください。)

Java UDTF は、出力行クラスを定義することにより、出力列のJavaデータ型を指定します。UDTF から返される各行は、出力行クラスのインスタンスとして返されます。出力行クラスの各インスタンスには、出力列ごとに1つのパブリックフィールドが含まれます。Snowflakeは、出力行クラスの各インスタンスからパブリックフィールドの値を読み取り、Java値を SQL 値に変換し、それらの値を含む SQL 出力行を作成します。

出力行クラスの各インスタンスの値は、出力行クラスのコンストラクターを呼び出すことによって設定されます。コンストラクターは、出力列に対応するパラメーターを受け入れ、パブリックフィールドをそれらのパラメーターに設定します。

以下のコードは、サンプルの出力行クラスを定義しています。

class OutputRow {

  public String name;
  public int id;

  public OutputRow(String pName, int pId) {
    this.name = pName;
    this.id = pId
  }

}
Copy

このクラスで指定されるパブリック変数は、 CREATE FUNCTION ステートメントの RETURNS TABLE (...) 句で指定された列と一致する必要があります。たとえば、上記の OutputRow クラスは、以下の RETURNS 句に対応します。

CREATE FUNCTION F(...)
    RETURNS TABLE(NAME VARCHAR, ID INTEGER)
    ...
Copy

重要

SQL 列名と出力行クラスのJavaパブリックフィールド名の一致では、 大文字と小文字を区別しません。たとえば、上記のJavaおよび SQL コードでは、 id という名前のJavaフィールドは ID という名前の SQL 列に対応します。

出力行クラスは次のように使用されます。

  • ハンドラークラスは、出力行クラスを使用して、 process メソッドと endPartition メソッドの戻り値の型を指定します。ハンドラークラスは、出力行クラスも使用して戻り値を作成します。例:

    public Stream<OutputRow> process(String v) {
      ...
      return Stream.of(new OutputRow(...));
    }
    
    public Stream<OutputRow> endPartition() {
      ...
      return Stream.of(new OutputRow(...));
    }
    
    Copy
  • 出力行クラスは、ハンドラークラスの getOutputClass メソッドでも使用されます。これは、Snowflakeが出力のJavaデータ型を学習するために呼び出す静的メソッドです。

    public static Class getOutputClass() {
      return OutputRow.class;
    }
    
    Copy

出力行クラス(またはハンドラークラス)の任意のメソッドから例外をスローすると、処理が停止します。UDTF を呼び出したクエリは、エラーメッセージを表示して失敗します。

要件の概要

UDTF のJavaコードは、次の要件を満たしている必要があります。

  • コードは、 出力行クラス を定義する必要があります。

  • UDTF ハンドラークラスには、 <出力行クラス> のStream(Streamはjava.util.stream.Streamで定義)を返す、 process という名前のパブリックメソッドが含まれている必要があります。

  • UDTF ハンドラークラスは、 getOutputClass という名前のパブリック静的メソッドを定義する必要があります。このメソッドは、 <出力行クラス>.class を返す必要があります。

Javaコードがこれらの要件を満たしていない場合は、 UDTF の作成または実行に失敗します。

  • CREATE FUNCTION ステートメントの実行時にセッションにアクティブなウェアハウスがある場合、Snowflakeは関数の作成時に違反を検出します。

  • CREATE FUNCTION ステートメントの実行時にセッションにアクティブなウェアハウスがない場合、Snowflakeは関数が呼び出されたときに違反を検出します。

クエリでJava UDTFs を呼び出す例

UDFs と UDTFs の呼び出しに関する一般的な情報については、 UDF の呼び出し をご参照ください。

明示的なパーティション分割を使用しない呼び出し

次の例は、 UDTF の作成方法を示しています。この例では、各入力の2つのコピーを返し、パーティションごとに1つの追加行を返します。

create function return_two_copies(v varchar)
returns table(output_value varchar)
language java
handler='TestFunction'
target_path='@~/TestFunction.jar'
as
$$

  import java.util.stream.Stream;

  class OutputRow {

    public String output_value;

    public OutputRow(String outputValue) {
      this.output_value = outputValue;
    }

  }


  class TestFunction {

    String myString;

    public TestFunction()  {
      myString = "Created in constructor and output from endPartition()";
    }

    public static Class getOutputClass() {
      return OutputRow.class;
    }

    public Stream<OutputRow> process(String inputValue) {
      // Return two rows with the same value.
      return Stream.of(new OutputRow(inputValue), new OutputRow(inputValue));
    }

    public Stream<OutputRow> endPartition() {
      // Returns the value we initialized in the constructor.
      return Stream.of(new OutputRow(myString));
    }

  }

$$;
Copy

この例は、 UDTF を呼び出す方法を示しています。この例を単純にするために、ステートメントは列ではなくリテラル値を渡し、 OVER() 句を省略します。

SELECT output_value
   FROM TABLE(return_two_copies('Input string'));
+-------------------------------------------------------+
| OUTPUT_VALUE                                          |
|-------------------------------------------------------|
| Input string                                          |
| Input string                                          |
| Created in constructor and output from endPartition() |
+-------------------------------------------------------+
Copy

この例では、別のテーブルから読み取った値を使用して UDTF を呼び出します。 process メソッドが呼び出されるたびに、 cities_of_interest テーブルにある現在の行の city_name 列から値が渡されます。上記のように、 UDTF は明示的な OVER() 句なしで呼び出されます。

入力のソースとして使用する簡単なテーブルを作成します。

CREATE TABLE cities_of_interest (city_name VARCHAR);
INSERT INTO cities_of_interest (city_name) VALUES
    ('Toronto'),
    ('Warsaw'),
    ('Kyoto');
Copy

Java UDTF を呼び出します。

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+
Copy

注意

この例では、 FROM 句で使用される構文は、内部結合(FROM t1, t2)の構文と同じです。ただし、実行される操作は真の内部結合では ありません。実際の動作では、テーブルにある各行の値で関数が呼び出されます。言い換えると、次の FROM 句が与えられます。

from cities_of_interest, table(f(city_name))
Copy

この動作は、次の擬似コードと同等です。

for city_name in cities_of_interest:
    output_row = f(city_name)
Copy

JavaScript UDTFs ドキュメントにある例セクション には、テーブルからの値を使用して UDTFs を呼び出すクエリのより複雑な例が含まれています。

ステートメントがパーティション分割を明示的に指定しない場合、Snowflake実行エンジンは 暗黙的なパーティション分割 を使用します。

パーティションが1つしかない場合、 endPartition メソッドは1回だけ呼び出され、クエリの出力には、値 Created in constructor and output from endPartition() を含む1つの行のみが含まれます。ステートメントのさまざまな実行の間に、異なる数のパーティションにデータがグループ化されている場合、 endPartition メソッドは、異なる回数で呼び出され、出力にはこの行の異なる数のコピーが含まれます。

詳細については、 暗黙的なパーティション分割 をご参照ください。

明示的なパーティション分割を使用した呼び出し

Java UDTFs は、明示的なパーティション分割を使用して呼び出すこともできます。

複数のパーティション

次の例では、前に作成したものと同じ UDTF とテーブルを使用しています。この例では、city_nameでデータを分割します。

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY city_name))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Created in constructor and output from endPartition() |
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Created in constructor and output from endPartition() |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Created in constructor and output from endPartition() |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
+-----------+-------------------------------------------------------+
Copy

単一のパーティション

次の例では、前に作成した同じ UDTF とテーブルを使用し、データを定数でパーティション化します。これにより、Snowflakeは単一のパーティションのみを使用するようになります。

SELECT city_name, output_value
   FROM cities_of_interest,
       TABLE(return_two_copies(city_name) OVER (PARTITION BY 1))
   ORDER BY city_name, output_value;
+-----------+-------------------------------------------------------+
| CITY_NAME | OUTPUT_VALUE                                          |
|-----------+-------------------------------------------------------|
| Kyoto     | Kyoto                                                 |
| Kyoto     | Kyoto                                                 |
| Toronto   | Toronto                                               |
| Toronto   | Toronto                                               |
| Warsaw    | Warsaw                                                |
| Warsaw    | Warsaw                                                |
| NULL      | Created in constructor and output from endPartition() |
+-----------+-------------------------------------------------------+
Copy

メッセージ Created in constructor and output from endPartition() のコピーが1つだけ出力に含まれていることに注意してください。これは、 endPartition が1回だけ呼び出されたことを示しています。

非常に大きな入力(例: 大きなファイル)の処理

場合によっては、 UDTF は各入力行を処理するため、非常に大量のメモリを必要とします。たとえば、 UDTF は、メモリに収まらないほど大きいファイルを読み取って処理する場合があります。

UDF または UDTF の大きなファイルを処理するには、 SnowflakeFile または InputStream クラスを使用します。詳細については、 UDF およびプロシージャハンドラーを使用した非構造化データの処理 をご参照ください。