Javaでの DataFrames 用ユーザー定義関数(UDFs)の作成¶
Snowpark API は、Javaにあるラムダ式からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。
このトピックの内容:
紹介¶
Snowpark APIs を呼び出して、Javaにあるラムダ式に対するユーザー定義関数(UDFs)を作成し、これらの UDFs を呼び出して DataFrame にあるデータを処理できます。
Snowpark API を使用して UDF を作成すると、Snowparkライブラリは UDF のコードをシリアル化してステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。
カスタムコードでは、 JAR ファイルにパッケージ化されているコードを呼び出すこともできます(たとえば、サードパーティライブラリのJavaクラス)。
次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。
匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。
import com.snowflake.snowpark_java.types.*; ... // Create and register an anonymous UDF (doubleUdf) // that takes in an integer argument and returns an integer value. UserDefinedFunction doubleUdf = Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType); // Call the anonymous UDF. DataFrame df = session.table("sample_product_data"); DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity"))); dfWithDoubleQuantity.show();
名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。
import com.snowflake.snowpark_java.types.*; ... // Create and register a permanent named UDF ("doubleUdf") // that takes in an integer argument and returns an integer value. UserDefinedFunction doubleUdf = session .udf() .registerPermanent( "doubleUdf", (Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType, "mystage"); // Call the named UDF. DataFrame df = session.table("sample_product_data"); DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity"))); dfWithDoubleQuantity.show();
このトピックの残りの部分では、 UDFs を作成する方法について説明します。
注釈
CREATE FUNCTION
コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。
詳細については、 スカラーユーザー定義関数(UDFs)の呼び出し をご参照ください。
引数と戻り値でサポートされるデータ型¶
Javaラムダの UDF を作成するには、メソッドの引数と戻り値として以下にリストした、サポートされているデータ型を使用する必要があります。
SQL データ型 |
Javaデータ型 |
メモ |
---|---|---|
次の型がサポートされています。
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
UDF の依存関係の指定¶
Snowpark API を介して UDF を定義するには、 UDF が依存するクラスとリソースを含むファイル(例: JAR ファイル、リソースファイルなど)に対して Session.addDependency()
を呼び出す必要があります。(UDF からの読み取りリソースの詳細については、 UDF からのファイルの読み取り を参照。)
Snowparkライブラリは、これらのファイルを内部ステージにアップロードし、 UDF の実行時にファイルをクラスパスに追加します。
Tip
アプリケーションを実行するたびにライブラリがファイルをアップロードすることを防止するには、ファイルをステージにアップロードします。 addDependency
を呼び出すときは、ステージ内のファイルへのパスを渡します。
次の例は、ステージに依存関係として JAR ファイルを追加する方法を示しています。
// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
次の例は、 JAR ファイルとリソースファイルの依存関係を追加する方法を示しています。
// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");
// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");
// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
次の依存関係は、指定する必要がありません。
Javaランタイムライブラリ。
これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。
Snowpark JAR ファイル。
Snowparkライブラリは、Snowpark JAR ファイルを自動的に検出してサーバーにアップロードしようとします。
ライブラリがSnowpark JAR ファイルをサーバーに繰り返しアップロードしないようにするには、
Snowpark JAR ファイルをステージにアップロードします。
たとえば、次のコマンドはSnowpark JAR ファイルをステージ
@mystage
にアップロードします。PUT コマンドは、 JAR ファイルを圧縮し、結果のファイルにsnowpark-1.10.0.jar.gzという名前を付けます。-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark-1.10.0.jar @mystage
addDependency
を呼び出して、Snowpark JAR ファイルを依存関係としてステージに追加します。たとえば、前のコマンドでアップロードされたSnowpark JAR ファイルを追加するには次のようにします。
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark-1.10.0.jar.gz");JAR ファイルへの指定されたパスには、 PUT コマンドによって追加された
.gz
ファイル名拡張子が含まれていることに注意してください。現在実行中のアプリケーションを含む JAR ファイルまたはディレクトリ。
Snowparkライブラリは、これらの依存関係を自動的に検出してアップロードしようとします。
Snowparkライブラリがこれらの依存関係を自動的に検出できない場合、ライブラリはエラーを報告します。これらの依存関係を手動で追加するには、
addDependency
を呼び出す必要があります。
依存関係がステージにアップロードされるのに時間がかかりすぎる場合、Snowparkライブラリはタイムアウト例外を報告します。Snowparkライブラリが待機する時間を最大に設定するには、セッションの作成時に Snowparkがリクエストするタイムアウト(秒単位) プロパティを設定します。
匿名 UDF の作成¶
匿名 UDF を作成するには、次のいずれかを実行できます。
Functions.udf
静的メソッドを呼び出し、ラムダ式と、入力と出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。UDFRegistration
クラスのregisterTemporary
メソッドを呼び出し、ラムダ式と、入力と出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。Session
オブジェクトのudf
メソッドを呼び出すと、UDFRegistration
クラスのインスタンスにアクセスできます。registerTemporary
を呼び出すときは、name
パラメーターを持たないメソッド署名を使用します。(匿名の UDF を作成しているため、 UDF の名前は指定しません。)
注釈
マルチスレッドコードを作成する場合(例: 並列コレクションを使用する場合)は、 udf
メソッドを使用するのではなく、 registerTemporary
メソッドを使用して UDFs を登録します。これにより、デフォルトのSnowflake Session
オブジェクトが見つからないエラーを防ぐことができます。
これらのメソッドは UserDefinedFunction
オブジェクトを返し、これを使用して UDF を呼び出すことができます。(スカラーユーザー定義関数(UDFs)の呼び出し を参照。)
次の例では、匿名の UDF を作成します。
import com.snowflake.snowpark_java.types.*;
...
// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
次の例では、カスタムクラス(テキストで使用されている言語を検出する LanguageDetector
)を使用する匿名 UDF を作成します。この例では、匿名 UDF を呼び出して DataFrame の text_data
列の言語を検出し、使用されている言語で追加の lang
列を含む新しい DataFrame を作成します。
import com.snowflake.snowpark_java.types.*;
// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;
// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");
// Create a detector
LanguageDetector detector = new LanguageDetector();
// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
Functions.udf(
(String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
DataTypes.StringType,
DataTypes.StringType);
// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
名前付き UDF の作成と登録¶
UDF を名前で呼び出す場合(例: Functions.callUDF
静的メソッドを使用して)、または後続のセッションで UDF を使用する必要がある場合は、名前付き UDF を作成して登録できます。これを実行するには、 UDFRegistration
クラスで次のいずれかのメソッドを使用します。
現在のセッションのみで UDF を使用する予定の場合は、
registerTemporary
後続のセッションで UDF を使用する予定の場合は、
registerPermanent
UDFRegistration
クラスのオブジェクトにアクセスするには、 Session
オブジェクトの udf
メソッドを呼び出します。
registerTemporary
または registerPermanent
メソッドを呼び出すときは、ラムダ式と、入力および出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。
例:
import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
session
.udf()
.registerTemporary(
"doubleUdf",
(Integer x) -> x + x,
DataTypes.IntegerType,
DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
registerPermanent
は、現在および後続のセッションで使用できる UDF を作成します。 registerPermanent
を呼び出すときは、 UDF の JAR ファイルとその依存関係がアップロードされる内部ステージの場所で場所を指定する必要もあります。
注釈
registerPermanent
は、外部ステージをサポートしていません。
例:
import com.snowflake.snowpark_java.types.*;
...
// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
session
.udf()
.registerPermanent(
"doubleUdf",
(Integer x) -> x + x,
DataTypes.IntegerType,
DataTypes.IntegerType,
"mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
シリアル化できないオブジェクトの使用¶
ラムダ式の UDF を作成すると、Snowparkライブラリがラムダクロージャをシリアル化し、実行のためにサーバーに送信します。
ラムダクロージャによってキャプチャされたオブジェクトがシリアル化できない場合、Snowparkライブラリは java.io.NotSerializableException
例外をスローします。
Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
これが発生した場合は、オブジェクトをシリアル化できるようにする必要があります。
UDF の初期化コードの記述¶
UDF に初期化コードまたはコンテキストが必要な場合は、 UDF クロージャの一部としてキャプチャされた値を介して提供できます。
次の例では、別のクラスを使用して、2つの UDFs に必要なコンテキストを初期化します。
最初の UDF はラムダ内にクラスの新しいインスタンスを作成するため、 UDF が呼び出されるたびに初期化が実行されます。
2番目の UDF は、クライアントプログラムで生成されたクラスのインスタンスをキャプチャします。クライアントで生成されたコンテキストはシリアル化され、 UDF によって使用されます。このアプローチが機能するには、コンテキストクラスがシリアル化できる必要があります。
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;
// Context needed for a UDF.
class Context {
double randomInt = Math.random();
}
// Serializable context needed for the UDF.
class SerContext implements Serializable {
double randomInt = Math.random();
}
class TestUdf {
public static void main(String[] args) {
// Create the session.
Session session = Session.builder().configFile("/<path>/profile.properties").create();
session.range(1, 10, 2).show();
// Create a DataFrame with two columns ("c" and "d").
DataFrame dummy =
session.createDataFrame(
new Row[]{
Row.create(1, 1),
Row.create(2, 2),
Row.create(3, 3)
},
StructType.create(
new StructField("c", DataTypes.IntegerType),
new StructField("d", DataTypes.IntegerType))
);
dummy.show();
// Initialize the context once per invocation.
UserDefinedFunction udfRepeatedInit =
Functions.udf(
(Integer i) -> new Context().randomInt,
DataTypes.IntegerType,
DataTypes.DoubleType
);
dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();
// Initialize the serializable context only once,
// regardless of the number of times that the UDF is invoked.
SerContext sC = new SerContext();
UserDefinedFunction udfOnceInit =
Functions.udf(
(Integer i) -> sC.randomInt,
DataTypes.IntegerType,
DataTypes.DoubleType
);
dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
}
}
UDF からのファイルの読み取り¶
前述のように、Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。
さらに、ファイルの内容が UDF の呼び出し間で同じままである場合は、後続の呼び出しではなく、最初の呼び出し中に1回ファイルをロードするコードを記述できます。これにより、 UDF 呼び出しのパフォーマンスを改善できます。
ファイルを読み取るように UDF を設定するには、
該当ファイルを JAR ファイルに追加します。
たとえば、 UDF が
data/
サブディレクトリ(data/hello.txt
)内のファイルを使用する必要がある場合は、jar
コマンドを実行してこのファイルを JAR ファイルに追加します。# Create a new JAR file containing data/hello.txt. $ jar cvf <path>/myJar.jar data/hello.txt
JAR ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされ、クラスパスに追加されます。 UDF の依存関係の指定 をご参照ください。
例:
// Specify that myJar.jar contains files that your UDF depends on. session.addDependency("<path>/myJar.jar");
UDF で、
Class.forName().getResourceAsStream()
を呼び出してクラスパスでファイルを見つけ、ファイルを読み取ります。this
への依存関係の追加を回避するために、Class.forName("com.snowflake.snowpark_java.DataFrame")
を(getClass()
の代わりに)使用して、Class
オブジェクトを取得できます。たとえば、
data/hello.txt
ファイルを読み取るには、// Read data/hello.txt from myJar.jar. String resourceName = "/data/hello.txt"; InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
この例では、リソース名は
/
で始まります。これは、 JAR ファイル内のファイルのフルパスであることを示しています。(この場合、ファイルの場所はクラスのパッケージを基準としていません。)
注釈
UDF 呼び出し間でファイルの内容が変更されることを予期しない場合は、クラスの静的フィールドにファイルを読み取り、フィールドが設定されていない場合にのみファイルを読み取ります。
次の例では、 UDF (readFileFunc
)として使用される関数を使用してオブジェクト(UDFCode
)を定義します。この関数は、文字列 hello,
を含むことが期待されるファイル data/hello.txt
を読み取ります。この関数は、引数として渡された文字列にこの文字列を付加します。
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
// Create a function class that reads a file.
class UDFCode {
private static String fileContent = null;
// The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
// The file content is cached in 'fileContent'.
public static String readFile() {
if (fileContent == null) {
try {
String resourceName = "/data/hello.txt";
InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
.getResourceAsStream(resourceName);
fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
} catch (Exception e) {
fileContent = "Error while reading file";
}
}
return fileContent;
}
}
例の次の部分では、関数を匿名の UDF として登録します。この例では、 DataFrame の NAME
列で UDF を呼び出します。この例では、 data/hello.txt
ファイルが JAR ファイル myJar.jar
にパッケージ化されていることを前提としています。
import com.snowflake.snowpark_java.types.*;
// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");
// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");
// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
(String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);
// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
ユーザー定義のテーブル関数(UDTFs)の作成¶
Snowparkで UDTF を作成して登録するには、次が必要です。
次のセクションでは、これらのステップについて詳しく説明します。
UDTF の呼び出しについては、 UDTF の呼び出し をご参照ください。
UDTF クラスの定義¶
com.snowflake.snowpark_java.udtfパッケージ の JavaUDTFn
インターフェイス(例: JavaUDTF0
、 JavaUDTF1
など)の1つを実装するクラスを定義します。ここで、 n
は、 UDTF の入力引数の数を指定します。たとえば、 UDTF が2つの入力引数を渡す場合は、 JavaUDTF2
インターフェイスを実装します。
クラスで、次のメソッドを実装します。
outputSchema()。これは、返される行(出力の「スキーマ」)のフィールドの名前とタイプを説明する
types.StructType
オブジェクトを返します。process()。これは、 入力パーティション の各行に対して1回呼び出されます(以下の注を参照)。
inputSchema()。入力パラメーターの型を記述する
types.StructType
オブジェクトを返します。process()
メソッドがMap
引数を渡す場合は、inputSchema()
メソッドを実装する必要があります。それ以外の場合、このメソッドの実装はオプションです。endPartition()。これは、すべての行が
process()
に渡された後、パーティションごとに1回呼び出されます。
UDTF が呼び出されると、行は UDTF に渡される前にパーティションにグループ化されます。
UDTF を呼び出すステートメントが PARTITION 句(明示的なパーティション)を指定している場合、その句は行のパーティション化の方法を決定します。
ステートメントで PARTITION 句が指定されていない場合(暗黙的なパーティション)、Snowflakeは行をパーティション化する最適な方法を決定します。
パーティションの説明については、 テーブル関数とパーティション をご参照ください。
UDTF クラスの例については、 UDTF クラスの例 をご参照ください。
outputSchema() メソッドの実装¶
outputSchema()
メソッドを実装して、 process()
メソッドと endPartition()
メソッドによって返される行のフィールド(「出力スキーマ」)の名前とデータ型を定義します。
public StructType outputSchema()
このメソッドでは、返される行の各フィールドのSnowflakeデータ型を表す StructField オブジェクトを含んでいる、 StructType オブジェクトを作成して返します。Snowflakeは、 UDTF の出力スキーマに対して次の型オブジェクトをサポートしています。
SQL データ型 |
Java型 |
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
たとえば、 UDTF が単一の整数フィールドを持つ行を返す場合、
public StructType outputSchema() { return StructType.create(new StructField("C1", DataTypes.IntegerType)); }
process() メソッドの実装¶
UDTF クラスで、 process()
メソッドを実装します。
Stream<Row> process(A0 arg0, ... A<n> arg<n>)
ここで、 n
は、 UDTF に渡される引数の数です。
署名内の引数の数は、実装したインターフェイスに対応しています。たとえば、 UDTF が2つの入力引数を渡し、 JavaUDTF2
インターフェイスを実装している場合、 process()
メソッドには次の署名があります。
Stream<Row> process(A0 arg0, A1 arg1)
このメソッドは、入力パーティションの行ごとに1回呼び出されます。
引数の型の選択¶
process()
メソッドの各引数の型には、 UDTF に渡される引数のSnowflakeデータ型に対応するJava型を使用します。
Snowflakeは、 UDTF の引数に対して次のデータ型をサポートしています。
SQL データ型 |
Javaデータ型 |
メモ |
---|---|---|
次の型がサポートされています。
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
注釈
java.util.Map
引数を渡す場合は、 inputSchema
メソッドを実装して、それらの引数の型を記述する必要があります。 inputSchema() メソッドの実装 をご参照ください。
行を返す¶
process()
メソッドで、指定された入力値に対して、 UDTF により返されるデータを含んだ Row
オブジェクトの java.util.stream.Stream を構築して返します。行のフィールドは、 outputSchema
メソッドで指定した型を使用する必要があります。(outputSchema() メソッドの実装 を参照。)
たとえば、 UDTF が行を生成する場合は、生成された行に対する Row
オブジェクトの Iterable
を作成して返します。
import java.util.stream.Stream; ... public Stream<Row> process(Integer start, Integer count) { Stream.Builder<Row> builder = Stream.builder(); for (int i = start; i < start + count ; i++) { builder.add(Row.create(i)); } return builder.build(); }
inputSchema() メソッドの実装¶
process() メソッドが java.util.Map
引数を渡す場合は、入力引数の型を記述するために inputSchema()
メソッドを実装する必要があります。
注釈
process()
メソッドが Map
引数を渡さない場合は、 inputSchema()
メソッドを実装する必要はありません。
このメソッドでは、 process()
メソッドに渡された各引数のSnowflakeデータ型を表す StructField オブジェクトを含んだ StructType オブジェクトを作成して返します。Snowflakeは、 UDTF の入力スキーマに対して次の型オブジェクトをサポートしています。
SQL データ型 |
Java型 |
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
たとえば、 process()
メソッドが Map<文字列、文字列>
引数と Map<文字列、バリアント>
引数を渡すとします。
import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...
public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
...
}
これらの入力引数の型を記述する StructType
オブジェクトを返すには、 inputSchema()
メソッドを実装する必要があります。
import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...
public StructType inputSchema() {
return StructType.create(
new StructField(
"string_map",
DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
new StructField(
"variant_map",
DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
endPartition() メソッドの実装¶
endPartition
メソッドを実装し、入力パーティションのすべての行が process
メソッドに渡された後に実行する必要があるコードを追加します。 endPartition
メソッドは、入力パーティションごとに1回呼び出されます。
public Stream<Row> endPartition()
パーティション内のすべての行が処理された後に作業を実行する必要がある場合は、このメソッドを使用できます。たとえば、次が可能です。
各
process
メソッド呼び出しでキャプチャした状態情報に基づいて行を返します。特定の入力行に関連付けられていない行を返します。
process
メソッドによって生成された出力行を要約した行を返します。
返す行のフィールドは、 outputSchema
メソッドで指定した型と一致する必要があります。(outputSchema() メソッドの実装 を参照。)
各パーティションの最後に追加の行を返す必要がない場合は、空の Stream
を返します。例:
public Stream<Row> endPartition() { return Stream.empty(); }
注釈
Snowflakeは、正常に処理するためにタイムアウトが調整された大型のパーティションをサポートしていますが、特に大型のパーティションでは、処理中にタイムアウトする可能性があります(endPartition
の完了に時間がかかりすぎる場合など)。特定の使用シナリオに合わせてタイムアウトのしきい値を調整する必要がある場合は、 Snowflakeサポート にお問い合わせください。
UDTF クラスの例¶
以下は、行の範囲を生成する UDTF クラスの例です。
UDTF は2つの引数を渡すため、クラスは
JavaUDTF2
を実装します。引数
start
とcount
は、行の開始番号と生成する行数を指定します。
import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;
class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
public StructType outputSchema() {
return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
// Because the process() method in this example does not pass in Map arguments,
// implementing the inputSchema() method is optional.
public StructType inputSchema() {
return StructType.create(
new StructField("start_value", DataTypes.IntegerType),
new StructField("value_count", DataTypes.IntegerType));
}
public Stream<Row> endPartition() {
return Stream.empty();
}
public Stream<Row> process(Integer start, Integer count) {
Stream.Builder<Row> builder = Stream.builder();
for (int i = start; i < start + count ; i++) {
builder.add(Row.create(i));
}
return builder.build();
}
}
UDTF の登録¶
次に、新しいクラスのインスタンスを作成し、 UDTFRegistration メソッドの1つを呼び出してクラスを登録します。 仮 または 永続的 UDTF を登録できます。
仮 UDTF の登録¶
仮 UDTF を登録するには、 UDTFRegistration.registerTemporary
を呼び出します。
名前で UDTF を呼び出す必要がない場合は、クラスのインスタンスを渡すことで匿名の UDTF を登録できます。
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf()); // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
UDTF を名前で呼び出す必要がある場合は、 UDTF の名前も渡します。
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf()); // Call the UDTF by name. session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
永続的 UDTF の登録¶
後続のセッションで UDTF を使用する必要がある場合は、 UDTFRegistration.registerPermanent
を呼び出して永続的 UDTF を登録します。
永続的 UDTF を登録するときは、登録メソッドが UDTF とその依存関係の JAR ファイルをアップロードするステージを指定する必要があります。例:
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage"); // Call the UDTF by name. session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
UDTF の呼び出し¶
UDTF を登録した後、返された TableFunction
オブジェクトを Session
オブジェクトの tableFunction
メソッドに渡すと、 UDTF を呼び出すことができます。
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf()); // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
名前で UDTF を呼び出すには、その名前で TableFunction
オブジェクトを作成し、それを tableFunction
メソッドに渡します。
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf()); // Call the UDTF by name. session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
SELECT ステートメントを介して UDTF を直接呼び出すこともできます。
session.sql("select * from table(myUdtf(10, 5))");