Scalaでの DataFrames 用ユーザー定義関数(UDFs)の作成

Snowpark API は、Scalaのラムダまたは関数からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。

このトピックの内容:

紹介

Snowpark APIs を呼び出して、Scalaのカスタムラムダと関数に対するユーザー定義関数(UDFs)を作成し、これらの UDFs を呼び出して DataFrame のデータを処理できます。

Snowpark API を使用して UDF を作成すると、Snowparkライブラリは UDF のコードをシリアル化して内部ステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。

カスタムコードでは、 JAR ファイルにパッケージ化されているコードを呼び出すこともできます(たとえば、サードパーティライブラリのJavaクラス)。

次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。

  • 匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。

    // Create and register an anonymous UDF (doubleUdf).
    val doubleUdf = udf((x: Int) => x + x)
    // Call the anonymous UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
    
    Copy
  • 名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。

    // Create and register a permanent named UDF ("doubleUdf").
    session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
    // Call the named UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
    
    Copy

次のセクションでは、Snowparkでの UDFs の作成に関する重要な情報を提供します。

このトピックの残りの部分では、 UDFs を作成する方法について説明します。

注釈

CREATE FUNCTION コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。

詳細については、 スカラーユーザー定義関数(UDFs)の呼び出し をご参照ください。

引数と戻り値でサポートされるデータ型

Scala関数またはラムダの UDF を作成するには、関数またはラムダの引数と戻り値に、以下にリストされているサポートされているデータ型を使用する必要があります。

SQL データ型

Scalaデータ型

メモ

NUMBER

次の型がサポートされています。

  • Short または Option[Short]

  • Int または Option[Int]

  • Long または Option[Long]

  • java.math.BigDecimal

FLOAT

Float または Option[Float]

DOUBLE

Double または Option[Double]

VARCHAR

String または java.lang.String

BOOLEAN

Boolean または Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] または Array[Variant]

OBJECT

Map[String, String] または Map[String, Variant]

次の型の可変マップがサポートされています。

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

GEOGRAPHY

com.snowflake.snowpark.types.Geography

アプリの特徴を使用してオブジェクトに UDFs を作成する際の警告

Scalaは、Scalaオブジェクトを実行可能プログラムに変えるために拡張できる アプリ の特徴を提供します。 App の特徴は、オブジェクト定義の本文にあるすべてのコードを自動的に実行する main メソッドを提供します。(オブジェクト定義のコードは事実上 main メソッドになります。)

App の特徴を拡張する効果の1つは、 main メソッドが呼び出されるまでオブジェクトのフィールドが初期化されないことです。オブジェクトが App を拡張し、以前に初期化したオブジェクトフィールドを使用する UDF を定義した場合、サーバーにアップロードされた UDF 定義には、オブジェクトフィールドの初期化された値は含まれません。

たとえば、オブジェクトで myConst という名前のフィールドを定義して初期化し、そのフィールドを UDF で使用するとします。

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}
Copy

Snowparkが UDF 定義をシリアル化してSnowflakeにアップロードすると、 myConst は初期化されず、 null に解決されます。その結果、 UDF を呼び出すと、 myConst に対して null が返されます。

これを回避するには、オブジェクトを変更して App の特徴を拡張しないようにし、コードに別の main メソッドを実装します。

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}
Copy

UDF の依存関係の指定

Snowpark API を介して UDF を定義するには、 UDF が依存するクラスとリソースを含むファイル(例: JAR ファイル、リソースファイルなど)に対して Session.addDependency() を呼び出す必要があります。(UDF からの読み取りリソースの詳細については、 UDF からのファイルの読み取り を参照。)

Snowparkライブラリは、これらのファイルを内部ステージにアップロードし、 UDF の実行時にファイルをクラスパスに追加します。

ちなみに

アプリケーションを実行するたびにライブラリがファイルをアップロードすることを防止するには、ファイルをステージにアップロードします。 addDependency を呼び出すときは、ステージ内のファイルへのパスを渡します。

Scala REPL を使用している場合は、 REPL によって生成されたクラスのディレクトリ を依存関係として追加する 必要があります。たとえば、 run.sh スクリプトを使用して REPL を開始した場合は、次のメソッドを呼び出します。これにより、スクリプトによって作成された repl_classes ディレクトリが追加されます。

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
Copy

次の例は、ステージに依存関係として JAR ファイルを追加する方法を示しています。

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Copy

次の例は、 JAR ファイルとリソースファイルの依存関係を追加する方法を示しています。

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Copy

次の依存関係は、指定する必要がありません。

  • Scalaランタイムライブラリ。

    これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。

  • Snowpark JAR ファイル。

    Snowparkライブラリは、Snowpark JAR ファイルを自動的に検出してサーバーにアップロードしようとします。

    ライブラリがSnowpark JAR ファイルをサーバーに繰り返しアップロードしないようにするには、

    1. Snowpark JAR ファイルをステージにアップロードします。

      たとえば、次のコマンドはSnowpark JAR ファイルをステージ @mystage にアップロードします。PUT コマンドは、 JAR ファイルを圧縮し、結果のファイルにsnowpark-1.9.0.jar.gzという名前を付けます。

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. addDependency を呼び出して、Snowpark JAR ファイルを依存関係としてステージに追加します。

      たとえば、前のコマンドでアップロードされたSnowpark JAR ファイルを追加するには次のようにします。

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz")

      JAR ファイルへの指定されたパスには、 PUT コマンドによって追加された .gz ファイル名拡張子が含まれていることに注意してください。

  • 現在実行中のアプリケーションを含む JAR ファイルまたはディレクトリ。

    Snowparkライブラリは、これらの依存関係を自動的に検出してアップロードしようとします。

    Snowparkライブラリがこれらの依存関係を自動的に検出できない場合、ライブラリはエラーを報告します。これらの依存関係を手動で追加するには、 addDependency を呼び出す必要があります。

依存関係がステージにアップロードされるのに時間がかかりすぎる場合、Snowparkライブラリはタイムアウト例外を報告します。Snowparkライブラリが待機する時間を最大に設定するには、セッションの作成時に Snowparkがリクエストするタイムアウト(秒単位) プロパティを設定します。

匿名 UDF の作成

匿名 UDF を作成するには、次のいずれかを実行できます。

  • com.snowflake.snowpark.functions オブジェクトで udf 関数を呼び出し、匿名関数の定義を渡します。

  • UDFRegistration クラスで registerTemporary メソッドを呼び出し、匿名関数の定義を渡します。匿名 UDF を登録しているため、 name パラメーターを持たないメソッド署名を使用する必要があります。

注釈

マルチスレッドコードを作成する場合(例: 並列コレクションを使用する場合)は、 udf 関数を使用するのではなく、 registerTemporary メソッドを使用して UDFs を登録します。これにより、デフォルトのSnowflake Session オブジェクトが見つからないエラーを防ぐことができます。

これらのメソッドは UserDefinedFunction オブジェクトを返し、これを使用して UDF を呼び出すことができます。(スカラーユーザー定義関数(UDFs)の呼び出し を参照。)

次の例では、匿名の UDF を作成します。

// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Copy

注釈

Jupyterのノートブックで UDF を作成する場合は、Snowparkで動作するようにノートブックを設定し(Snowpark Scala用Jupyterノートブックの設定 を参照)、ノートブックで UDFs を書き込むためのガイドラインに従う必要があります(Jupyter Notebookでの UDFs の作成 を参照)。

次の例では、 String 値の Array を渡し、各値に文字列 x を追加する匿名の UDF を作成します。

// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
Copy

次の例では、カスタムクラス(テキストで使用されている言語を検出する LanguageDetector)を使用する匿名 UDF を作成します。この例では、匿名 UDF を呼び出して DataFrame の text_data 列の言語を検出し、使用されている言語で追加の lang 列を含む新しい DataFrame を作成します。

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))
Copy

名前付き UDF の作成と登録

UDF を名前で呼び出す場合(例: functions オブジェクトの callUDF 関数を使用する場合)、または後続のセッションで UDF を使用する必要がある場合は、名前付き UDF を作成して登録できます。これを実行するには、 UDFRegistration クラスで次のいずれかのメソッドを使用します。

  • 現在のセッションのみで UDF を使用する予定の場合は、 registerTemporary

  • 後続のセッションで UDF を使用する予定の場合は、 registerPermanent

UDFRegistration クラスのオブジェクトにアクセスするには、 Session クラスの udf メソッドを呼び出します。

registerTemporary は、現在のセッションで使用できる仮の UDF を作成します。

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

registerPermanent は、現在および後続のセッションで使用できる UDF を作成します。 registerPermanent を呼び出すときは、 UDF の JAR ファイルとその依存関係がアップロードされる内部ステージの場所で場所を指定する必要もあります。

注釈

registerPermanent は、外部ステージをサポートしていません。

例:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

注釈

Jupyterのノートブックで UDF を作成する場合は、Snowparkで動作するようにノートブックを設定し(Snowpark Scala用Jupyterノートブックの設定 を参照)、ノートブックで UDFs を書き込むためのガイドラインに従う必要があります(Jupyter Notebookでの UDFs の作成 を参照)。

Jupyter Notebookでの UDFs の作成

Jupyterノートブック で UDFs を作成する場合は、次の追加ステップに従う必要があります。

UDF の実装の記述

Serializable を拡張するクラスで、関数の実装を定義します。例:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Copy

別のセルで定義された変数へのアクセス

UDF の別のセルで定義された変数を使用する必要がある場合は、その変数を引数としてクラスコンストラクターに渡す必要があります。たとえば、セル1で変数を定義したとします。

In [1]:
Copy
val prefix = "Hello"
Copy

そして、セル2で定義した UDF でその変数を使用するとします。UDF のクラスコンストラクターで、この変数の引数を追加します。次に、クラスコンストラクターを呼び出して UDF を作成するときに、セル1で定義された変数を渡します。

In [2]:
Copy
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Copy

シリアル化できないオブジェクトの使用

ラムダまたは関数の UDF を作成すると、Snowparkライブラリがラムダクロージャをシリアル化し、実行のためにサーバーに送信します。

ラムダクロージャによってキャプチャされたオブジェクトがシリアル化できない場合、Snowparkライブラリは java.io.NotSerializableException 例外をスローします。

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

これが発生した場合は、次のいずれかを実行できます。

  • オブジェクトをシリアル化可能にするか、

  • オブジェクトを lazy val として宣言するか、 @transient 注釈を使用してオブジェクトのシリアル化を回避します。

    例:

    // Declare the detector object as lazy.
    lazy val detector = new LanguageDetector("en")
    // The detector object is not serialized but is instead reconstructed on the server.
    val langUdf = udf((s: String) =>
         Option(detector.detect(s)).getOrElse("UNKNOWN"))
    
    Copy

UDF の初期化コードの記述

UDF に初期化コードまたはコンテキストが必要な場合は、 UDF クロージャの一部としてキャプチャされた値を介して提供できます。

次の例では、別のクラスを使用して、3つの UDFs に必要なコンテキストを初期化します。

  • 最初の UDF はラムダ内にクラスの新しいインスタンスを作成するため、 UDF が呼び出されるたびに初期化が実行されます。

  • 2番目の UDF は、クライアントプログラムで生成されたクラスのインスタンスをキャプチャします。クライアントで生成されたコンテキストはシリアル化され、 UDF によって使用されます。このアプローチが機能するには、コンテキストクラスがシリアル化できる必要があります。

  • 3番目の UDF は lazy val をキャプチャするため、コンテキストは最初の UDF 呼び出しで遅延インスタンス化され、後続の呼び出しで再利用されます。このアプローチは、コンテキストがシリアル化できない場合でも機能します。ただし、データフレーム内の ALL すべての UDF 呼び出しが、遅延生成された同じコンテキストを使用するという保証はありません。

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}
Copy

UDF からのファイルの読み取り

前述のように、Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。

さらに、ファイルの内容が UDF の呼び出し間で同じままである場合は、後続の呼び出しではなく、最初の呼び出し中に1回ファイルをロードするコードを記述できます。これにより、 UDF 呼び出しのパフォーマンスを改善できます。

ファイルを読み取るように UDF を設定するには、

  1. 該当ファイルを JAR ファイルに追加します。

    たとえば、 UDF が data/ サブディレクトリ(data/hello.txt)内のファイルを使用する必要がある場合は、 jar コマンドを実行してこのファイルを JAR ファイルに追加します。

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. JAR ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされ、クラスパスに追加されます。 UDF の依存関係の指定 をご参照ください。

    例:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
    Copy
  3. UDF で、 Class.getResourceAsStream を呼び出してクラスパスでファイルを見つけ、ファイルを読み取ります。

    this への依存関係の追加を回避するために、 classOf[com.snowflake.snowpark.DataFrame] を(getClass の代わりに)使用して、 Class オブジェクトを取得できます。

    たとえば、 data/hello.txt ファイルを読み取るには、

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    
    Copy

    この例では、リソース名は / で始まります。これは、 JAR ファイル内のファイルのフルパスであることを示しています。(この場合、ファイルの場所はクラスのパッケージを基準としていません。)

注釈

UDF 呼び出し間でファイル内容の変更が想定されない場合は、ファイルを lazy val に読み込みます。これにより、ファイルロードコードは UDF への最初の呼び出しでのみ実行され、後続の呼び出しでは実行されません。

次の例では、 UDF (readFileFunc)として使用される関数を使用してオブジェクト(UDFCode)を定義します。この関数は、文字列 hello, を含むことが期待されるファイル data/hello.txt を読み取ります。この関数は、引数として渡された文字列にこの文字列を付加します。

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}
Copy

例の次の部分では、関数を匿名の UDF として登録します。この例では、 DataFrame の NAME 列で UDF を呼び出します。この例では、 data/hello.txt ファイルが JAR ファイル myJar.jar にパッケージ化されていることを前提としています。

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Copy

ユーザー定義のテーブル関数(UDTFs)の作成

Snowparkで UDTF を作成して登録するには、次が必要です。

次のセクションでは、これらのステップについて詳しく説明します。

UDTF の呼び出しについては、 UDTF の呼び出し をご参照ください。

UDTF クラスの定義

com.snowflake.snowpark.udtfパッケージUDTFn クラス(例: UDTF0UDTF1 など)の1つから継承するクラスを定義します。ここで、 n は、 UDTF の入力引数の数を指定します。たとえば、 UDTF が2つの入力引数を渡す場合は、 UDTF2 クラスを拡張します。

クラスで、次のメソッドを上書きします。

  • outputSchema()。これは、返される行(出力の「スキーマ」)のフィールドの名前とタイプを説明する types.StructType オブジェクトを返します。

  • process()。これは、 入力パーティション の各行に対して1回呼び出されます(以下の注を参照)。

  • endPartition()。これは、すべての行が process() に渡された後、パーティションごとに1回呼び出されます。

UDTF が呼び出されると、行は UDTF に渡される前にパーティションにグループ化されます。

  • UDTF を呼び出すステートメントが PARTITION 句(明示的なパーティション)を指定している場合、その句は行のパーティション化の方法を決定します。

  • ステートメントで PARTITION 句が指定されていない場合(暗黙的なパーティション)、Snowflakeは行をパーティション化する最適な方法を決定します。

パーティションの説明については、 テーブル関数とパーティション をご参照ください。

UDTF クラスの例については、 UDTF クラスの例 をご参照ください。

outputSchema() メソッドの上書き

outputSchema() メソッドを上書きして、 process() メソッドと endPartition() メソッドによって返される行のフィールド(「出力スキーマ」)の名前とデータ型を定義します。

def outputSchema(): StructType
Copy

このメソッドでは、 StructField オブジェクトの Array を使用して、 StructType オブジェクトを作成して返し、返された行の各フィールドのSnowflakeデータ型を指定します。Snowflakeは、 UDTF の出力スキーマに対して次の型オブジェクトをサポートしています。

SQL データ型

Scala型

com.snowflake.snowpark.types

NUMBER

Short または Option[Short]

ShortType

NUMBER

Int または Option[Int]

IntType

NUMBER

Long または Option[Long]

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

Float または Option[Float]

FloatType

DOUBLE

Double または Option[Double]

DoubleType

VARCHAR

String または java.lang.String

StringType

BOOLEAN

Boolean または Option[Boolean]

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

Array[Byte]

BinaryType

VARIANT

com.snowflake.snowpark.types.Variant

VariantType

ARRAY

Array[String]

ArrayType(StringType)

ARRAY

Array[Variant]

ArrayType(VariantType)

OBJECT

Map[String, String]

MapType(StringType, StringType)

OBJECT

Map[String, Variant]

MapType(StringType, VariantType)

たとえば、 UDTF が単一の整数フィールドを持つ行を返す場合、

override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Copy

process() メソッドの上書き

UDTF クラスで、 process() メソッドを上書きします。

def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
Copy

ここで、 n は、 UDTF に渡される引数の数です。

署名の引数の数は、拡張したクラスに対応しています。たとえば、 UDTF が2つの入力引数を渡し、 UDTF2 クラスを拡張している場合、 process() メソッドには次の署名があります。

def process(arg0: A0, arg1: A1): Iterable[Row]
Copy

このメソッドは、入力パーティションの行ごとに1回呼び出されます。

引数の型の選択

process() メソッドの各引数の型には、 UDTF に渡される引数のSnowflakeデータ型に対応するScala型を使用します。

Snowflakeは、 UDTF の引数に対して次のデータ型をサポートしています。

SQL データ型

Scalaデータ型

メモ

NUMBER

次の型がサポートされています。

  • Short または Option[Short]

  • Int または Option[Int]

  • Long または Option[Long]

  • java.math.BigDecimal

FLOAT

Float または Option[Float]

DOUBLE

Double または Option[Double]

VARCHAR

String または java.lang.String

BOOLEAN

Boolean または Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] または Array[Variant]

OBJECT

Map[String, String] または Map[String, Variant]

次の型の可変マップがサポートされています。

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

行を返す

process() メソッドで、指定された入力値に対して UDTF によって返されるデータを含む Row オブジェクトの Iterable を作成して返します。行のフィールドは、 outputSchema メソッドで指定した型を使用する必要があります。(outputSchema() メソッドの上書き を参照。)

たとえば、 UDTF が行を生成する場合は、生成された行に対する Row オブジェクトの Iterable を作成して返します。

override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
Copy

endPartition() メソッドの上書き

endPartition メソッドを上書きし、入力パーティションのすべての行が process メソッドに渡された後に実行する必要があるコードを追加します。 endPartition メソッドは、入力パーティションごとに1回呼び出されます。

def endPartition(): Iterable[Row]
Copy

パーティション内のすべての行が処理された後に作業を実行する必要がある場合は、このメソッドを使用できます。たとえば、次が可能です。

  • process メソッド呼び出しでキャプチャした状態情報に基づいて行を返します。

  • 特定の入力行に関連付けられていない行を返します。

  • process メソッドによって生成された出力行を要約した行を返します。

返す行のフィールドは、 outputSchema メソッドで指定した型と一致する必要があります。(outputSchema() メソッドの上書き を参照。)

各パーティションの最後に追加の行を返す必要がない場合は、 Row オブジェクトの空の Iterable を返します。たとえば、

override def endPartition(): Iterable[Row] = Array.empty[Row]
Copy

注釈

Snowflakeは、正常に処理するためにタイムアウトが調整された大型のパーティションをサポートしていますが、特に大型のパーティションでは、処理中にタイムアウトする可能性があります(endPartition の完了に時間がかかりすぎる場合など)。特定の使用シナリオに合わせてタイムアウトのしきい値を調整する必要がある場合は、 Snowflakeサポート にお問い合わせください。

UDTF クラスの例

以下は、行の範囲を生成する UDTF クラスの例です。

  • UDTF は2つの引数を渡すため、クラスは UDTF2 を拡張します。

  • 引数 startcount は、行の開始番号と生成する行数を指定します。

class MyRangeUdtf extends UDTF2[Int, Int] {
  override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
  override def endPartition(): Iterable[Row] = Array.empty[Row]
  override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Copy

UDTF の登録

次に、新しいクラスのインスタンスを作成し、 UDTFRegistration メソッドの1つを呼び出してクラスを登録します。 または 永続的 UDTF を登録できます。

仮 UDTF の登録

仮 UDTF を登録するには、 UDTFRegistration.registerTemporary を呼び出します。

  • 名前で UDTF を呼び出す必要がない場合は、クラスのインスタンスを渡すことで匿名の UDTF を登録できます。

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, lit(10), lit(5)).show
    
    Copy
  • UDTF を名前で呼び出す必要がある場合は、 UDTF の名前も渡します。

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
    // Call the UDTF by name.
    session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
    
    Copy

永続的 UDTF の登録

後続のセッションで UDTF を使用する必要がある場合は、 UDTFRegistration.registerPermanent を呼び出して永続的 UDTF を登録します。

永続的 UDTF を登録するときは、登録メソッドが UDTF とその依存関係の JAR ファイルをアップロードするステージを指定する必要があります。例:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage")
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

UDTF の呼び出し

UDTF を登録した後、返された TableFunction オブジェクトを Session オブジェクトの tableFunction メソッドに渡すと、 UDTF を呼び出すことができます。

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, lit(10), lit(5)).show()
Copy

名前で UDTF を呼び出すには、その名前で TableFunction オブジェクトを作成し、それを tableFunction メソッドに渡します。

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

SELECT ステートメントを介して UDTF を直接呼び出すこともできます。

session.sql("select * from table(myUdtf(10, 5))")
Copy