DataFrames のユーザー定義関数(UDFs)の作成

Snowpark API は、Scalaのラムダまたは関数からユーザー定義関数を作成するために使用できるメソッドを提供します。このトピックでは、これらのタイプの関数を作成する方法について説明します。

このトピックの内容:

Snowparkを使用すると、カスタムラムダと関数のユーザー定義関数(UDFs)を作成でき、これらの UDFs を呼び出して DataFrame のデータを処理できます。

Snowpark API を使用して UDF を作成すると、Snowparkライブラリは関数のコードを内部ステージにアップロードします。UDF を呼び出すと、Snowparkライブラリはデータがあるサーバー上で関数を実行します。その結果、関数でデータを処理するためにデータをクライアントに転送する必要はありません。

カスタムコードでは、 JAR ファイルにパッケージ化されているコードを呼び出すこともできます(たとえば、サードパーティライブラリのJavaクラス)。

次に挙げる2つの方法のいずれかで、カスタムコードの UDF を作成できます。

  • 匿名の UDF を作成し、関数を変数に割り当てることができます。この変数がスコープ内にある限り、この変数を使用して UDF を呼び出すことができます。

  • 名前付き UDF を作成し、名前で UDF を呼び出すことができます。これは、たとえば、名前で UDF を呼び出す必要がある場合や、後続のセッションで UDF を使用する必要がある場合に使用できます。

次のセクションでは、これらの UDFs を作成する方法について説明します。

CREATE FUNCTION コマンドを実行して UDF を定義した場合は、Snowparkでその UDF を呼び出すことができます。詳細については、 ユーザー定義関数(UDFs)の呼び出し をご参照ください。

アプリの特徴を使用してオブジェクトに UDFs を作成する際の警告

Scalaは、Scalaオブジェクトを実行可能プログラムに変えるために拡張できる アプリ の特徴を提供します。 App の特徴は、オブジェクト定義の本文にあるすべてのコードを自動的に実行する main メソッドを提供します。(オブジェクト定義のコードは事実上 main メソッドになります。)

App の特徴を拡張する効果の1つは、 main メソッドが呼び出されるまでオブジェクトのフィールドが初期化されないことです。オブジェクトが App を拡張し、以前に初期化したオブジェクトフィールドを使用する UDF を定義した場合、サーバーにアップロードされた UDF 定義には、オブジェクトフィールドの初期化された値は含まれません。

たとえば、オブジェクトで myConst という名前のフィールドを定義して初期化し、そのフィールドを UDF で使用するとします。

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}

Snowparkが UDF 定義をシリアル化してSnowflakeにアップロードすると、 myConst は初期化されず、 null に解決されます。その結果、 UDF を呼び出すと、 myConst に対して null が返されます。

これを回避するには、オブジェクトを変更して App の特徴を拡張しないようにし、コードに別の main メソッドを実装します。

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}

UDF の依存関係の指定

Snowpark API を介して UDF を定義するには、 UDF が依存するクラスとリソースを含むファイル(例: JAR ファイル、リソースファイルなど)に対して Session.addDependency() を呼び出す必要があります。(UDF からの読み取りリソースの詳細については、 UDF からのファイルの読み取り を参照。)

Snowparkライブラリは、これらのファイルを内部ステージにアップロードし、 UDF の実行時にファイルをクラスパスに追加します。

ちなみに

アプリケーションを実行するたびにライブラリがファイルをアップロードすることを防止するには、ファイルをステージにアップロードします。 addDependency を呼び出すときは、ステージ内のファイルへのパスを渡します。

Scala REPL を使用している場合は、 REPL によって生成されたクラスのディレクトリ を依存関係として追加する 必要があります。たとえば、 run.sh スクリプトを使用して REPL を開始した場合は、次のメソッドを呼び出します。これにより、スクリプトによって作成された repl_classes ディレクトリが追加されます。

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")

次の例は、ステージに依存関係として JAR ファイルを追加する方法を示しています。

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")

次の例は、 JAR ファイルとリソースファイルの依存関係を追加する方法を示しています。

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")

次の依存関係は、指定する必要がありません。

  • Scalaランタイムライブラリ。

    これらのライブラリは、 UDFs が実行されるサーバーのランタイム環境ですでに使用可能です。

  • Snowpark JAR ファイル。

    Snowparkライブラリは、Snowpark JAR ファイルを自動的に検出してサーバーにアップロードしようとします。

    ライブラリがSnowpark JAR ファイルをサーバーに繰り返しアップロードしないようにするには、

    1. Snowpark JAR ファイルをステージにアップロードします。

      たとえば、次のコマンドはSnowpark JAR ファイルをステージ @mystage にアップロードします。PUT コマンドは、 JAR ファイルを圧縮し、結果のファイルに snowpark-0.5.0.jar.gz という名前を付けます。

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-0.5.0.jar @mystage
      
    2. addDependency を呼び出して、Snowpark JAR ファイルを依存関係としてステージに追加します。

      たとえば、前のコマンドでアップロードされたSnowpark JAR ファイルを追加するには次のようにします。

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-0.5.0.jar.gz")
      

      JAR ファイルへの指定されたパスには、 PUT コマンドによって追加された .gz ファイル名拡張子が含まれていることに注意してください。

  • 現在実行中のアプリケーションを含む JAR ファイルまたはディレクトリ。

    Snowparkライブラリは、これらの依存関係を自動的に検出してアップロードしようとします。

    Snowparkライブラリがこれらの依存関係を自動的に検出できない場合、ライブラリはエラーを報告します。これらの依存関係を手動で追加するには、 addDependency を呼び出す必要があります。

匿名 UDF の作成

匿名 UDF を作成するには、次のいずれかを実行できます。

  • com.snowflake.snowpark.functions オブジェクトで udf 関数を呼び出し、匿名関数の定義を渡します。

  • UDFRegistration クラスで registerTemporary メソッドを呼び出し、匿名関数の定義を渡します。匿名 UDF を登録しているため、 name パラメーターを持たないメソッド署名を使用する必要があります。

注釈

マルチスレッドコードを作成する場合(例: 並列コレクションを使用する場合)は、 udf 関数を使用するのではなく、 registerTemporary メソッドを使用して UDFs を登録します。これにより、デフォルトのSnowflake Session オブジェクトが見つからないエラーを防ぐことができます。

次の例では、カスタムクラス(テキストで使用されている言語を検出する LanguageDetector)を使用する匿名 UDF を作成します。この例では、匿名 UDF を呼び出して DataFrame の text_data 列の言語を検出し、使用されている言語で追加の lang 列を含む新しい DataFrame を作成します。

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))

注釈

Jupyterのノートブックで UDF を作成する場合は、Snowparkで動作するようにノートブックを設定し(Snowpark用Jupyterノートブックの設定 を参照)、ノートブックで UDFs を書き込むためのガイドラインに従う必要があります(Jupyter Notebookでの UDFs の作成 を参照)。

クライアントコードを実行すると、Snowparkライブラリがラムダクロージャをシリアル化し、実行のためにサーバーに送信します。ラムダクロージャによってキャプチャされたオブジェクトがシリアル化できない場合は、次のいずれかを実行できます。

  • オブジェクトをシリアル化可能にするか、

  • オブジェクトを lazy val として宣言するか、 @transient 注釈を使用してオブジェクトのシリアル化を回避します。

例:

// Declare the detector object as lazy.
lazy val detector = new LanguageDetector("en")
// The detector object is not serialized but is instead reconstructed on the server.
val langUdf = udf(...)
val dfEmailsWithLangCol = ...

名前付き UDF の作成と登録

UDF を名前で呼び出す場合(例: functions オブジェクトの callUDF 関数を使用する場合)、または後続のセッションで UDF を使用する必要がある場合は、名前付き UDF を作成して登録できます。これを実行するには、 UDFRegistration クラスで次のいずれかのメソッドを使用します。

  • 現在のセッションのみで UDF を使用する予定の場合は、 registerTemporary

  • 後続のセッションで UDF を使用する予定の場合は、 registerPermanent

UDFRegistration クラスのオブジェクトにアクセスするには、 Session クラスの udf メソッドを呼び出します。

registerTemporary は、現在のセッションで使用できる仮の UDF を作成します。

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

registerPermanent は、現在および後続のセッションで使用できる UDF を作成します。 registerPermanent を呼び出すときは、 UDF の JAR ファイルとその依存関係がアップロードされる内部ステージの場所で場所を指定する必要もあります。

注釈

registerPermanent は、外部ステージをサポートしていません。

例:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

注釈

Jupyterのノートブックで UDF を作成する場合は、Snowparkで動作するようにノートブックを設定し(Snowpark用Jupyterノートブックの設定 を参照)、ノートブックで UDFs を書き込むためのガイドラインに従う必要があります(Jupyter Notebookでの UDFs の作成 を参照)。

Jupyter Notebookでの UDFs の作成

Jupyterノートブック で UDFs を作成する場合は、次の追加ステップに従う必要があります。

UDF の実装の記述

Serializable を拡張するクラスで、関数の実装を定義します。例:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)

別のセルで定義された変数へのアクセス

UDF の別のセルで定義された変数を使用する必要がある場合は、その変数を引数としてクラスコンストラクターに渡す必要があります。たとえば、セル1で変数を定義したとします。

In [1]:
val prefix = "Hello"

そして、セル2で定義した UDF でその変数を使用するとします。UDF のクラスコンストラクターで、この変数の引数を追加します。次に、クラスコンストラクターを呼び出して UDF を作成するときに、セル1で定義された変数を渡します。

In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()

UDF の初期化コードの記述

UDF に初期化コードまたはコンテキストが必要な場合は、 UDF クロージャの一部としてキャプチャされた値を介して提供できます。

次の例では、別のクラスを使用して、3つの UDFs に必要なコンテキストを初期化します。

  • 最初の UDF はラムダ内にクラスの新しいインスタンスを作成するため、 UDF が呼び出されるたびに初期化が実行されます。

  • 2番目の UDF は、クライアントプログラムで生成されたクラスのインスタンスをキャプチャします。クライアントで生成されたコンテキストはシリアル化され、 UDF によって使用されます。このアプローチが機能するには、コンテキストクラスがシリアル化できる必要があります。

  • 3番目の UDF は lazy val をキャプチャするため、コンテキストは最初の UDF 呼び出しで遅延インスタンス化され、後続の呼び出しで再利用されます。このアプローチは、コンテキストがシリアル化できない場合でも機能します。ただし、データフレーム内の ALL すべての UDF 呼び出しが、遅延生成された同じコンテキストを使用するという保証はありません。

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}

UDF からのファイルの読み取り

前述のように、Snowparkライブラリはサーバーに UDFs をアップロードして実行します。UDF がファイルからデータを読み取る必要がある場合は、ファイルが UDF とともにアップロードされていることを確認する必要があります。

さらに、ファイルの内容が UDF の呼び出し間で同じままである場合は、後続の呼び出しではなく、最初の呼び出し中に1回ファイルをロードするコードを記述できます。これにより、 UDF 呼び出しのパフォーマンスが向上します。

ファイルを読み取るように UDF を設定するには、

  1. 該当ファイルを JAR ファイルに追加します。

    たとえば、 UDF が data/ サブディレクトリ(data/hello.txt)内のファイルを使用する必要がある場合は、 jar コマンドを実行してこのファイルを JAR ファイルに追加します。

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
  2. JAR ファイルが依存関係であることを指定します。これにより、ファイルがサーバーにアップロードされ、クラスパスに追加されます。 UDF の依存関係の指定 をご参照ください。

    例:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
  3. UDF で、 Class.getResourceAsStream を呼び出してクラスパスでファイルを見つけ、ファイルを読み取ります。

    this への依存関係の追加を回避するため。 classOf[com.snowflake.snowpark.DataFrame] を(getClass の代わりに)使用して、 Class オブジェクトを取得できます。

    たとえば、 data/hello.txt ファイルを読み取るには、

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    

    この例では、リソース名は / で始まります。これは、 JAR ファイル内のファイルのフルパスであることを示しています。(この場合、ファイルの場所はクラスのパッケージを基準としていません。)

注釈

UDF 呼び出し間でファイル内容の変更が想定されない場合は、ファイルを lazy val に読み込みます。これにより、ファイルロードコードは UDF への最初の呼び出しでのみ実行され、後続の呼び出しでは実行されません。

次の例では、 UDF (readFileFunc)として使用される関数を使用してオブジェクト(UDFCode)を定義します。この関数は、文字列 hello, を含むことが期待されるファイル data/hello.txt を読み取ります。この関数は、引数として渡された文字列にこの文字列を付加します。

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}

例の次の部分では、関数を匿名の UDF として登録します。この例では、 DataFrame の NAME 列で UDF を呼び出します。この例では、 data/hello.txt ファイルが JAR ファイル myJar.jar にパッケージ化されていることを前提としています。

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()