Javaでの DataFrames 用ユーザー定義関数(UDFs)の作成

Snowpark API は、Javaにあるラムダ式からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。

このトピックの内容:

紹介

Snowpark APIs を呼び出して、Javaにあるラムダ式に対するユーザー定義関数(UDFs)を作成し、これらの UDFs を呼び出して DataFrame にあるデータを処理できます。

Snowpark API を使用して UDF を作成すると、Snowparkライブラリは UDF のコードをシリアル化してステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。

カスタムコードでは、 JAR ファイルにパッケージ化されているコードを呼び出すこともできます(たとえば、サードパーティライブラリのJavaクラス)。

次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。

  • 匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register an anonymous UDF (doubleUdf)
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
    // Call the anonymous UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy
  • 名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register a permanent named UDF ("doubleUdf")
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      session
        .udf()
        .registerPermanent(
          "doubleUdf",
          (Integer x) -> x + x,
          DataTypes.IntegerType,
          DataTypes.IntegerType,
          "mystage");
    // Call the named UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy

このトピックの残りの部分では、 UDFs を作成する方法について説明します。

注釈

CREATE FUNCTION コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。

詳細については、 スカラーユーザー定義関数(UDFs)の呼び出し をご参照ください。

引数と戻り値でサポートされるデータ型

Javaラムダの UDF を作成するには、メソッドの引数と戻り値として以下にリストした、サポートされているデータ型を使用する必要があります。

SQL データ型

Javaデータ型

メモ

NUMBER

次の型がサポートされています。

  • Integer

  • Long

  • java.math.BigDecimal または java.math.BigInteger

FLOAT

Float

DOUBLE

Double

VARCHAR

String

BOOLEAN

Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] または Variant[]

OBJECT

Map<文字列、文字列> または Map<文字列、変数>

GEOGRAPHY

com.snowflake.snowpark_java.types.Geography

UDF の依存関係の指定

Snowpark API を介して UDF を定義するには、 UDF が依存するクラスとリソースを含むファイル(例: JAR ファイル、リソースファイルなど)に対して Session.addDependency() を呼び出す必要があります。(UDF からの読み取りリソースの詳細については、 UDF からのファイルの読み取り を参照。)

Snowparkライブラリは、これらのファイルを内部ステージにアップロードし、 UDF の実行時にファイルをクラスパスに追加します。

Tip

アプリケーションを実行するたびにライブラリがファイルをアップロードすることを防止するには、ファイルをステージにアップロードします。 addDependency を呼び出すときは、ステージ内のファイルへのパスを渡します。

次の例は、ステージに依存関係として JAR ファイルを追加する方法を示しています。

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
Copy

次の例は、 JAR ファイルとリソースファイルの依存関係を追加する方法を示しています。

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");

// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
Copy

次の依存関係は、指定する必要がありません。

  • Javaランタイムライブラリ。

    これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。

  • Snowpark JAR ファイル。

    Snowparkライブラリは、Snowpark JAR ファイルを自動的に検出してサーバーにアップロードしようとします。

    ライブラリがSnowpark JAR ファイルをサーバーに繰り返しアップロードしないようにするには、

    1. Snowpark JAR ファイルをステージにアップロードします。

      たとえば、次のコマンドはSnowpark JAR ファイルをステージ @mystage にアップロードします。PUT コマンドは、 JAR ファイルを圧縮し、結果のファイルにsnowpark-1.10.0.jar.gzという名前を付けます。

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.10.0.jar @mystage
    2. addDependency を呼び出して、Snowpark JAR ファイルを依存関係としてステージに追加します。

      たとえば、前のコマンドでアップロードされたSnowpark JAR ファイルを追加するには次のようにします。

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.10.0.jar.gz");

      JAR ファイルへの指定されたパスには、 PUT コマンドによって追加された .gz ファイル名拡張子が含まれていることに注意してください。

  • 現在実行中のアプリケーションを含む JAR ファイルまたはディレクトリ。

    Snowparkライブラリは、これらの依存関係を自動的に検出してアップロードしようとします。

    Snowparkライブラリがこれらの依存関係を自動的に検出できない場合、ライブラリはエラーを報告します。これらの依存関係を手動で追加するには、 addDependency を呼び出す必要があります。

依存関係がステージにアップロードされるのに時間がかかりすぎる場合、Snowparkライブラリはタイムアウト例外を報告します。Snowparkライブラリが待機する時間を最大に設定するには、セッションの作成時に Snowparkがリクエストするタイムアウト(秒単位) プロパティを設定します。

匿名 UDF の作成

匿名 UDF を作成するには、次のいずれかを実行できます。

  • Functions.udf 静的メソッドを呼び出し、ラムダ式と、入力と出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。

  • UDFRegistration クラスの registerTemporary メソッドを呼び出し、ラムダ式と、入力と出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。

    Session オブジェクトの udf メソッドを呼び出すと、 UDFRegistration クラスのインスタンスにアクセスできます。

    registerTemporary を呼び出すときは、 name パラメーターを持たないメソッド署名を使用します。(匿名の UDF を作成しているため、 UDF の名前は指定しません。)

注釈

マルチスレッドコードを作成する場合(例: 並列コレクションを使用する場合)は、 udf メソッドを使用するのではなく、 registerTemporary メソッドを使用して UDFs を登録します。これにより、デフォルトのSnowflake Session オブジェクトが見つからないエラーを防ぐことができます。

これらのメソッドは UserDefinedFunction オブジェクトを返し、これを使用して UDF を呼び出すことができます。(スカラーユーザー定義関数(UDFs)の呼び出し を参照。)

次の例では、匿名の UDF を作成します。

import com.snowflake.snowpark_java.types.*;
...

// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

次の例では、カスタムクラス(テキストで使用されている言語を検出する LanguageDetector)を使用する匿名 UDF を作成します。この例では、匿名 UDF を呼び出して DataFrame の text_data 列の言語を検出し、使用されている言語で追加の lang 列を含む新しい DataFrame を作成します。

import com.snowflake.snowpark_java.types.*;

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");

// Create a detector
LanguageDetector detector = new LanguageDetector();

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
  Functions.udf(
    (String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
    DataTypes.StringType,
    DataTypes.StringType);

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
Copy

名前付き UDF の作成と登録

UDF を名前で呼び出す場合(例: Functions.callUDF 静的メソッドを使用して)、または後続のセッションで UDF を使用する必要がある場合は、名前付き UDF を作成して登録できます。これを実行するには、 UDFRegistration クラスで次のいずれかのメソッドを使用します。

  • 現在のセッションのみで UDF を使用する予定の場合は、 registerTemporary

  • 後続のセッションで UDF を使用する予定の場合は、 registerPermanent

UDFRegistration クラスのオブジェクトにアクセスするには、 Session オブジェクトの udf メソッドを呼び出します。

registerTemporary または registerPermanent メソッドを呼び出すときは、ラムダ式と、入力および出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。

例:

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

registerPermanent は、現在および後続のセッションで使用できる UDF を作成します。 registerPermanent を呼び出すときは、 UDF の JAR ファイルとその依存関係がアップロードされる内部ステージの場所で場所を指定する必要もあります。

注釈

registerPermanent は、外部ステージをサポートしていません。

例:

import com.snowflake.snowpark_java.types.*;
...

// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerPermanent(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType,
      "mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

シリアル化できないオブジェクトの使用

ラムダ式の UDF を作成すると、Snowparkライブラリがラムダクロージャをシリアル化し、実行のためにサーバーに送信します。

ラムダクロージャによってキャプチャされたオブジェクトがシリアル化できない場合、Snowparkライブラリは java.io.NotSerializableException 例外をスローします。

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

これが発生した場合は、オブジェクトをシリアル化できるようにする必要があります。

UDF の初期化コードの記述

UDF に初期化コードまたはコンテキストが必要な場合は、 UDF クロージャの一部としてキャプチャされた値を介して提供できます。

次の例では、別のクラスを使用して、2つの UDFs に必要なコンテキストを初期化します。

  • 最初の UDF はラムダ内にクラスの新しいインスタンスを作成するため、 UDF が呼び出されるたびに初期化が実行されます。

  • 2番目の UDF は、クライアントプログラムで生成されたクラスのインスタンスをキャプチャします。クライアントで生成されたコンテキストはシリアル化され、 UDF によって使用されます。このアプローチが機能するには、コンテキストクラスがシリアル化できる必要があります。

import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;

// Context needed for a UDF.
class Context {
  double randomInt = Math.random();
}

// Serializable context needed for the UDF.
class SerContext implements Serializable {
  double randomInt = Math.random();
}

class TestUdf {
  public static void main(String[] args) {
    // Create the session.
    Session session = Session.builder().configFile("/<path>/profile.properties").create();
    session.range(1, 10, 2).show();

    // Create a DataFrame with two columns ("c" and "d").
    DataFrame dummy =
      session.createDataFrame(
        new Row[]{
          Row.create(1, 1),
          Row.create(2, 2),
          Row.create(3, 3)
        },
        StructType.create(
          new StructField("c", DataTypes.IntegerType),
          new StructField("d", DataTypes.IntegerType))
        );
    dummy.show();

    // Initialize the context once per invocation.
    UserDefinedFunction udfRepeatedInit =
      Functions.udf(
        (Integer i) -> new Context().randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    SerContext sC = new SerContext();
    UserDefinedFunction udfOnceInit =
      Functions.udf(
        (Integer i) -> sC.randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
    UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
  }
}
Copy

UDF からのファイルの読み取り

前述のように、Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。

さらに、ファイルの内容が UDF の呼び出し間で同じままである場合は、後続の呼び出しではなく、最初の呼び出し中に1回ファイルをロードするコードを記述できます。これにより、 UDF 呼び出しのパフォーマンスを改善できます。

ファイルを読み取るように UDF を設定するには、

  1. 該当ファイルを JAR ファイルに追加します。

    たとえば、 UDF が data/ サブディレクトリ(data/hello.txt)内のファイルを使用する必要がある場合は、 jar コマンドを実行してこのファイルを JAR ファイルに追加します。

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. JAR ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされ、クラスパスに追加されます。 UDF の依存関係の指定 をご参照ください。

    例:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar");
    
    Copy
  3. UDF で、 Class.forName().getResourceAsStream() を呼び出してクラスパスでファイルを見つけ、ファイルを読み取ります。

    this への依存関係の追加を回避するために、 Class.forName("com.snowflake.snowpark_java.DataFrame") を(getClass() の代わりに)使用して、 Class オブジェクトを取得できます。

    たとえば、 data/hello.txt ファイルを読み取るには、

    // Read data/hello.txt from myJar.jar.
    String resourceName = "/data/hello.txt";
    InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
    
    Copy

    この例では、リソース名は / で始まります。これは、 JAR ファイル内のファイルのフルパスであることを示しています。(この場合、ファイルの場所はクラスのパッケージを基準としていません。)

注釈

UDF 呼び出し間でファイルの内容が変更されることを予期しない場合は、クラスの静的フィールドにファイルを読み取り、フィールドが設定されていない場合にのみファイルを読み取ります。

次の例では、 UDF (readFileFunc)として使用される関数を使用してオブジェクト(UDFCode)を定義します。この関数は、文字列 hello, を含むことが期待されるファイル data/hello.txt を読み取ります。この関数は、引数として渡された文字列にこの文字列を付加します。

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Create a function class that reads a file.
class UDFCode {
  private static String fileContent = null;
  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // The file content is cached in 'fileContent'.
  public static String readFile() {
    if (fileContent == null) {
      try {
        String resourceName = "/data/hello.txt";
        InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
          .getResourceAsStream(resourceName);
        fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
      } catch (Exception e) {
        fileContent = "Error while reading file";
      }
    }
    return fileContent;
  }
}
Copy

例の次の部分では、関数を匿名の UDF として登録します。この例では、 DataFrame の NAME 列で UDF を呼び出します。この例では、 data/hello.txt ファイルが JAR ファイル myJar.jar にパッケージ化されていることを前提としています。

import com.snowflake.snowpark_java.types.*;

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");

// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
  (String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
Copy

ユーザー定義のテーブル関数(UDTFs)の作成

Snowparkで UDTF を作成して登録するには、次が必要です。

次のセクションでは、これらのステップについて詳しく説明します。

UDTF の呼び出しについては、 UDTF の呼び出し をご参照ください。

UDTF クラスの定義

com.snowflake.snowpark_java.udtfパッケージJavaUDTFn インターフェイス(例: JavaUDTF0JavaUDTF1 など)の1つを実装するクラスを定義します。ここで、 n は、 UDTF の入力引数の数を指定します。たとえば、 UDTF が2つの入力引数を渡す場合は、 JavaUDTF2 インターフェイスを実装します。

クラスで、次のメソッドを実装します。

  • outputSchema()。これは、返される行(出力の「スキーマ」)のフィールドの名前とタイプを説明する types.StructType オブジェクトを返します。

  • process()。これは、 入力パーティション の各行に対して1回呼び出されます(以下の注を参照)。

  • inputSchema()。入力パラメーターの型を記述する types.StructType オブジェクトを返します。

    process() メソッドが Map 引数を渡す場合は、 inputSchema() メソッドを実装する必要があります。それ以外の場合、このメソッドの実装はオプションです。

  • endPartition()。これは、すべての行が process() に渡された後、パーティションごとに1回呼び出されます。

UDTF が呼び出されると、行は UDTF に渡される前にパーティションにグループ化されます。

  • UDTF を呼び出すステートメントが PARTITION 句(明示的なパーティション)を指定している場合、その句は行のパーティション化の方法を決定します。

  • ステートメントで PARTITION 句が指定されていない場合(暗黙的なパーティション)、Snowflakeは行をパーティション化する最適な方法を決定します。

パーティションの説明については、 テーブル関数とパーティション をご参照ください。

UDTF クラスの例については、 UDTF クラスの例 をご参照ください。

outputSchema() メソッドの実装

outputSchema() メソッドを実装して、 process() メソッドと endPartition() メソッドによって返される行のフィールド(「出力スキーマ」)の名前とデータ型を定義します。

public StructType outputSchema()
Copy

このメソッドでは、返される行の各フィールドのSnowflakeデータ型を表す StructField オブジェクトを含んでいる、 StructType オブジェクトを作成して返します。Snowflakeは、 UDTF の出力スキーマに対して次の型オブジェクトをサポートしています。

SQL データ型

Java型

com.snowflake.snowpark_java.types

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<文字列、文字列>

MapType(StringType, StringType)

OBJECT

java.util.Map<文字列、バリアント>

MapType(StringType, VariantType)

たとえば、 UDTF が単一の整数フィールドを持つ行を返す場合、

public StructType outputSchema() {
  return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
Copy

process() メソッドの実装

UDTF クラスで、 process() メソッドを実装します。

Stream<Row> process(A0 arg0, ... A<n> arg<n>)
Copy

ここで、 n は、 UDTF に渡される引数の数です。

署名内の引数の数は、実装したインターフェイスに対応しています。たとえば、 UDTF が2つの入力引数を渡し、 JavaUDTF2 インターフェイスを実装している場合、 process() メソッドには次の署名があります。

Stream<Row> process(A0 arg0, A1 arg1)
Copy

このメソッドは、入力パーティションの行ごとに1回呼び出されます。

引数の型の選択

process() メソッドの各引数の型には、 UDTF に渡される引数のSnowflakeデータ型に対応するJava型を使用します。

Snowflakeは、 UDTF の引数に対して次のデータ型をサポートしています。

SQL データ型

Javaデータ型

メモ

NUMBER

次の型がサポートされています。

  • java.lang.Short

  • java.lang.Integer

  • java.lang.Long

  • java.math.BigDecimal

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

VARCHAR

java.lang.String

BOOLEAN

java.lang.Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] または Variant[]

OBJECT

Map<文字列、文字列> または Map<文字列、変数>

注釈

java.util.Map 引数を渡す場合は、 inputSchema メソッドを実装して、それらの引数の型を記述する必要があります。 inputSchema() メソッドの実装 をご参照ください。

行を返す

process() メソッドで、指定された入力値に対して、 UDTF により返されるデータを含んだ Row オブジェクトの java.util.stream.Stream を構築して返します。行のフィールドは、 outputSchema メソッドで指定した型を使用する必要があります。(outputSchema() メソッドの実装 を参照。)

たとえば、 UDTF が行を生成する場合は、生成された行に対する Row オブジェクトの Iterable を作成して返します。

import java.util.stream.Stream;
...

public Stream<Row> process(Integer start, Integer count) {
  Stream.Builder<Row> builder = Stream.builder();
  for (int i = start; i < start + count ; i++) {
    builder.add(Row.create(i));
  }
  return builder.build();
}
Copy

inputSchema() メソッドの実装

process() メソッドが java.util.Map 引数を渡す場合は、入力引数の型を記述するために inputSchema() メソッドを実装する必要があります。

注釈

process() メソッドが Map 引数を渡さない場合は、 inputSchema() メソッドを実装する必要はありません。

このメソッドでは、 process() メソッドに渡された各引数のSnowflakeデータ型を表す StructField オブジェクトを含んだ StructType オブジェクトを作成して返します。Snowflakeは、 UDTF の入力スキーマに対して次の型オブジェクトをサポートしています。

SQL データ型

Java型

com.snowflake.snowpark_java.types

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<文字列、文字列>

MapType(StringType, StringType)

OBJECT

java.util.Map<文字列、バリアント>

MapType(StringType, VariantType)

たとえば、 process() メソッドが Map<文字列、文字列> 引数と Map<文字列、バリアント> 引数を渡すとします。

import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...

public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
  ...
}
Copy

これらの入力引数の型を記述する StructType オブジェクトを返すには、 inputSchema() メソッドを実装する必要があります。

import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...

public StructType inputSchema() {
  return StructType.create(
      new StructField(
          "string_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
      new StructField(
          "variant_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
Copy

endPartition() メソッドの実装

endPartition メソッドを実装し、入力パーティションのすべての行が process メソッドに渡された後に実行する必要があるコードを追加します。 endPartition メソッドは、入力パーティションごとに1回呼び出されます。

public Stream<Row> endPartition()
Copy

パーティション内のすべての行が処理された後に作業を実行する必要がある場合は、このメソッドを使用できます。たとえば、次が可能です。

  • process メソッド呼び出しでキャプチャした状態情報に基づいて行を返します。

  • 特定の入力行に関連付けられていない行を返します。

  • process メソッドによって生成された出力行を要約した行を返します。

返す行のフィールドは、 outputSchema メソッドで指定した型と一致する必要があります。(outputSchema() メソッドの実装 を参照。)

各パーティションの最後に追加の行を返す必要がない場合は、空の Stream を返します。例:

public Stream<Row> endPartition() {
  return Stream.empty();
}
Copy

注釈

Snowflakeは、正常に処理するためにタイムアウトが調整された大型のパーティションをサポートしていますが、特に大型のパーティションでは、処理中にタイムアウトする可能性があります(endPartition の完了に時間がかかりすぎる場合など)。特定の使用シナリオに合わせてタイムアウトのしきい値を調整する必要がある場合は、 Snowflakeサポート にお問い合わせください。

UDTF クラスの例

以下は、行の範囲を生成する UDTF クラスの例です。

  • UDTF は2つの引数を渡すため、クラスは JavaUDTF2 を実装します。

  • 引数 startcount は、行の開始番号と生成する行数を指定します。

import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;

class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
  public StructType outputSchema() {
    return StructType.create(new StructField("C1", DataTypes.IntegerType));
  }

  // Because the process() method in this example does not pass in Map arguments,
  // implementing the inputSchema() method is optional.
  public StructType inputSchema() {
    return StructType.create(
            new StructField("start_value", DataTypes.IntegerType),
            new StructField("value_count", DataTypes.IntegerType));
  }

  public Stream<Row> endPartition() {
    return Stream.empty();
  }

  public Stream<Row> process(Integer start, Integer count) {
    Stream.Builder<Row> builder = Stream.builder();
    for (int i = start; i < start + count ; i++) {
      builder.add(Row.create(i));
    }
    return builder.build();
  }
}
Copy

UDTF の登録

次に、新しいクラスのインスタンスを作成し、 UDTFRegistration メソッドの1つを呼び出してクラスを登録します。 または 永続的 UDTF を登録できます。

仮 UDTF の登録

仮 UDTF を登録するには、 UDTFRegistration.registerTemporary を呼び出します。

  • 名前で UDTF を呼び出す必要がない場合は、クラスのインスタンスを渡すことで匿名の UDTF を登録できます。

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
    
    Copy
  • UDTF を名前で呼び出す必要がある場合は、 UDTF の名前も渡します。

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
    // Call the UDTF by name.
    session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
    
    Copy

永続的 UDTF の登録

後続のセッションで UDTF を使用する必要がある場合は、 UDTFRegistration.registerPermanent を呼び出して永続的 UDTF を登録します。

永続的 UDTF を登録するときは、登録メソッドが UDTF とその依存関係の JAR ファイルをアップロードするステージを指定する必要があります。例:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage");
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

UDTF の呼び出し

UDTF を登録した後、返された TableFunction オブジェクトを Session オブジェクトの tableFunction メソッドに渡すと、 UDTF を呼び出すことができます。

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Copy

名前で UDTF を呼び出すには、その名前で TableFunction オブジェクトを作成し、それを tableFunction メソッドに渡します。

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

SELECT ステートメントを介して UDTF を直接呼び出すこともできます。

session.sql("select * from table(myUdtf(10, 5))");
Copy