Javaでの DataFrames 用ユーザー定義関数(UDFs)の作成¶
Snowpark API は、Javaにあるラムダ式からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。
概要¶
Snowpark APIs を呼び出して、Javaにあるラムダ式に対するユーザー定義関数(UDFs)を作成し、これらの UDFs を呼び出して DataFrame にあるデータを処理できます。
Snowpark API を使用して UDF を作成すると、Snowparkライブラリは UDF のコードをシリアル化してステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。
カスタムコードでは、 JAR ファイルにパッケージ化されているコードを呼び出すこともできます(たとえば、サードパーティライブラリのJavaクラス)。
次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。
匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。
名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。
このトピックの残りの部分では、 UDFs を作成する方法について説明します。
注釈
CREATE FUNCTION コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。
詳細については、 スカラーユーザー定義関数(UDFs)の呼び出し をご参照ください。
引数と戻り値でサポートされるデータ型¶
Javaラムダの UDF を作成するには、メソッドの引数と戻り値として以下にリストした、サポートされているデータ型を使用する必要があります。
SQL データ型 |
Javaデータ型 |
メモ |
|---|---|---|
次の型がサポートされています。
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
UDF の依存関係の指定¶
Snowpark API を介して UDF を定義するには、 UDF が依存するクラスとリソースを含むファイル(例: JAR ファイル、リソースファイルなど)に対して Session.addDependency() を呼び出す必要があります。(UDF からの読み取りリソースの詳細については、 UDF からのファイルの読み取り を参照。)
Snowparkライブラリは、これらのファイルを内部ステージにアップロードし、 UDF の実行時にファイルをクラスパスに追加します。
Tip
アプリケーションを実行するたびにライブラリがファイルをアップロードすることを防止するには、ファイルをステージにアップロードします。 addDependency を呼び出すときは、ステージ内のファイルへのパスを渡します。
次の例は、ステージに依存関係として JAR ファイルを追加する方法を示しています。
次の例は、 JAR ファイルとリソースファイルの依存関係を追加する方法を示しています。
次の依存関係は、指定する必要がありません。
Javaランタイムライブラリ。
これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。
Snowpark JAR ファイル。
Snowparkライブラリは、Snowpark JAR ファイルを自動的に検出してサーバーにアップロードしようとします。
ライブラリがSnowpark JAR ファイルをサーバーに繰り返しアップロードしないようにするには、
Snowpark JAR ファイルをステージにアップロードします。
For example, the following command uploads the Snowpark JAR file to the stage
@mystage. The PUT command compresses the JAR file and names the resulting file snowpark_2.12-1.18.0.jar.gz.-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark_2.12-1.18.0.jar @mystage
addDependencyを呼び出して、Snowpark JAR ファイルを依存関係としてステージに追加します。たとえば、前のコマンドでアップロードされたSnowpark JAR ファイルを追加するには次のようにします。
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark_2.12-1.18.0.jar.gz");JAR ファイルへの指定されたパスには、 PUT コマンドによって追加された
.gzファイル名拡張子が含まれていることに注意してください。現在実行中のアプリケーションを含む JAR ファイルまたはディレクトリ。
Snowparkライブラリは、これらの依存関係を自動的に検出してアップロードしようとします。
Snowparkライブラリがこれらの依存関係を自動的に検出できない場合、ライブラリはエラーを報告します。これらの依存関係を手動で追加するには、
addDependencyを呼び出す必要があります。
依存関係がステージにアップロードされるのに時間がかかりすぎる場合、Snowparkライブラリはタイムアウト例外を報告します。Snowparkライブラリが待機する時間を最大に設定するには、セッションの作成時に Snowparkがリクエストするタイムアウト(秒単位) プロパティを設定します。
匿名 UDF の作成¶
匿名 UDF を作成するには、次のいずれかを実行できます。
Functions.udf静的メソッドを呼び出し、ラムダ式と、入力と出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。UDFRegistrationクラスのregisterTemporaryメソッドを呼び出し、ラムダ式と、入力と出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。Sessionオブジェクトのudfメソッドを呼び出すと、UDFRegistrationクラスのインスタンスにアクセスできます。registerTemporaryを呼び出すときは、nameパラメーターを持たないメソッド署名を使用します。(匿名の UDF を作成しているため、 UDF の名前は指定しません。)
注釈
マルチスレッドコードを作成する場合(例: 並列コレクションを使用する場合)は、 udf メソッドを使用するのではなく、 registerTemporary メソッドを使用して UDFs を登録します。これにより、デフォルトのSnowflake Session オブジェクトが見つからないエラーを防ぐことができます。
これらのメソッドは UserDefinedFunction オブジェクトを返し、これを使用して UDF を呼び出すことができます。(スカラーユーザー定義関数(UDFs)の呼び出し を参照。)
次の例では、匿名の UDF を作成します。
次の例では、カスタムクラス(テキストで使用されている言語を検出する LanguageDetector)を使用する匿名 UDF を作成します。この例では、匿名 UDF を呼び出して DataFrame の text_data 列の言語を検出し、使用されている言語で追加の lang 列を含む新しい DataFrame を作成します。
名前付き UDF の作成と登録¶
UDF を名前で呼び出す場合(例: Functions.callUDF 静的メソッドを使用して)、または後続のセッションで UDF を使用する必要がある場合は、名前付き UDF を作成して登録できます。これを実行するには、 UDFRegistration クラスで次のいずれかのメソッドを使用します。
現在のセッションのみで UDF を使用する予定の場合は、
registerTemporary後続のセッションで UDF を使用する予定の場合は、
registerPermanent
UDFRegistration クラスのオブジェクトにアクセスするには、 Session オブジェクトの udf メソッドを呼び出します。
registerTemporary または registerPermanent メソッドを呼び出すときは、ラムダ式と、入力および出力のデータ型を表す DataTypes フィールド(またはそのクラスのメソッドによって構築されたオブジェクト)を渡します。
例:
registerPermanent は、現在および後続のセッションで使用できる UDF を作成します。 registerPermanent を呼び出すときは、 UDF の JAR ファイルとその依存関係がアップロードされる内部ステージの場所で場所を指定する必要もあります。
注釈
registerPermanent は、外部ステージをサポートしていません。
例:
シリアル化できないオブジェクトの使用¶
ラムダ式の UDF を作成すると、Snowparkライブラリがラムダクロージャをシリアル化し、実行のためにサーバーに送信します。
ラムダクロージャによってキャプチャされたオブジェクトがシリアル化できない場合、Snowparkライブラリは java.io.NotSerializableException 例外をスローします。
これが発生した場合は、オブジェクトをシリアル化できるようにする必要があります。
UDF の初期化コードの記述¶
UDF に初期化コードまたはコンテキストが必要な場合は、 UDF クロージャの一部としてキャプチャされた値を介して提供できます。
次の例では、別のクラスを使用して、2つの UDFs に必要なコンテキストを初期化します。
最初の UDF はラムダ内にクラスの新しいインスタンスを作成するため、 UDF が呼び出されるたびに初期化が実行されます。
2番目の UDF は、クライアントプログラムで生成されたクラスのインスタンスをキャプチャします。クライアントで生成されたコンテキストはシリアル化され、 UDF によって使用されます。このアプローチが機能するには、コンテキストクラスがシリアル化できる必要があります。
UDF からのファイルの読み取り¶
前述のように、Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。
さらに、ファイルの内容が UDF の呼び出し間で同じままである場合は、後続の呼び出しではなく、最初の呼び出し中に1回ファイルをロードするコードを記述できます。これにより、 UDF 呼び出しのパフォーマンスを改善できます。
ファイルを読み取るように UDF を設定するには、
該当ファイルを JAR ファイルに追加します。
たとえば、 UDF が
data/サブディレクトリ(data/hello.txt)内のファイルを使用する必要がある場合は、jarコマンドを実行してこのファイルを JAR ファイルに追加します。JAR ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされ、クラスパスに追加されます。 UDF の依存関係の指定 をご参照ください。
例:
UDF で、
Class.forName().getResourceAsStream()を呼び出してクラスパスでファイルを見つけ、ファイルを読み取ります。thisへの依存関係の追加を回避するために、Class.forName("com.snowflake.snowpark_java.DataFrame")を(getClass()の代わりに)使用して、Classオブジェクトを取得できます。たとえば、
data/hello.txtファイルを読み取るには、この例では、リソース名は
/で始まります。これは、 JAR ファイル内のファイルのフルパスであることを示しています。(この場合、ファイルの場所はクラスのパッケージを基準としていません。)
注釈
UDF 呼び出し間でファイルの内容が変更されることを予期しない場合は、クラスの静的フィールドにファイルを読み取り、フィールドが設定されていない場合にのみファイルを読み取ります。
次の例では、 UDF (readFileFunc)として使用される関数を使用してオブジェクト(UDFCode)を定義します。この関数は、文字列 hello, を含むことが期待されるファイル data/hello.txt を読み取ります。この関数は、引数として渡された文字列にこの文字列を付加します。
例の次の部分では、関数を匿名の UDF として登録します。この例では、 DataFrame の NAME 列で UDF を呼び出します。この例では、 data/hello.txt ファイルが JAR ファイル myJar.jar にパッケージ化されていることを前提としています。
ユーザー定義のテーブル関数(UDTFs)の作成¶
Snowparkで UDTF を作成して登録するには、次が必要です。
次のセクションでは、これらのステップについて詳しく説明します。
UDTF の呼び出しについては、 UDTF の呼び出し をご参照ください。
UDTF クラスの定義¶
com.snowflake.snowpark_java.udtfパッケージ の JavaUDTFn インターフェイス(例: JavaUDTF0、 JavaUDTF1 など)の1つを実装するクラスを定義します。ここで、 n は、 UDTF の入力引数の数を指定します。たとえば、 UDTF が2つの入力引数を渡す場合は、 JavaUDTF2 インターフェイスを実装します。
クラスで、次のメソッドを実装します。
outputSchema()。これは、返される行(出力の「スキーマ」)のフィールドの名前とタイプを説明する
types.StructTypeオブジェクトを返します。process()。これは、 入力パーティション の各行に対して1回呼び出されます(以下の注を参照)。
inputSchema()。入力パラメーターの型を記述する
types.StructTypeオブジェクトを返します。process()メソッドがMap引数を渡す場合は、inputSchema()メソッドを実装する必要があります。それ以外の場合、このメソッドの実装はオプションです。endPartition()。これは、すべての行が
process()に渡された後、パーティションごとに1回呼び出されます。
UDTF が呼び出されると、行は UDTF に渡される前にパーティションにグループ化されます。
UDTF を呼び出すステートメントが PARTITION 句(明示的なパーティション)を指定している場合、その句は行のパーティション化の方法を決定します。
ステートメントで PARTITION 句が指定されていない場合(暗黙的なパーティション)、Snowflakeは行をパーティション化する最適な方法を決定します。
パーティションの説明については、 テーブル関数とパーティション をご参照ください。
UDTF クラスの例については、 UDTF クラスの例 をご参照ください。
outputSchema() メソッドの実装¶
outputSchema() メソッドを実装して、 process() メソッドと endPartition() メソッドによって返される行のフィールド(「出力スキーマ」)の名前とデータ型を定義します。
このメソッドでは、返される行の各フィールドのSnowflakeデータ型を表す StructField オブジェクトを含んでいる、 StructType オブジェクトを作成して返します。Snowflakeは、 UDTF の出力スキーマに対して次の型オブジェクトをサポートしています。
SQL データ型 |
Java型 |
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
たとえば、 UDTF が単一の整数フィールドを持つ行を返す場合、
process() メソッドの実装¶
UDTF クラスで、 process() メソッドを実装します。
ここで、 n は、 UDTF に渡される引数の数です。
署名内の引数の数は、実装したインターフェイスに対応しています。たとえば、 UDTF が2つの入力引数を渡し、 JavaUDTF2 インターフェイスを実装している場合、 process() メソッドには次の署名があります。
このメソッドは、入力パーティションの行ごとに1回呼び出されます。
引数の型の選択¶
process() メソッドの各引数の型には、 UDTF に渡される引数のSnowflakeデータ型に対応するJava型を使用します。
Snowflakeは、 UDTF の引数に対して次のデータ型をサポートしています。
SQL データ型 |
Javaデータ型 |
メモ |
|---|---|---|
次の型がサポートされています。
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
注釈
java.util.Map 引数を渡す場合は、 inputSchema メソッドを実装して、それらの引数の型を記述する必要があります。 inputSchema() メソッドの実装 をご参照ください。
行を返す¶
process() メソッドで、指定された入力値に対して、 UDTF により返されるデータを含んだ Row オブジェクトの java.util.stream.Stream を構築して返します。行のフィールドは、 outputSchema メソッドで指定した型を使用する必要があります。(outputSchema() メソッドの実装 を参照。)
たとえば、 UDTF が行を生成する場合は、生成された行に対する Row オブジェクトの Iterable を作成して返します。
inputSchema() メソッドの実装¶
process() メソッドが java.util.Map 引数を渡す場合は、入力引数の型を記述するために inputSchema() メソッドを実装する必要があります。
注釈
process() メソッドが Map 引数を渡さない場合は、 inputSchema() メソッドを実装する必要はありません。
このメソッドでは、 process() メソッドに渡された各引数のSnowflakeデータ型を表す StructField オブジェクトを含んだ StructType オブジェクトを作成して返します。Snowflakeは、 UDTF の入力スキーマに対して次の型オブジェクトをサポートしています。
SQL データ型 |
Java型 |
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
たとえば、 process() メソッドが Map<文字列、文字列> 引数と Map<文字列、バリアント> 引数を渡すとします。
これらの入力引数の型を記述する StructType オブジェクトを返すには、 inputSchema() メソッドを実装する必要があります。
endPartition() メソッドの実装¶
endPartition メソッドを実装し、入力パーティションのすべての行が process メソッドに渡された後に実行する必要があるコードを追加します。 endPartition メソッドは、入力パーティションごとに1回呼び出されます。
パーティション内のすべての行が処理された後に作業を実行する必要がある場合は、このメソッドを使用できます。たとえば、次が可能です。
各
processメソッド呼び出しでキャプチャした状態情報に基づいて行を返します。特定の入力行に関連付けられていない行を返します。
processメソッドによって生成された出力行を要約した行を返します。
返す行のフィールドは、 outputSchema メソッドで指定した型と一致する必要があります。(outputSchema() メソッドの実装 を参照。)
各パーティションの最後に追加の行を返す必要がない場合は、空の Stream を返します。例:
注釈
Snowflakeは、正常に処理するためにタイムアウトが調整された大型のパーティションをサポートしていますが、特に大型のパーティションでは、処理中にタイムアウトする可能性があります(endPartition の完了に時間がかかりすぎる場合など)。特定の使用シナリオに合わせてタイムアウトのしきい値を調整する必要がある場合は、 Snowflakeサポート にお問い合わせください。
UDTF クラスの例¶
以下は、行の範囲を生成する UDTF クラスの例です。
UDTF は2つの引数を渡すため、クラスは
JavaUDTF2を実装します。引数
startとcountは、行の開始番号と生成する行数を指定します。
UDTF の登録¶
次に、新しいクラスのインスタンスを作成し、 UDTFRegistration メソッドの1つを呼び出してクラスを登録します。 仮 または 永続的 UDTF を登録できます。
仮 UDTF の登録¶
仮 UDTF を登録するには、 UDTFRegistration.registerTemporary を呼び出します。
名前で UDTF を呼び出す必要がない場合は、クラスのインスタンスを渡すことで匿名の UDTF を登録できます。
UDTF を名前で呼び出す必要がある場合は、 UDTF の名前も渡します。
永続的 UDTF の登録¶
後続のセッションで UDTF を使用する必要がある場合は、 UDTFRegistration.registerPermanent を呼び出して永続的 UDTF を登録します。
永続的 UDTF を登録するときは、登録メソッドが UDTF とその依存関係の JAR ファイルをアップロードするステージを指定する必要があります。例:
UDTF の呼び出し¶
UDTF を登録した後、返された TableFunction オブジェクトを Session オブジェクトの tableFunction メソッドに渡すと、 UDTF を呼び出すことができます。
名前で UDTF を呼び出すには、その名前で TableFunction オブジェクトを作成し、それを tableFunction メソッドに渡します。
SELECT ステートメントを介して UDTF を直接呼び出すこともできます。