Java에서 DataFrame용 사용자 정의 함수(UDF) 만들기

Snowpark API는 Java의 람다 식에서 사용자 정의 함수를 만드는 데 사용할 수 있는 메서드를 제공합니다. 이 항목에서는 이러한 형식의 함수를 만드는 방법에 대해 설명합니다.

이 항목의 내용:

소개

Snowpark API를 호출하여 Java의 람다 식에 대한 사용자 정의 함수(UDF)를 생성할 수 있으며 이러한 UDF를 호출하여 DataFrame의 데이터를 처리할 수 있습니다.

Snowpark API를 사용하여 UDF를 만들면 Snowpark 라이브러리는 직렬화하여 UDF에 대한 코드를 스테이지에 업로드합니다. UDF를 호출하면 Snowpark 라이브러리는 데이터가 있는 서버에서 함수를 실행합니다. 결과적으로, 함수가 데이터를 처리하기 위해 데이터를 클라이언트로 전송할 필요가 없습니다.

사용자 지정 코드에서 사용자는 JAR 파일에 패키지된 코드(예: 서드 파티 라이브러리용 Java 클래스)를 호출할 수도 있습니다.

다음 두 가지 방법 중 하나로 사용자 지정 코드에 대한 UDF를 만들 수 있습니다.

  • 익명 UDF를 만들고 함수를 변수에 할당할 수 있습니다. 이 변수가 범위 내에 있는 한 이 변수를 사용하여 UDF를 호출할 수 있습니다.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register an anonymous UDF (doubleUdf)
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
    // Call the anonymous UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy
  • 명명된 UDF를 만들고 이름으로 UDF를 호출할 수 있습니다. 예를 들어, 이름으로 UDF를 호출해야 하거나 후속 세션에서 UDF를 사용해야 하는 경우 이를 사용할 수 있습니다.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register a permanent named UDF ("doubleUdf")
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      session
        .udf()
        .registerPermanent(
          "doubleUdf",
          (Integer x) -> x + x,
          DataTypes.IntegerType,
          DataTypes.IntegerType,
          "mystage");
    // Call the named UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy

이 항목의 나머지 부분에서는 UDF를 만드는 방법에 대해 설명합니다.

참고

CREATE FUNCTION 명령을 실행하여 UDF를 정의한 경우, Snowpark에서 해당 UDF를 호출할 수 있습니다.

자세한 내용은 스칼라 사용자 정의 함수(UDF) 호출하기 섹션을 참조하십시오.

인자 및 반환 값에 지원되는 데이터 타입

Java 람다에 대한 UDF를 만들려면 메서드의 인자 및 반환 값에 대해 아래 나열된 지원되는 데이터 타입을 사용해야 합니다.

SQL 데이터 타입

Java 데이터 타입

참고

NUMBER

다음 타입이 지원됩니다.

  • Integer

  • Long

  • java.math.BigDecimal 또는 java.math.BigInteger

FLOAT

Float

DOUBLE

Double

VARCHAR

String

BOOLEAN

Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] 또는 Variant[]

OBJECT

Map<문자열, 문자열> 또는 Map<문자열, 베리언트>

GEOGRAPHY

com.snowflake.snowpark_java.types.Geography

UDF에 대한 종속성 지정하기

Snowpark API를 통해 UDF를 정의하려면 UDF가 의존하는 클래스 및 리소스(예: JAR 파일, 리소스 파일 등)가 포함된 모든 파일에 대해 Session.addDependency() 를 호출해야 합니다. (UDF에서 리소스를 읽는 것에 관한 자세한 내용은 UDF에서 파일 읽기 를 참조하십시오.)

Snowpark 라이브러리는 이러한 파일을 내부 스테이지에 업로드하고, UDF 실행 시 파일을 클래스 경로에 추가합니다.

애플리케이션을 실행할 때마다 라이브러리에서 파일을 업로드하지 않도록 하려면 파일을 스테이지에 업로드하십시오. addDependency 를 호출할 때 스테이지의 파일 경로를 전달하십시오.

다음 예는 스테이지에 JAR 파일을 종속성으로 추가하는 방법을 보여줍니다.

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
Copy

다음 예는 JAR 파일 및 리소스 파일에 대한 종속성을 추가하는 방법을 보여줍니다.

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");

// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
Copy

다음 종속성을 지정할 필요가 없습니다.

  • Java 런타임 라이브러리.

    이러한 라이브러리는 UDF가 실행되는 서버의 런타임 환경에서 이미 사용 가능합니다.

  • Snowpark JAR 파일.

    Snowpark 라이브러리는 Snowpark JAR 파일을 자동으로 감지하여 서버에 업로드하려고 시도합니다.

    라이브러리가 Snowpark JAR 파일을 서버에 반복적으로 업로드하는 것을 방지하려면 다음을 수행하십시오.

    1. Snowpark JAR 파일을 스테이지에 업로드합니다.

      예를 들어, 다음 명령은 Snowpark JAR 파일을 @mystage 스테이지에 업로드합니다. PUT 명령은 JAR 파일을 압축하고 결과 파일의 이름을 snowpark-1.9.0.jar.gz로 지정합니다.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. addDependency 를 호출하여 스테이지의 Snowpark JAR 파일을 종속성으로 추가합니다.

      예를 들어, 이전 명령에서 업로드한 Snowpark JAR 파일을 추가하려면 다음을 수행하십시오.

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz");

      JAR 파일에 대한 지정된 경로에는 PUT 명령에 의해 추가된 .gz 파일 이름 확장자가 포함됩니다.

  • 현재 실행 중인 애플리케이션이 있는 JAR 파일 또는 디렉터리.

    Snowpark 라이브러리는 이러한 종속성을 자동으로 감지 및 업로드하려고 시도합니다.

    Snowpark 라이브러리가 이러한 종속성을 자동으로 감지할 수 없는 경우, 라이브러리는 오류를 보고하고 사용자는 addDependency 를 호출하여 이러한 종속성을 수동으로 추가해야 합니다.

종속성을 스테이지에 업로드하는 데 너무 오래 걸리는 경우 Snowpark 라이브러리는 시간 초과 예외를 보고합니다. Snowpark 라이브러리가 기다려야 하는 최대 시간을 구성하려면 세션 생성 시 snowpark_request_timeout_in_seconds 속성을 설정하십시오.

익명 UDF 만들기

익명 UDF를 만들려면 다음 중 하나를 수행할 수 있습니다.

  • Functions.udf 정적 메서드를 호출하고 입력 및 출력의 데이터 타입을 나타내는 람다 식과 DataTypes 필드(또는 해당 클래스의 메서드에 의해 생성된 오브젝트)를 전달합니다.

  • UDFRegistration 클래스에서 registerTemporary 메서드를 호출하여 입력 및 출력의 데이터 타입을 나타내는 람다 식 및 DataTypes 필드(또는 해당 클래스의 메서드에 의해 생성된 오브젝트)를 전달합니다.

    Session 오브젝트의 udf 메서드를 호출하여 UDFRegistration 클래스의 인스턴스에 액세스할 수 있습니다.

    registerTemporary 를 호출하는 경우, name 매개 변수가 없는 메서드 서명을 사용합니다. (익명 UDF를 만들기 때문에 UDF의 이름을 지정하지 않습니다.)

참고

다중 스레드 코드를 쓸 때(예: 병렬 컬렉션을 사용하는 경우) udf 메서드를 사용하는 대신 registerTemporary 메서드를 사용하여 UDF를 등록하십시오. 이렇게 하면 기본 Snowflake Session 오브젝트를 찾을 수 없는 오류를 방지할 수 있습니다.

이러한 메서드는 UDF를 호출하는 데 사용할 수 있는 UserDefinedFunction 오브젝트를 반환합니다. (스칼라 사용자 정의 함수(UDF) 호출하기 섹션을 참조하십시오.)

다음 예는 익명 UDF를 만듭니다.

import com.snowflake.snowpark_java.types.*;
...

// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

다음 예에서는 사용자 지정 클래스(텍스트에 사용된 언어를 감지하는 LanguageDetector)를 사용하는 익명 UDF를 만듭니다. 이 예에서는 익명 UDF를 호출하여 DataFrame의 text_data 열에서 언어를 감지하고, 사용된 언어로 추가 lang 열을 포함하는 새 DataFrame을 만듭니다.

import com.snowflake.snowpark_java.types.*;

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");

// Create a detector
LanguageDetector detector = new LanguageDetector();

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
  Functions.udf(
    (String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
    DataTypes.StringType,
    DataTypes.StringType);

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
Copy

명명된 UDF 만들기 및 등록

이름으로 UDF를 호출하려는 경우(예: Functions.callUDF 정적 메서드 사용) 또는 후속 세션에서 UDF를 사용해야 하는 경우, 명명된 UDF를 만들고 등록할 수 있습니다. 이렇게 하려면 UDFRegistration 클래스에서 다음 메서드 중 하나를 사용하십시오.

  • registerTemporary: 그저 현재 세션에서 UDF를 사용하려는 경우

  • registerPermanent: 후속 세션에서 UDF를 사용하려는 경우

UDFRegistration 클래스의 오브젝트에 액세스하려면 Session 오브젝트의 udf 메서드를 호출하십시오.

registerTemporary 또는 registerPermanent 메서드를 호출할 때 입력 및 출력의 데이터 타입을 나타내는 람다 식 및 DataTypes 필드(또는 해당 클래스의 메서드에 의해 생성된 오브젝트)를 전달합니다.

예:

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

registerPermanent 는 현재 및 후속 세션에서 사용할 수 있는 UDF를 만듭니다. registerPermanent 를 호출할 때 UDF 및 해당 종속성에 대한 JAR 파일이 업로드될 내부 스테이지 위치의 위치도 지정해야 합니다.

참고

registerPermanent 는 외부 스테이지를 지원하지 않습니다.

예:

import com.snowflake.snowpark_java.types.*;
...

// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerPermanent(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType,
      "mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

직렬화할 수 없는 오브젝트 사용하기

람다 식에 대한 UDF를 만들 때 Snowpark 라이브러리는 람다 클로저를 직렬화하고 실행을 위해 서버로 보냅니다.

람다 클로저에 의해 캡처된 오브젝트를 직렬화할 수 없는 경우 Snowpark 라이브러리에서 java.io.NotSerializableException 예외가 발생합니다.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

이 경우 오브젝트를 직렬화 가능하게 만들어야 합니다.

UDF의 초기화 코드 쓰기

UDF에 초기화 코드 또는 컨텍스트가 필요한 경우, UDF 클로저의 일부로서 캡처된 값을 통해 이를 제공할 수 있습니다.

다음 예에서는 별도의 클래스를 사용하여, 두 개의 UDF에 필요한 컨텍스트를 초기화합니다.

  • 첫 번째 UDF는 람다 내에서 클래스의 새 인스턴스를 만들므로 UDF가 호출될 때마다 초기화가 수행됩니다.

  • 두 번째 UDF는 클라이언트 프로그램에서 생성된 클래스의 인스턴스를 캡처합니다. 클라이언트에서 생성된 컨텍스트는 직렬화되고 UDF에서 사용됩니다. 이 접근 방식이 작동하려면 컨텍스트 클래스가 직렬화 가능해야 합니다.

import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;

// Context needed for a UDF.
class Context {
  double randomInt = Math.random();
}

// Serializable context needed for the UDF.
class SerContext implements Serializable {
  double randomInt = Math.random();
}

class TestUdf {
  public static void main(String[] args) {
    // Create the session.
    Session session = Session.builder().configFile("/<path>/profile.properties").create();
    session.range(1, 10, 2).show();

    // Create a DataFrame with two columns ("c" and "d").
    DataFrame dummy =
      session.createDataFrame(
        new Row[]{
          Row.create(1, 1),
          Row.create(2, 2),
          Row.create(3, 3)
        },
        StructType.create(
          new StructField("c", DataTypes.IntegerType),
          new StructField("d", DataTypes.IntegerType))
        );
    dummy.show();

    // Initialize the context once per invocation.
    UserDefinedFunction udfRepeatedInit =
      Functions.udf(
        (Integer i) -> new Context().randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    SerContext sC = new SerContext();
    UserDefinedFunction udfOnceInit =
      Functions.udf(
        (Integer i) -> sC.randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
    UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
  }
}
Copy

UDF에서 파일 읽기

앞서 언급했듯이 Snowpark 라이브러리는 서버에서 UDF를 업로드하고 실행합니다. UDF가 파일에서 데이터를 읽어야 하는 경우, 파일이 UDF와 함께 업로드되었는지 확인해야 합니다.

또한, UDF에 대한 호출 사이에 파일 내용이 동일하게 유지되는 경우, 첫 번째 호출 중에 파일을 한 번 로딩하고 후속 호출에서는 로딩하지 않도록 코드를 쓸 수 있습니다. 이는 UDF 호출의 성능을 향상시킬 수 있습니다.

파일을 읽도록 UDF를 설정하려면 다음을 수행하십시오.

  1. 파일을 JAR 파일에 추가합니다.

    예를 들어, UDF가 data/ 하위 디렉터리(data/hello.txt)의 파일을 사용해야 하는 경우, jar 명령을 실행하여 이 파일을 JAR 파일에 추가하십시오.

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. JAR 파일이 종속성임을 지정합니다. 이는 서버에 파일을 업로드하고 클래스 경로에 파일을 추가합니다. UDF에 대한 종속성 지정하기 섹션을 참조하십시오.

    예:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar");
    
    Copy
  3. UDF에서 Class.forName().getResourceAsStream() 을 호출하여 클래스 경로에서 파일을 찾고 파일을 읽습니다.

    this 에 대한 종속성을 추가하지 않으려면 (getClass() 대신) Class.forName("com.snowflake.snowpark_java.DataFrame") 을 사용하여 Class 오브젝트를 가져올 수 있습니다.

    예를 들어, data/hello.txt 파일을 읽으려면 다음을 수행하십시오.

    // Read data/hello.txt from myJar.jar.
    String resourceName = "/data/hello.txt";
    InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
    
    Copy

    이 예에서 리소스 이름은 / 로 시작하며 이는 이것이 JAR 파일에 있는 파일의 전체 경로임을 나타냅니다. (이 경우, 파일의 위치는 클래스의 패키지와 관련이 없습니다.)

참고

UDF 호출 사이에 파일 내용이 변경되지 않을 것으로 예상되면 파일을 클래스의 정적 필드로 읽고, 필드가 설정되지 않은 경우에만 파일을 읽으십시오.

다음 예는 UDF(readFileFunc)로 사용될 함수로 오브젝트(UDFCode)를 정의합니다. 이 함수는 data/hello.txt 파일을 읽으며, 이는 문자열 hello, 를 포함할 것으로 예상됩니다. 함수는 이 문자열을 인자로 전달된 문자열 앞에 추가합니다.

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Create a function class that reads a file.
class UDFCode {
  private static String fileContent = null;
  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // The file content is cached in 'fileContent'.
  public static String readFile() {
    if (fileContent == null) {
      try {
        String resourceName = "/data/hello.txt";
        InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
          .getResourceAsStream(resourceName);
        fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
      } catch (Exception e) {
        fileContent = "Error while reading file";
      }
    }
    return fileContent;
  }
}
Copy

예의 다음 부분에서는 함수를 익명 UDF로 등록합니다. 이 예에서는 DataFrame의 NAME 열에서 UDF를 호출합니다. 이 예에서는 data/hello.txt 파일이 JAR 파일 myJar.jar 에 패키지되어 있다고 가정합니다.

import com.snowflake.snowpark_java.types.*;

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");

// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
  (String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
Copy

사용자 정의 테이블 함수(UDTF) 만들기

Snowpark에서 UDTF를 만들고 등록하려면 다음을 수행해야 합니다.

다음 섹션에서는 이러한 단계를 더 자세히 설명합니다.

UDTF 호출에 대한 정보는 UDTF 호출하기 섹션을 참조하십시오.

UDTF 클래스 정의하기

com.snowflake.snowpark_java.udtf package 에서 JavaUDTFn 개의 인터페이스 중 하나(예: JavaUDTF0, JavaUDTF1 등)를 구현하는 클래스를 정의합니다. 여기서 n 은 UDTF에 대한 입력 인자의 수를 지정합니다. 예를 들어, UDTF가 입력 인자를 2개 전달하는 경우 JavaUDTF2 인터페이스를 구현하십시오.

클래스에서 다음 메서드를 구현합니다.

  • outputSchema(). 이는 반환된 행(출력의 《스키마》)에 있는 필드의 이름과 타입을 설명하는 types.StructType 오브젝트를 반환합니다.

  • process(). 이는 입력 파티션 의 각 행에 대해 한 번씩 호출됩니다(아래 노트 참조).

  • inputSchema(). 이는 입력 매개 변수의 유형을 설명하는 types.StructType 오브젝트를 반환합니다.

    process() 메서드가 Map 인자를 전달하는 경우 inputSchema() 메서드를 구현해야 합니다. 그렇지 않으면 이 메서드를 구현하는 것은 선택 사항입니다.

  • endPartition(). 이는 모든 행이 process() 에 전달된 후 각 파티션에 대해 한 번 호출됩니다.

UDTF가 호출되면 행은 UDTF에 전달되기 전에 파티션으로 그룹화됩니다.

  • UDTF를 호출하는 문이 PARTITION 절(명시적 파티션)을 지정하는 경우 해당 절은 행이 분할되는 방법을 결정합니다.

  • 문이 PARTITION 절(암시적 파티션)을 지정하지 않으면 Snowflake는 행을 가장 잘 분할하는 방법을 결정합니다.

파티션에 대한 설명은 테이블 함수 및 파티션 섹션을 참조하십시오.

UDTF 클래스의 예를 보려면 UDTF 클래스의 예 섹션을 참조하십시오.

outputSchema() 메서드 구현하기

outputSchema() 메서드를 구현하여, process()endPartition() 메서드에 의해 반환된 행의 필드 이름과 데이터 타입(《출력 스키마》)을 정의합니다.

public StructType outputSchema()
Copy

이 메서드에서는 반환된 행에 있는 각 필드의 Snowflake 데이터 타입을 나타내는 StructField 오브젝트를 포함하는 StructType 오브젝트를 생성하고 반환합니다. Snowflake는 UDTF의 출력 스키마에 대해 다음 형식 오브젝트를 지원합니다.

SQL 데이터 타입

Java 형식

com.snowflake.snowpark_java.types 형식

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<문자열, 문자열>

MapType(StringType, StringType)

OBJECT

java.util.Map<문자열, 베리언트>

MapType(StringType, VariantType)

예를 들어 UDTF는 단일 정수 필드가 있는 행을 반환합니다.

public StructType outputSchema() {
  return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
Copy

process() 메서드 구현하기

UDTF 클래스에서 process() 메서드를 구현합니다.

Stream<Row> process(A0 arg0, ... A<n> arg<n>)
Copy

여기서 n 은 UDTF에 전달된 인자의 수입니다.

서명의 인자 수는 구현한 인터페이스에 해당합니다. 예를 들어, UDTF가 입력 인자 2개를 전달하고 사용자가 JavaUDTF2 인터페이스를 구현하는 경우 process() 메서드에는 다음 서명이 있습니다.

Stream<Row> process(A0 arg0, A1 arg1)
Copy

이 메서드는 입력 파티션의 각 행에 대해 한 번씩 호출됩니다.

인자 타입 선택

process() 메서드의 각 인자 타입은 UDTF에 전달된 인자의 Snowflake 데이터 타입에 해당하는 Java 유형을 사용합니다.

Snowflake는 UDTF의 인자에 대해 다음 데이터 타입을 지원합니다.

SQL 데이터 타입

Java 데이터 타입

참고

NUMBER

다음 타입이 지원됩니다.

  • java.lang.Short

  • java.lang.Integer

  • java.lang.Long

  • java.math.BigDecimal

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

VARCHAR

java.lang.String

BOOLEAN

java.lang.Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] 또는 Variant[]

OBJECT

Map<문자열, 문자열> 또는 Map<문자열, 베리언트>

참고

java.util.Map 인자를 전달하는 경우 inputSchema 메서드를 구현하여 해당 인자의 유형을 설명해야 합니다. inputSchema() 메서드 구현하기 섹션을 참조하십시오.

행 반환하기

process() 메서드에서, 주어진 입력 값에 대해 UDTF가 반환할 데이터를 포함하는 Row 오브젝트의 java.util.stream.Stream 을 빌드하고 반환합니다. 행의 필드는 outputSchema 메서드에서 지정한 타입을 사용해야 합니다. (outputSchema() 메서드 구현하기 섹션을 참조하십시오.)

예를 들어 UDTF가 행을 생성하는 경우, 생성된 행에 대해 Row 오브젝트의 Iterable 을 생성하고 반환합니다.

import java.util.stream.Stream;
...

public Stream<Row> process(Integer start, Integer count) {
  Stream.Builder<Row> builder = Stream.builder();
  for (int i = start; i < start + count ; i++) {
    builder.add(Row.create(i));
  }
  return builder.build();
}
Copy

inputSchema() 메서드 구현하기

process() 메서드가 java.util.Map 인자를 전달하는 경우 inputSchema() 메서드를 구현하여 입력 인자의 유형을 설명해야 합니다.

참고

process() 메서드가 Map 인자를 전달하지 않으면 inputSchema() 메서드를 구현할 필요가 없습니다.

이 메서드에서는 process() 메서드로 전달된 각 인자의 Snowflake 데이터 타입을 나타내는 StructField 오브젝트를 포함하는 StructType 오브젝트를 생성하고 반환합니다. Snowflake는 UDTF의 입력 스키마에 대해 다음 형식 오브젝트를 지원합니다.

SQL 데이터 타입

Java 형식

com.snowflake.snowpark_java.types 형식

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<문자열, 문자열>

MapType(StringType, StringType)

OBJECT

java.util.Map<문자열, 베리언트>

MapType(StringType, VariantType)

예를 들어 process() 메서드가 Map<문자열, 문자열> 인자와 Map<문자열, 베리언트> 인자를 전달한다고 가정합니다.

import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...

public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
  ...
}
Copy

다음 입력 인자의 유형을 설명하는 StructType 오브젝트를 반환하는 inputSchema() 메서드를 구현해야 합니다.

import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...

public StructType inputSchema() {
  return StructType.create(
      new StructField(
          "string_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
      new StructField(
          "variant_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
Copy

endPartition() 메서드 구현하기

endPartition 메서드를 구현하고, 입력 파티션의 모든 행이 process 메서드에 전달된 후 실행되어야 하는 코드를 추가합니다. endPartition 메서드는 각 입력 파티션에 대해 한 번 호출됩니다.

public Stream<Row> endPartition()
Copy

파티션의 모든 행이 처리된 후 작업을 수행해야 하는 경우 이 메서드를 사용할 수 있습니다. 예를 들어 다음을 할 수 있습니다.

  • process 메서드 호출에서 캡처한 상태 정보를 기반으로 행을 반환합니다.

  • 특정 입력 행에 연결되지 않은 행을 반환합니다.

  • process 메서드에 의해 생성된 출력 행을 요약하는 행을 반환합니다.

반환하는 행의 필드는 outputSchema 메서드에서 지정한 타입과 일치해야 합니다. (outputSchema() 메서드 구현하기 섹션을 참조하십시오.)

각 파티션의 끝에서 추가 행을 반환할 필요가 없으면 빈 Stream 을 반환합니다. 예:

public Stream<Row> endPartition() {
  return Stream.empty();
}
Copy

참고

Snowflake는 성공적으로 처리하도록 시간 제한이 조정된 대형 파티션을 지원하지만, 특히 대형 파티션으로 인해 처리 시간이 초과될 수 있습니다(예: endPartition 이 완료하는 데 너무 오래 걸리는 경우). 특정 사용 시나리오에 맞게 시간 초과 임계값을 조정해야 하는 경우 Snowflake 지원 에 문의하십시오.

UDTF 클래스의 예

다음은 행 범위를 생성하는 UDTF 클래스의 예입니다.

  • UDTF는 인자를 2개 전달하기 때문에 클래스는 JavaUDTF2 를 구현합니다.

  • 인자 startcount 는 행의 시작 번호 및 생성할 행 수를 지정합니다.

import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;

class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
  public StructType outputSchema() {
    return StructType.create(new StructField("C1", DataTypes.IntegerType));
  }

  // Because the process() method in this example does not pass in Map arguments,
  // implementing the inputSchema() method is optional.
  public StructType inputSchema() {
    return StructType.create(
            new StructField("start_value", DataTypes.IntegerType),
            new StructField("value_count", DataTypes.IntegerType));
  }

  public Stream<Row> endPartition() {
    return Stream.empty();
  }

  public Stream<Row> process(Integer start, Integer count) {
    Stream.Builder<Row> builder = Stream.builder();
    for (int i = start; i < start + count ; i++) {
      builder.add(Row.create(i));
    }
    return builder.build();
  }
}
Copy

UDTF 등록하기

그런 다음 새 클래스의 인스턴스를 만들고 UDTFRegistration 메서드 중 하나를 호출하여 클래스를 등록합니다. 임시 또는 영구 UDTF 를 등록할 수 있습니다.

임시 UDTF 등록하기

임시 UDTF를 등록하려면 UDTFRegistration.registerTemporary 를 호출합니다.

  • UDTF를 이름으로 호출할 필요가 없으면 클래스의 인스턴스를 전달하여 익명의 UDTF를 등록할 수 있습니다.

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
    
    Copy
  • UDTF를 이름으로 호출해야 하는 경우 UDTF의 이름도 전달합니다.

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
    // Call the UDTF by name.
    session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
    
    Copy

영구 UDTF 등록하기

후속 세션에서 UDTF를 사용해야 하는 경우 UDTFRegistration.registerPermanent 를 호출하여 영구 UDTF를 등록합니다.

영구 UDTF를 등록할 때 사용자는 등록 메서드가 UDTF 및 해당 종속성에 대한 JAR 파일을 업로드할 스테이지를 지정해야 합니다. 예:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage");
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

UDTF 호출하기

UDTF를 등록한 후, 반환된 TableFunction 오브젝트를 Session 오브젝트의 tableFunction 메서드에 전달하여 UDTF를 호출할 수 있습니다.

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Copy

이름으로 UDTF를 호출하려면 해당 이름으로 TableFunction 오브젝트를 생성하고 tableFunction 메서드에 전달합니다.

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

SELECT 문을 통해 직접 UDTF를 호출할 수도 있습니다.

session.sql("select * from table(myUdtf(10, 5))");
Copy