Scala에서 DataFrame용 사용자 정의 함수(UDF) 만들기

Snowpark API는 람다 또는 Scala의 함수에서 사용자 정의 함수를 만드는 데 사용할 수 있는 메서드를 제공합니다. 이 항목에서는 이러한 형식의 함수를 만드는 방법에 대해 설명합니다.

이 항목의 내용:

소개

Snowpark API를 호출하여 Scala의 사용자 지정 람다 및 함수에 대한 사용자 정의 함수(UDF)를 만들 수 있으며 이러한 UDF를 호출하여 DataFrame의 데이터를 처리할 수 있습니다.

Snowpark API를 사용하여 UDF를 만들면 Snowpark 라이브러리는 직렬화하여 UDF에 대한 코드를 내부 스테이지에 업로드합니다. UDF를 호출하면 Snowpark 라이브러리는 데이터가 있는 서버에서 함수를 실행합니다. 결과적으로, 함수가 데이터를 처리하기 위해 데이터를 클라이언트로 전송할 필요가 없습니다.

사용자 지정 코드에서 사용자는 JAR 파일에 패키지된 코드(예: 서드 파티 라이브러리용 Java 클래스)를 호출할 수도 있습니다.

다음 두 가지 방법 중 하나로 사용자 지정 코드에 대한 UDF를 만들 수 있습니다.

  • 익명 UDF를 만들고 함수를 변수에 할당할 수 있습니다. 이 변수가 범위 내에 있는 한 이 변수를 사용하여 UDF를 호출할 수 있습니다.

    // Create and register an anonymous UDF (doubleUdf).
    val doubleUdf = udf((x: Int) => x + x)
    // Call the anonymous UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
    
    Copy
  • 명명된 UDF를 만들고 이름으로 UDF를 호출할 수 있습니다. 예를 들어, 이름으로 UDF를 호출해야 하거나 후속 세션에서 UDF를 사용해야 하는 경우 이를 사용할 수 있습니다.

    // Create and register a permanent named UDF ("doubleUdf").
    session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
    // Call the named UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
    
    Copy

다음 섹션에서는 Snowpark에서 UDF를 만드는 방법에 대한 중요한 정보를 제공합니다.

이 항목의 나머지 부분에서는 UDF를 만드는 방법에 대해 설명합니다.

참고

CREATE FUNCTION 명령을 실행하여 UDF를 정의한 경우, Snowpark에서 해당 UDF를 호출할 수 있습니다.

자세한 내용은 스칼라 사용자 정의 함수(UDF) 호출하기 섹션을 참조하십시오.

인자 및 반환 값에 지원되는 데이터 타입

Scala 함수 또는 람다에 대한 UDF를 만들려면 함수 또는 람다의 인자 및 반환 값에 대해 아래 나열된 지원되는 데이터 타입을 사용해야 합니다.

SQL 데이터 타입

Scala 데이터 타입

참고

NUMBER

다음 타입이 지원됩니다.

  • Short 또는 Option[Short]

  • Int 또는 Option[Int]

  • Long 또는 Option[Long]

  • java.math.BigDecimal

FLOAT

Float 또는 Option[Float]

DOUBLE

Double 또는 Option[Double]

VARCHAR

String 또는 java.lang.String

BOOLEAN

Boolean 또는 Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] 또는 Array[Variant]

OBJECT

Map[String, String] 또는 Map[String, Variant]

다음 타입의 변경 가능한 맵이 지원됩니다.

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

GEOGRAPHY

com.snowflake.snowpark.types.Geography

App 특성을 사용하여 오브젝트에서 UDF를 만드는 것에 대한 주의 사항

Scala는 Scala 오브젝트를 실행 가능 프로그램으로 바꾸기 위해 확장할 수 있는 App 특성을 제공합니다. App 특성은 오브젝트 정의의 본문에 있는 모든 코드를 자동으로 실행하는 main 메서드를 제공합니다. (오브젝트 정의의 코드가 효과적으로 main 메서드가 됩니다.)

App 특성 확장의 한 가지 효과는 main 메서드가 호출될 때까지 오브젝트의 필드가 초기화되지 않는다는 것입니다. 오브젝트가 App 을 확장하고, 사용자가 이전에 초기화한 오브젝트 필드를 사용하는 UDF를 정의하는 경우, 서버에 업로드된 UDF 정의에는 오브젝트 필드의 초기화된 값이 포함되지 않습니다.

예를 들어, 오브젝트에서 myConst 라는 필드를 정의 및 초기화하고 UDF에서 해당 필드를 사용한다고 가정합니다.

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}
Copy

Snowpark가 UDF 정의를 직렬화하고 Snowflake에 업로드할 때 myConst 는 초기화되지 않고 null 로 확인됩니다. 결과적으로, UDF를 호출하면 myConst 에 대해 null 이 반환됩니다.

이 문제를 해결하려면 App 특성을 확장하지 않도록 오브젝트를 변경하고 코드에 대해 별도의 main 메서드를 구현하십시오.

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}
Copy

UDF에 대한 종속성 지정하기

Snowpark API를 통해 UDF를 정의하려면 UDF가 의존하는 클래스 및 리소스(예: JAR 파일, 리소스 파일 등)가 포함된 모든 파일에 대해 Session.addDependency() 를 호출해야 합니다. (UDF에서 리소스를 읽는 것에 관한 자세한 내용은 UDF에서 파일 읽기 를 참조하십시오.)

Snowpark 라이브러리는 이러한 파일을 내부 스테이지에 업로드하고, UDF 실행 시 파일을 클래스 경로에 추가합니다.

애플리케이션을 실행할 때마다 라이브러리에서 파일을 업로드하지 않도록 하려면 파일을 스테이지에 업로드하십시오. addDependency 를 호출할 때 스테이지의 파일 경로를 전달하십시오.

Scala REPL을 사용하는 경우, REPL에 의해 생성된 클래스 디렉터리 를 종속성으로 추가해야 합니다. 예를 들어, run.sh 스크립트를 사용하여 REPL을 시작한 경우, 다음 메서드를 호출하십시오. 이는 스크립트에서 만든 repl_classes 디렉터리를 추가합니다.

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
Copy

다음 예는 스테이지에 JAR 파일을 종속성으로 추가하는 방법을 보여줍니다.

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Copy

다음 예는 JAR 파일 및 리소스 파일에 대한 종속성을 추가하는 방법을 보여줍니다.

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Copy

다음 종속성을 지정할 필요가 없습니다.

  • Scala 런타임 라이브러리.

    이러한 라이브러리는 UDF가 실행되는 서버의 런타임 환경에서 이미 사용 가능합니다.

  • Snowpark JAR 파일.

    Snowpark 라이브러리는 Snowpark JAR 파일을 자동으로 감지하여 서버에 업로드하려고 시도합니다.

    라이브러리가 Snowpark JAR 파일을 서버에 반복적으로 업로드하는 것을 방지하려면 다음을 수행하십시오.

    1. Snowpark JAR 파일을 스테이지에 업로드합니다.

      예를 들어, 다음 명령은 Snowpark JAR 파일을 @mystage 스테이지에 업로드합니다. PUT 명령은 JAR 파일을 압축하고 결과 파일의 이름을 snowpark-1.9.0.jar.gz로 지정합니다.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. addDependency 를 호출하여 스테이지의 Snowpark JAR 파일을 종속성으로 추가합니다.

      예를 들어, 이전 명령에서 업로드한 Snowpark JAR 파일을 추가하려면 다음을 수행하십시오.

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz")

      JAR 파일에 대한 지정된 경로에는 PUT 명령에 의해 추가된 .gz 파일 이름 확장자가 포함됩니다.

  • 현재 실행 중인 애플리케이션이 있는 JAR 파일 또는 디렉터리.

    Snowpark 라이브러리는 이러한 종속성을 자동으로 감지 및 업로드하려고 시도합니다.

    Snowpark 라이브러리가 이러한 종속성을 자동으로 감지할 수 없는 경우, 라이브러리는 오류를 보고하고 사용자는 addDependency 를 호출하여 이러한 종속성을 수동으로 추가해야 합니다.

종속성을 스테이지에 업로드하는 데 너무 오래 걸리는 경우 Snowpark 라이브러리는 시간 초과 예외를 보고합니다. Snowpark 라이브러리가 기다려야 하는 최대 시간을 구성하려면 세션 생성 시 snowpark_request_timeout_in_seconds 속성을 설정하십시오.

익명 UDF 만들기

익명 UDF를 만들려면 다음 중 하나를 수행할 수 있습니다.

  • com.snowflake.snowpark.functions 오브젝트에서 udf 함수를 호출하여 익명 함수의 정의를 전달합니다.

  • UDFRegistration 클래스에서 registerTemporary 메서드를 호출하여 익명 함수의 정의를 전달합니다. 익명 UDF를 등록하기 때문에 사용자는 name 매개 변수가 없는 메서드 서명을 사용해야 합니다.

참고

다중 스레드 코드를 쓸 때(예: 병렬 컬렉션을 사용하는 경우) udf 함수를 사용하는 대신 registerTemporary 메서드를 사용하여 UDF를 등록하십시오. 이렇게 하면 기본 Snowflake Session 오브젝트를 찾을 수 없는 오류를 방지할 수 있습니다.

이러한 메서드는 UDF를 호출하는 데 사용할 수 있는 UserDefinedFunction 오브젝트를 반환합니다. (스칼라 사용자 정의 함수(UDF) 호출하기 섹션을 참조하십시오.)

다음 예는 익명 UDF를 만듭니다.

// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Copy

참고

Jupyter 노트북에서 UDF를 만드는 경우, Snowpark와 함께 작동하도록 노트북을 설정하고(Snowpark Scala용 Jupyter 노트북 설정하기 참조) 노트북에서 UDF를 쓰기 위한 지침을 따라야 합니다(Jupyter 노트북에서 UDF 만들기 참조).

다음 예는 String 값의 Array 를 전달하고 각 값에 문자열 x 를 추가하는 익명의 UDF를 만듭니다.

// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
Copy

다음 예에서는 사용자 지정 클래스(텍스트에 사용된 언어를 감지하는 LanguageDetector)를 사용하는 익명 UDF를 만듭니다. 이 예에서는 익명 UDF를 호출하여 DataFrame의 text_data 열에서 언어를 감지하고, 사용된 언어로 추가 lang 열을 포함하는 새 DataFrame을 만듭니다.

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))
Copy

명명된 UDF 만들기 및 등록

이름으로 UDF를 호출하려는 경우(예: functions 오브젝트에서 callUDF 함수를 사용하여 호출) 또는 후속 세션에서 UDF를 사용해야 하는 경우, 명명된 UDF를 만들고 등록할 수 있습니다. 이렇게 하려면 UDFRegistration 클래스에서 다음 메서드 중 하나를 사용하십시오.

  • registerTemporary: 그저 현재 세션에서 UDF를 사용하려는 경우

  • registerPermanent: 후속 세션에서 UDF를 사용하려는 경우

UDFRegistration 클래스의 오브젝트에 액세스하려면 Session 클래스의 udf 메서드를 호출하십시오.

registerTemporary 는 현재 세션에서 사용할 수 있는 임시 UDF를 만듭니다.

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

registerPermanent 는 현재 및 후속 세션에서 사용할 수 있는 UDF를 만듭니다. registerPermanent 를 호출할 때 UDF 및 해당 종속성에 대한 JAR 파일이 업로드될 내부 스테이지 위치의 위치도 지정해야 합니다.

참고

registerPermanent 는 외부 스테이지를 지원하지 않습니다.

예:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

참고

Jupyter 노트북에서 UDF를 만드는 경우, Snowpark와 함께 작동하도록 노트북을 설정하고(Snowpark Scala용 Jupyter 노트북 설정하기 참조) 노트북에서 UDF를 쓰기 위한 지침을 따라야 합니다(Jupyter 노트북에서 UDF 만들기 참조).

Jupyter 노트북에서 UDF 만들기

Jupyter 노트북 에서 UDF를 만드는 경우 다음 추가 단계를 따라야 합니다.

UDF의 구현 쓰기

Serializable 을 확장하는 클래스에서 함수의 구현을 정의하십시오. 예:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Copy

다른 셀에 정의된 변수에 액세스하기

UDF의 다른 셀에 정의된 변수를 사용해야 하는 경우, 해당 변수를 클래스 생성자에 인자로 전달해야 합니다. 예를 들어, 셀 1에서 변수를 정의했다고 가정합니다.

In [1]:
Copy
val prefix = "Hello"
Copy

사용자는 셀 2에 정의한 UDF에서 해당 변수를 사용하려고 합니다. UDF의 클래스 생성자에서 이 변수에 대한 인자를 추가하십시오. 그런 다음 클래스 생성자를 호출하여 UDF를 만들 때 셀 1에 정의된 변수를 전달하십시오.

In [2]:
Copy
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Copy

직렬화할 수 없는 오브젝트 사용하기

람다 또는 함수에 대한 UDF를 만들 때 Snowpark 라이브러리는 람다 클로저를 직렬화하고 실행을 위해 서버로 보냅니다.

람다 클로저에 의해 캡처된 오브젝트를 직렬화할 수 없는 경우 Snowpark 라이브러리에서 java.io.NotSerializableException 예외가 발생합니다.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

이 경우 다음 중 하나를 수행할 수 있습니다.

  • 오브젝트를 직렬화 가능하게 만들거나

  • 오브젝트를 lazy val 로 선언하거나 @transient 주석을 사용하여 오브젝트 직렬화를 방지합니다.

    예:

    // Declare the detector object as lazy.
    lazy val detector = new LanguageDetector("en")
    // The detector object is not serialized but is instead reconstructed on the server.
    val langUdf = udf((s: String) =>
         Option(detector.detect(s)).getOrElse("UNKNOWN"))
    
    Copy

UDF의 초기화 코드 쓰기

UDF에 초기화 코드 또는 컨텍스트가 필요한 경우, UDF 클로저의 일부로서 캡처된 값을 통해 이를 제공할 수 있습니다.

다음 예에서는 별도의 클래스를 사용하여 세 개의 UDF에 필요한 컨텍스트를 초기화합니다.

  • 첫 번째 UDF는 람다 내에서 클래스의 새 인스턴스를 만들므로 UDF가 호출될 때마다 초기화가 수행됩니다.

  • 두 번째 UDF는 클라이언트 프로그램에서 생성된 클래스의 인스턴스를 캡처합니다. 클라이언트에서 생성된 컨텍스트는 직렬화되고 UDF에서 사용됩니다. 이 접근 방식이 작동하려면 컨텍스트 클래스가 직렬화 가능해야 합니다.

  • 세 번째 UDF는 lazy val 을 캡처하므로 컨텍스트는 첫 번째 UDF 호출에서 지연 인스턴스화되고 후속 호출에서 재사용됩니다. 이 접근 방식은 컨텍스트가 직렬화할 수 없는 경우에도 작동합니다. 그러나 데이터 프레임 내의 모든 UDF 호출이 지연 생성된 동일 컨텍스트를 사용한다는 보장은 없습니다.

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}
Copy

UDF에서 파일 읽기

앞서 언급했듯이 Snowpark 라이브러리는 서버에서 UDF를 업로드하고 실행합니다. UDF가 파일에서 데이터를 읽어야 하는 경우, 파일이 UDF와 함께 업로드되었는지 확인해야 합니다.

또한, UDF에 대한 호출 사이에 파일 내용이 동일하게 유지되는 경우, 첫 번째 호출 중에 파일을 한 번 로딩하고 후속 호출에서는 로딩하지 않도록 코드를 쓸 수 있습니다. 이는 UDF 호출의 성능을 향상시킬 수 있습니다.

파일을 읽도록 UDF를 설정하려면 다음을 수행하십시오.

  1. 파일을 JAR 파일에 추가합니다.

    예를 들어, UDF가 data/ 하위 디렉터리(data/hello.txt)의 파일을 사용해야 하는 경우, jar 명령을 실행하여 이 파일을 JAR 파일에 추가하십시오.

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. JAR 파일이 종속성임을 지정합니다. 이는 서버에 파일을 업로드하고 클래스 경로에 파일을 추가합니다. UDF에 대한 종속성 지정하기 섹션을 참조하십시오.

    예:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
    Copy
  3. UDF에서 Class.getResourceAsStream 을 호출하여 클래스 경로에서 파일을 찾고 파일을 읽습니다.

    this 에 대한 종속성을 추가하지 않으려면 (getClass 대신) classOf[com.snowflake.snowpark.DataFrame] 을 사용하여 Class 오브젝트를 가져올 수 있습니다.

    예를 들어, data/hello.txt 파일을 읽으려면 다음을 수행하십시오.

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    
    Copy

    이 예에서 리소스 이름은 / 로 시작하며 이는 이것이 JAR 파일에 있는 파일의 전체 경로임을 나타냅니다. (이 경우, 파일의 위치는 클래스의 패키지와 관련이 없습니다.)

참고

UDF 호출 사이에 파일 내용이 변경되지 않을 것으로 예상되는 경우, 파일을 lazy val 로 읽으십시오. 이로 인해 파일 로딩 코드는 UDF에 대한 첫 번째 호출에서만 실행되고 후속 호출에서는 실행되지 않습니다.

다음 예는 UDF(readFileFunc)로 사용될 함수로 오브젝트(UDFCode)를 정의합니다. 이 함수는 data/hello.txt 파일을 읽으며, 이는 문자열 hello, 를 포함할 것으로 예상됩니다. 함수는 이 문자열을 인자로 전달된 문자열 앞에 추가합니다.

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}
Copy

예의 다음 부분에서는 함수를 익명 UDF로 등록합니다. 이 예에서는 DataFrame의 NAME 열에서 UDF를 호출합니다. 이 예에서는 data/hello.txt 파일이 JAR 파일 myJar.jar 에 패키지되어 있다고 가정합니다.

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Copy

사용자 정의 테이블 함수(UDTF) 만들기

Snowpark에서 UDTF를 만들고 등록하려면 다음을 수행해야 합니다.

다음 섹션에서는 이러한 단계를 더 자세히 설명합니다.

UDTF 호출에 대한 정보는 UDTF 호출하기 섹션을 참조하십시오.

UDTF 클래스 정의하기

com.snowflake.snowpark.udtf 패키지UDTFn 클래스(예: UDTF0, UDTF1 등) 중 하나에서 상속하는 클래스를 정의합니다. 여기서 n 은 UDTF에 대한 입력 인자의 수를 지정합니다. 예를 들어, UDTF가 입력 인자를 2개 전달하는 경우 UDTF2 클래스를 확장합니다.

클래스에서 다음 메서드를 재정의합니다.

  • outputSchema(). 이는 반환된 행(출력의 《스키마》)에 있는 필드의 이름과 타입을 설명하는 types.StructType 오브젝트를 반환합니다.

  • process(). 이는 입력 파티션 의 각 행에 대해 한 번씩 호출됩니다(아래 노트 참조).

  • endPartition(). 이는 모든 행이 process() 에 전달된 후 각 파티션에 대해 한 번 호출됩니다.

UDTF가 호출되면 행은 UDTF에 전달되기 전에 파티션으로 그룹화됩니다.

  • UDTF를 호출하는 문이 PARTITION 절(명시적 파티션)을 지정하는 경우 해당 절은 행이 분할되는 방법을 결정합니다.

  • 문이 PARTITION 절(암시적 파티션)을 지정하지 않으면 Snowflake는 행을 가장 잘 분할하는 방법을 결정합니다.

파티션에 대한 설명은 테이블 함수 및 파티션 섹션을 참조하십시오.

UDTF 클래스의 예를 보려면 UDTF 클래스의 예 섹션을 참조하십시오.

outputSchema() 메서드 재정의

outputSchema() 메서드를 재정의하여, process()endPartition() 메서드에 의해 반환된 행의 필드 이름과 데이터 타입(《출력 스키마》)을 정의합니다.

def outputSchema(): StructType
Copy

이 메서드에서 StructField 오브젝트의 Array 를 사용하여 반환된 행에 있는 각 필드의 Snowflake 데이터 타입을 지정하는 StructType 오브젝트를 생성하고 반환합니다. Snowflake는 UDTF의 출력 스키마에 대해 다음 형식 오브젝트를 지원합니다.

SQL 데이터 타입

스칼라 타입

com.snowflake.snowpark.types 형식

NUMBER

Short 또는 Option[Short]

ShortType

NUMBER

Int 또는 Option[Int]

IntType

NUMBER

Long 또는 Option[Long]

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

Float 또는 Option[Float]

FloatType

DOUBLE

Double 또는 Option[Double]

DoubleType

VARCHAR

String 또는 java.lang.String

StringType

BOOLEAN

Boolean 또는 Option[Boolean]

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

Array[Byte]

BinaryType

VARIANT

com.snowflake.snowpark.types.Variant

VariantType

ARRAY

Array[String]

ArrayType(StringType)

ARRAY

Array[Variant]

ArrayType(VariantType)

OBJECT

Map[String, String]

MapType(StringType, StringType)

OBJECT

Map[String, Variant]

MapType(StringType, VariantType)

예를 들어 UDTF는 단일 정수 필드가 있는 행을 반환합니다.

override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Copy

process() 메서드 재정의하기

UDTF 클래스에서 process() 메서드를 재정의합니다.

def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
Copy

여기서 n 은 UDTF에 전달된 인자의 수입니다.

서명의 인자 수는 확장한 클래스에 해당합니다. 예를 들어, UDTF가 입력 인자 2개를 전달하고 사용자가 UDTF2 클래스를 확장하는 경우 process() 메서드에는 다음 서명이 있습니다.

def process(arg0: A0, arg1: A1): Iterable[Row]
Copy

이 메서드는 입력 파티션의 각 행에 대해 한 번씩 호출됩니다.

인자 타입 선택

process() 메서드의 각 인자 타입은 UDTF에 전달된 인자의 Snowflake 데이터 타입에 해당하는 Scala 유형을 사용합니다.

Snowflake는 UDTF의 인자에 대해 다음 데이터 타입을 지원합니다.

SQL 데이터 타입

Scala 데이터 타입

참고

NUMBER

다음 타입이 지원됩니다.

  • Short 또는 Option[Short]

  • Int 또는 Option[Int]

  • Long 또는 Option[Long]

  • java.math.BigDecimal

FLOAT

Float 또는 Option[Float]

DOUBLE

Double 또는 Option[Double]

VARCHAR

String 또는 java.lang.String

BOOLEAN

Boolean 또는 Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] 또는 Array[Variant]

OBJECT

Map[String, String] 또는 Map[String, Variant]

다음 타입의 변경 가능한 맵이 지원됩니다.

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

행 반환하기

process() 메서드에서, 주어진 입력 값에 대해 UDTF가 반환할 데이터를 포함하는 Row 오브젝트의 Iterable 을 생성하고 반환합니다. 행의 필드는 outputSchema 메서드에서 지정한 타입을 사용해야 합니다. (outputSchema() 메서드 재정의 섹션을 참조하십시오.)

예를 들어 UDTF가 행을 생성하는 경우, 생성된 행에 대해 Row 오브젝트의 Iterable 을 생성하고 반환합니다.

override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
Copy

endPartition() 메서드 재정의하기

endPartition 메서드를 재정의하고, 입력 파티션의 모든 행이 process 메서드에 전달된 후 실행되어야 하는 코드를 추가합니다. endPartition 메서드는 각 입력 파티션에 대해 한 번 호출됩니다.

def endPartition(): Iterable[Row]
Copy

파티션의 모든 행이 처리된 후 작업을 수행해야 하는 경우 이 메서드를 사용할 수 있습니다. 예를 들어 다음을 할 수 있습니다.

  • process 메서드 호출에서 캡처한 상태 정보를 기반으로 행을 반환합니다.

  • 특정 입력 행에 연결되지 않은 행을 반환합니다.

  • process 메서드에 의해 생성된 출력 행을 요약하는 행을 반환합니다.

반환하는 행의 필드는 outputSchema 메서드에서 지정한 타입과 일치해야 합니다. (outputSchema() 메서드 재정의 섹션을 참조하십시오.)

각 파티션의 끝에서 추가 행을 반환할 필요가 없으면 Row 오브젝트의 빈 Iterable 을 반환합니다. 예를 들면 다음과 같습니다.

override def endPartition(): Iterable[Row] = Array.empty[Row]
Copy

참고

Snowflake는 성공적으로 처리하도록 시간 제한이 조정된 대형 파티션을 지원하지만, 특히 대형 파티션으로 인해 처리 시간이 초과될 수 있습니다(예: endPartition 이 완료하는 데 너무 오래 걸리는 경우). 특정 사용 시나리오에 맞게 시간 초과 임계값을 조정해야 하는 경우 Snowflake 지원 에 문의하십시오.

UDTF 클래스의 예

다음은 행 범위를 생성하는 UDTF 클래스의 예입니다.

  • UDTF는 인자를 2개 전달하기 때문에 클래스는 UDTF2 를 확장합니다.

  • 인자 startcount 는 행의 시작 번호 및 생성할 행 수를 지정합니다.

class MyRangeUdtf extends UDTF2[Int, Int] {
  override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
  override def endPartition(): Iterable[Row] = Array.empty[Row]
  override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Copy

UDTF 등록하기

그런 다음 새 클래스의 인스턴스를 만들고 UDTFRegistration 메서드 중 하나를 호출하여 클래스를 등록합니다. 임시 또는 영구 UDTF 를 등록할 수 있습니다.

임시 UDTF 등록하기

임시 UDTF를 등록하려면 UDTFRegistration.registerTemporary 를 호출합니다.

  • UDTF를 이름으로 호출할 필요가 없으면 클래스의 인스턴스를 전달하여 익명의 UDTF를 등록할 수 있습니다.

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, lit(10), lit(5)).show
    
    Copy
  • UDTF를 이름으로 호출해야 하는 경우 UDTF의 이름도 전달합니다.

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
    // Call the UDTF by name.
    session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
    
    Copy

영구 UDTF 등록하기

후속 세션에서 UDTF를 사용해야 하는 경우 UDTFRegistration.registerPermanent 를 호출하여 영구 UDTF를 등록합니다.

영구 UDTF를 등록할 때 사용자는 등록 메서드가 UDTF 및 해당 종속성에 대한 JAR 파일을 업로드할 스테이지를 지정해야 합니다. 예:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage")
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

UDTF 호출하기

UDTF를 등록한 후, 반환된 TableFunction 오브젝트를 Session 오브젝트의 tableFunction 메서드에 전달하여 UDTF를 호출할 수 있습니다.

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, lit(10), lit(5)).show()
Copy

이름으로 UDTF를 호출하려면 해당 이름으로 TableFunction 오브젝트를 생성하고 tableFunction 메서드에 전달합니다.

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

SELECT 문을 통해 직접 UDTF를 호출할 수도 있습니다.

session.sql("select * from table(myUdtf(10, 5))")
Copy