Scalaでの DataFrames 用ユーザー定義関数(UDFs)の作成¶
Snowpark API は、Scalaのラムダまたは関数からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。
このトピックの内容:
紹介¶
Snowpark APIs を呼び出して、Scalaのカスタムラムダと関数に対するユーザー定義関数(UDFs)を作成し、これらの UDFs を呼び出して DataFrame のデータを処理できます。
Snowpark API を使用して UDF を作成すると、Snowparkライブラリは UDF のコードをシリアル化して内部ステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。
カスタムコードでは、 JAR ファイルにパッケージ化されているコードを呼び出すこともできます(たとえば、サードパーティライブラリのJavaクラス)。
次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。
匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。
// Create and register an anonymous UDF (doubleUdf). val doubleUdf = udf((x: Int) => x + x) // Call the anonymous UDF. val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。
// Create and register a permanent named UDF ("doubleUdf"). session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage") // Call the named UDF. val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
次のセクションでは、Snowparkでの UDFs の作成に関する重要な情報を提供します。
このトピックの残りの部分では、 UDFs を作成する方法について説明します。
注釈
CREATE FUNCTION
コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。
詳細については、 スカラーユーザー定義関数(UDFs)の呼び出し をご参照ください。
引数と戻り値でサポートされるデータ型¶
Scala関数またはラムダの UDF を作成するには、関数またはラムダの引数と戻り値に、以下にリストされているサポートされているデータ型を使用する必要があります。
SQL データ型 |
Scalaデータ型 |
メモ |
---|---|---|
次の型がサポートされています。
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
次の型の可変マップがサポートされています。
|
|
アプリの特徴を使用してオブジェクトに UDFs を作成する際の警告¶
Scalaは、Scalaオブジェクトを実行可能プログラムに変えるために拡張できる アプリ の特徴を提供します。 App
の特徴は、オブジェクト定義の本文にあるすべてのコードを自動的に実行する main
メソッドを提供します。(オブジェクト定義のコードは事実上 main
メソッドになります。)
App
の特徴を拡張する効果の1つは、 main
メソッドが呼び出されるまでオブジェクトのフィールドが初期化されないことです。オブジェクトが App
を拡張し、以前に初期化したオブジェクトフィールドを使用する UDF を定義した場合、サーバーにアップロードされた UDF 定義には、オブジェクトフィールドの初期化された値は含まれません。
たとえば、オブジェクトで myConst
という名前のフィールドを定義して初期化し、そのフィールドを UDF で使用するとします。
object Main extends App {
...
// Initialize a field.
val myConst = "Prefix "
// Use the field in a UDF.
// Because the App trait delays the initialization of the object fields,
// myConst in the UDF definition resolves to null.
val myUdf = udf((s : String) => myConst + s )
...
}
Snowparkが UDF 定義をシリアル化してSnowflakeにアップロードすると、 myConst
は初期化されず、 null
に解決されます。その結果、 UDF を呼び出すと、 myConst
に対して null
が返されます。
これを回避するには、オブジェクトを変更して App
の特徴を拡張しないようにし、コードに別の main
メソッドを実装します。
object Main {
...
def main(args: Array[String]): Unit = {
... // Your code ...
}
...
}
UDF の依存関係の指定¶
Snowpark API を介して UDF を定義するには、 UDF が依存するクラスとリソースを含むファイル(例: JAR ファイル、リソースファイルなど)に対して Session.addDependency()
を呼び出す必要があります。(UDF からの読み取りリソースの詳細については、 UDF からのファイルの読み取り を参照。)
Snowparkライブラリは、これらのファイルを内部ステージにアップロードし、 UDF の実行時にファイルをクラスパスに追加します。
Tip
アプリケーションを実行するたびにライブラリがファイルをアップロードすることを防止するには、ファイルをステージにアップロードします。 addDependency
を呼び出すときは、ステージ内のファイルへのパスを渡します。
Scala REPL を使用している場合は、 REPL によって生成されたクラスのディレクトリ を依存関係として追加する 必要があります。たとえば、 run.sh
スクリプトを使用して REPL を開始した場合は、次のメソッドを呼び出します。これにより、スクリプトによって作成された repl_classes
ディレクトリが追加されます。
// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
次の例は、ステージに依存関係として JAR ファイルを追加する方法を示しています。
// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
次の例は、 JAR ファイルとリソースファイルの依存関係を追加する方法を示しています。
// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")
// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")
// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
次の依存関係は、指定する必要がありません。
Scalaランタイムライブラリ。
これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。
Snowpark JAR ファイル。
Snowparkライブラリは、Snowpark JAR ファイルを自動的に検出してサーバーにアップロードしようとします。
ライブラリがSnowpark JAR ファイルをサーバーに繰り返しアップロードしないようにするには、
Snowpark JAR ファイルをステージにアップロードします。
たとえば、次のコマンドはSnowpark JAR ファイルをステージ
@mystage
にアップロードします。PUT コマンドは、 JAR ファイルを圧縮し、結果のファイルにsnowpark-1.10.0.jar.gzという名前を付けます。-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark-1.10.0.jar @mystage
addDependency
を呼び出して、Snowpark JAR ファイルを依存関係としてステージに追加します。たとえば、前のコマンドでアップロードされたSnowpark JAR ファイルを追加するには次のようにします。
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark-1.10.0.jar.gz")JAR ファイルへの指定されたパスには、 PUT コマンドによって追加された
.gz
ファイル名拡張子が含まれていることに注意してください。現在実行中のアプリケーションを含む JAR ファイルまたはディレクトリ。
Snowparkライブラリは、これらの依存関係を自動的に検出してアップロードしようとします。
Snowparkライブラリがこれらの依存関係を自動的に検出できない場合、ライブラリはエラーを報告します。これらの依存関係を手動で追加するには、
addDependency
を呼び出す必要があります。
依存関係がステージにアップロードされるのに時間がかかりすぎる場合、Snowparkライブラリはタイムアウト例外を報告します。Snowparkライブラリが待機する時間を最大に設定するには、セッションの作成時に Snowparkがリクエストするタイムアウト(秒単位) プロパティを設定します。
匿名 UDF の作成¶
匿名 UDF を作成するには、次のいずれかを実行できます。
com.snowflake.snowpark.functions
オブジェクトでudf
関数を呼び出し、匿名関数の定義を渡します。UDFRegistration
クラスでregisterTemporary
メソッドを呼び出し、匿名関数の定義を渡します。匿名 UDF を登録しているため、name
パラメーターを持たないメソッド署名を使用する必要があります。
注釈
マルチスレッドコードを作成する場合(例: 並列コレクションを使用する場合)は、 udf
関数を使用するのではなく、 registerTemporary
メソッドを使用して UDFs を登録します。これにより、デフォルトのSnowflake Session
オブジェクトが見つからないエラーを防ぐことができます。
これらのメソッドは UserDefinedFunction
オブジェクトを返し、これを使用して UDF を呼び出すことができます。(スカラーユーザー定義関数(UDFs)の呼び出し を参照。)
次の例では、匿名の UDF を作成します。
// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
注釈
Jupyterのノートブックで UDF を作成する場合は、Snowparkで動作するようにノートブックを設定し(Snowpark Scala用Jupyterノートブックの設定 を参照)、ノートブックで UDFs を書き込むためのガイドラインに従う必要があります(Jupyter Notebookでの UDFs の作成 を参照)。
次の例では、 String
値の Array
を渡し、各値に文字列 x
を追加する匿名の UDF を作成します。
// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
次の例では、カスタムクラス(テキストで使用されている言語を検出する LanguageDetector
)を使用する匿名 UDF を作成します。この例では、匿名 UDF を呼び出して DataFrame の text_data
列の言語を検出し、使用されている言語で追加の lang
列を含む新しい DataFrame を作成します。
// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._
// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector
// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")
// Create a detector
val detector = new LanguageDetector()
// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
Option(detector.detect(s)).getOrElse("UNKNOWN"))
// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
dfEmails.withColumn("lang", langUdf(col("text_data")))
名前付き UDF の作成と登録¶
UDF を名前で呼び出す場合(例: functions
オブジェクトの callUDF
関数を使用する場合)、または後続のセッションで UDF を使用する必要がある場合は、名前付き UDF を作成して登録できます。これを実行するには、 UDFRegistration
クラスで次のいずれかのメソッドを使用します。
現在のセッションのみで UDF を使用する予定の場合は、
registerTemporary
後続のセッションで UDF を使用する予定の場合は、
registerPermanent
UDFRegistration
クラスのオブジェクトにアクセスするには、 Session
クラスの udf
メソッドを呼び出します。
registerTemporary
は、現在のセッションで使用できる仮の UDF を作成します。
// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
registerPermanent
は、現在および後続のセッションで使用できる UDF を作成します。 registerPermanent
を呼び出すときは、 UDF の JAR ファイルとその依存関係がアップロードされる内部ステージの場所で場所を指定する必要もあります。
注釈
registerPermanent
は、外部ステージをサポートしていません。
例:
// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
注釈
Jupyterのノートブックで UDF を作成する場合は、Snowparkで動作するようにノートブックを設定し(Snowpark Scala用Jupyterノートブックの設定 を参照)、ノートブックで UDFs を書き込むためのガイドラインに従う必要があります(Jupyter Notebookでの UDFs の作成 を参照)。
Jupyter Notebookでの UDFs の作成¶
Jupyterノートブック で UDFs を作成する場合は、次の追加ステップに従う必要があります。
Snowpark Scala用Jupyterノートブックの設定 (Snowparkで動作するようにノートブックをまだセットアップしていない場合)
UDF の実装の記述¶
Serializable
を拡張するクラスで、関数の実装を定義します。例:
// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
val myUserDefinedFunc = (s: String) => {
...
}
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
別のセルで定義された変数へのアクセス¶
UDF の別のセルで定義された変数を使用する必要がある場合は、その変数を引数としてクラスコンストラクターに渡す必要があります。たとえば、セル1で変数を定義したとします。
In [1]:
val prefix = "Hello"
そして、セル2で定義した UDF でその変数を使用するとします。UDF のクラスコンストラクターで、この変数の引数を追加します。次に、クラスコンストラクターを呼び出して UDF を作成するときに、セル1で定義された変数を渡します。
In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
val prependPrefixFunc = (s: String) => {
s"$prefix $s"
}
}
// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
シリアル化できないオブジェクトの使用¶
ラムダまたは関数の UDF を作成すると、Snowparkライブラリがラムダクロージャをシリアル化し、実行のためにサーバーに送信します。
ラムダクロージャによってキャプチャされたオブジェクトがシリアル化できない場合、Snowparkライブラリは java.io.NotSerializableException
例外をスローします。
Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
これが発生した場合は、次のいずれかを実行できます。
オブジェクトをシリアル化可能にするか、
オブジェクトを
lazy val
として宣言するか、@transient
注釈を使用してオブジェクトのシリアル化を回避します。例:
// Declare the detector object as lazy. lazy val detector = new LanguageDetector("en") // The detector object is not serialized but is instead reconstructed on the server. val langUdf = udf((s: String) => Option(detector.detect(s)).getOrElse("UNKNOWN"))
UDF の初期化コードの記述¶
UDF に初期化コードまたはコンテキストが必要な場合は、 UDF クロージャの一部としてキャプチャされた値を介して提供できます。
次の例では、別のクラスを使用して、3つの UDFs に必要なコンテキストを初期化します。
最初の UDF はラムダ内にクラスの新しいインスタンスを作成するため、 UDF が呼び出されるたびに初期化が実行されます。
2番目の UDF は、クライアントプログラムで生成されたクラスのインスタンスをキャプチャします。クライアントで生成されたコンテキストはシリアル化され、 UDF によって使用されます。このアプローチが機能するには、コンテキストクラスがシリアル化できる必要があります。
3番目の UDF は
lazy val
をキャプチャするため、コンテキストは最初の UDF 呼び出しで遅延インスタンス化され、後続の呼び出しで再利用されます。このアプローチは、コンテキストがシリアル化できない場合でも機能します。ただし、データフレーム内の ALL すべての UDF 呼び出しが、遅延生成された同じコンテキストを使用するという保証はありません。
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random
// Context needed for a UDF.
class Context {
val randomInt = Random.nextInt
}
// Serializable context needed for the UDF.
class SerContext extends Serializable {
val randomInt = Random.nextInt
}
object TestUdf {
def main(args: Array[String]): Unit = {
// Create the session.
val session = Session.builder.configFile("/<path>/profile.properties").create
import session.implicits._
session.range(1, 10, 2).show()
// Create a DataFrame with two columns ("c" and "d").
val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
dummy.show()
// Initialize the context once per invocation.
val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
dummy.select(udfRepeatedInit('c)).show()
// Initialize the serializable context only once,
// regardless of the number of times that the UDF is invoked.
val sC = new SerContext
val udfOnceInit = udf((i: Int) => sC.randomInt)
dummy.select(udfOnceInit('c)).show()
// Initialize the non-serializable context only once,
// regardless of the number of times that the UDF is invoked.
lazy val unserC = new Context
val udfOnceInitU = udf((i: Int) => unserC.randomInt)
dummy.select(udfOnceInitU('c)).show()
}
}
UDF からのファイルの読み取り¶
前述のように、Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。
さらに、ファイルの内容が UDF の呼び出し間で同じままである場合は、後続の呼び出しではなく、最初の呼び出し中に1回ファイルをロードするコードを記述できます。これにより、 UDF 呼び出しのパフォーマンスを改善できます。
ファイルを読み取るように UDF を設定するには、
該当ファイルを JAR ファイルに追加します。
たとえば、 UDF が
data/
サブディレクトリ(data/hello.txt
)内のファイルを使用する必要がある場合は、jar
コマンドを実行してこのファイルを JAR ファイルに追加します。# Create a new JAR file containing data/hello.txt. $ jar cvf <path>/myJar.jar data/hello.txt
JAR ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされ、クラスパスに追加されます。 UDF の依存関係の指定 をご参照ください。
例:
// Specify that myJar.jar contains files that your UDF depends on. session.addDependency("<path>/myJar.jar")
UDF で、
Class.getResourceAsStream
を呼び出してクラスパスでファイルを見つけ、ファイルを読み取ります。this
への依存関係の追加を回避するために、classOf[com.snowflake.snowpark.DataFrame]
を(getClass
の代わりに)使用して、Class
オブジェクトを取得できます。たとえば、
data/hello.txt
ファイルを読み取るには、// Read data/hello.txt from myJar.jar. val resourceName = "/data/hello.txt" val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
この例では、リソース名は
/
で始まります。これは、 JAR ファイル内のファイルのフルパスであることを示しています。(この場合、ファイルの場所はクラスのパッケージを基準としていません。)
注釈
UDF 呼び出し間でファイル内容の変更が想定されない場合は、ファイルを
lazy val
に読み込みます。これにより、ファイルロードコードは UDF への最初の呼び出しでのみ実行され、後続の呼び出しでは実行されません。
次の例では、 UDF (readFileFunc
)として使用される関数を使用してオブジェクト(UDFCode
)を定義します。この関数は、文字列 hello,
を含むことが期待されるファイル data/hello.txt
を読み取ります。この関数は、引数として渡された文字列にこの文字列を付加します。
// Create a function object that reads a file.
object UDFCode extends Serializable {
// The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
// the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
// first accessed.
lazy val prefix = {
import java.io._
val resourceName = "/data/hello.txt"
val inputStream = classOf[com.snowflake.snowpark.DataFrame]
.getResourceAsStream(resourceName)
if (inputStream == null) {
throw new Exception("Can't find file " + resourceName)
}
scala.io.Source.fromInputStream(inputStream).mkString
}
val readFileFunc = (s: String) => prefix + " : " + s
}
例の次の部分では、関数を匿名の UDF として登録します。この例では、 DataFrame の NAME
列で UDF を呼び出します。この例では、 data/hello.txt
ファイルが JAR ファイル myJar.jar
にパッケージ化されていることを前提としています。
// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")
// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")
// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)
// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
ユーザー定義のテーブル関数(UDTFs)の作成¶
Snowparkで UDTF を作成して登録するには、次が必要です。
次のセクションでは、これらのステップについて詳しく説明します。
UDTF の呼び出しについては、 UDTF の呼び出し をご参照ください。
UDTF クラスの定義¶
com.snowflake.snowpark.udtfパッケージ の UDTFn
クラス(例: UDTF0
、 UDTF1
など)の1つから継承するクラスを定義します。ここで、 n
は、 UDTF の入力引数の数を指定します。たとえば、 UDTF が2つの入力引数を渡す場合は、 UDTF2
クラスを拡張します。
クラスで、次のメソッドを上書きします。
outputSchema()。これは、返される行(出力の「スキーマ」)のフィールドの名前とタイプを説明する
types.StructType
オブジェクトを返します。process()。これは、 入力パーティション の各行に対して1回呼び出されます(以下の注を参照)。
endPartition()。これは、すべての行が
process()
に渡された後、パーティションごとに1回呼び出されます。
UDTF が呼び出されると、行は UDTF に渡される前にパーティションにグループ化されます。
UDTF を呼び出すステートメントが PARTITION 句(明示的なパーティション)を指定している場合、その句は行のパーティション化の方法を決定します。
ステートメントで PARTITION 句が指定されていない場合(暗黙的なパーティション)、Snowflakeは行をパーティション化する最適な方法を決定します。
パーティションの説明については、 テーブル関数とパーティション をご参照ください。
UDTF クラスの例については、 UDTF クラスの例 をご参照ください。
outputSchema() メソッドの上書き¶
outputSchema()
メソッドを上書きして、 process()
メソッドと endPartition()
メソッドによって返される行のフィールド(「出力スキーマ」)の名前とデータ型を定義します。
def outputSchema(): StructType
このメソッドでは、 StructField オブジェクトの Array
を使用して、 StructType オブジェクトを作成して返し、返された行の各フィールドのSnowflakeデータ型を指定します。Snowflakeは、 UDTF の出力スキーマに対して次の型オブジェクトをサポートしています。
SQL データ型 |
Scala型 |
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
たとえば、 UDTF が単一の整数フィールドを持つ行を返す場合、
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
process() メソッドの上書き¶
UDTF クラスで、 process()
メソッドを上書きします。
def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
ここで、 n
は、 UDTF に渡される引数の数です。
署名の引数の数は、拡張したクラスに対応しています。たとえば、 UDTF が2つの入力引数を渡し、 UDTF2
クラスを拡張している場合、 process()
メソッドには次の署名があります。
def process(arg0: A0, arg1: A1): Iterable[Row]
このメソッドは、入力パーティションの行ごとに1回呼び出されます。
引数の型の選択¶
process()
メソッドの各引数の型には、 UDTF に渡される引数のSnowflakeデータ型に対応するScala型を使用します。
Snowflakeは、 UDTF の引数に対して次のデータ型をサポートしています。
SQL データ型 |
Scalaデータ型 |
メモ |
---|---|---|
次の型がサポートされています。
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
次の型の可変マップがサポートされています。
|
行を返す¶
process()
メソッドで、指定された入力値に対して UDTF によって返されるデータを含む Row
オブジェクトの Iterable
を作成して返します。行のフィールドは、 outputSchema
メソッドで指定した型を使用する必要があります。(outputSchema() メソッドの上書き を参照。)
たとえば、 UDTF が行を生成する場合は、生成された行に対する Row
オブジェクトの Iterable
を作成して返します。
override def process(start: Int, count: Int): Iterable[Row] = (start until (start + count)).map(Row(_))
endPartition() メソッドの上書き¶
endPartition
メソッドを上書きし、入力パーティションのすべての行が process
メソッドに渡された後に実行する必要があるコードを追加します。 endPartition
メソッドは、入力パーティションごとに1回呼び出されます。
def endPartition(): Iterable[Row]
パーティション内のすべての行が処理された後に作業を実行する必要がある場合は、このメソッドを使用できます。たとえば、次が可能です。
各
process
メソッド呼び出しでキャプチャした状態情報に基づいて行を返します。特定の入力行に関連付けられていない行を返します。
process
メソッドによって生成された出力行を要約した行を返します。
返す行のフィールドは、 outputSchema
メソッドで指定した型と一致する必要があります。(outputSchema() メソッドの上書き を参照。)
各パーティションの最後に追加の行を返す必要がない場合は、 Row
オブジェクトの空の Iterable
を返します。たとえば、
override def endPartition(): Iterable[Row] = Array.empty[Row]
注釈
Snowflakeは、正常に処理するためにタイムアウトが調整された大型のパーティションをサポートしていますが、特に大型のパーティションでは、処理中にタイムアウトする可能性があります(endPartition
の完了に時間がかかりすぎる場合など)。特定の使用シナリオに合わせてタイムアウトのしきい値を調整する必要がある場合は、 Snowflakeサポート にお問い合わせください。
UDTF クラスの例¶
以下は、行の範囲を生成する UDTF クラスの例です。
UDTF は2つの引数を渡すため、クラスは
UDTF2
を拡張します。引数
start
とcount
は、行の開始番号と生成する行数を指定します。
class MyRangeUdtf extends UDTF2[Int, Int] {
override def process(start: Int, count: Int): Iterable[Row] =
(start until (start + count)).map(Row(_))
override def endPartition(): Iterable[Row] = Array.empty[Row]
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
UDTF の登録¶
次に、新しいクラスのインスタンスを作成し、 UDTFRegistration メソッドの1つを呼び出してクラスを登録します。 仮 または 永続的 UDTF を登録できます。
仮 UDTF の登録¶
仮 UDTF を登録するには、 UDTFRegistration.registerTemporary
を呼び出します。
名前で UDTF を呼び出す必要がない場合は、クラスのインスタンスを渡すことで匿名の UDTF を登録できます。
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show
UDTF を名前で呼び出す必要がある場合は、 UDTF の名前も渡します。
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
永続的 UDTF の登録¶
後続のセッションで UDTF を使用する必要がある場合は、 UDTFRegistration.registerPermanent
を呼び出して永続的 UDTF を登録します。
永続的 UDTF を登録するときは、登録メソッドが UDTF とその依存関係の JAR ファイルをアップロードするステージを指定する必要があります。例:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage") // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
UDTF の呼び出し¶
UDTF を登録した後、返された TableFunction
オブジェクトを Session
オブジェクトの tableFunction
メソッドに渡すと、 UDTF を呼び出すことができます。
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show()
名前で UDTF を呼び出すには、その名前で TableFunction
オブジェクトを作成し、それを tableFunction
メソッドに渡します。
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
SELECT ステートメントを介して UDTF を直接呼び出すこともできます。
session.sql("select * from table(myUdtf(10, 5))")