Snowpark Java에서 함수 및 저장 프로시저 호출하기

DataFrame에서 데이터를 처리하기 위해 시스템 정의 SQL 함수, 사용자 정의 함수, 저장 프로시저를 호출할 수 있습니다. 이 항목에서는 Snowpark에서 이를 호출하는 방법에 대해 설명합니다.

이 항목의 내용:

시스템 정의 함수 호출하기

시스템 정의 SQL 함수 를 호출해야 하는 경우, Functions 클래스 에서 이와 동일한 정적 메서드를 사용하십시오.

다음 예에서는 Functions 클래스의 upper 정적 메서드(시스템 정의 UPPER 함수와 동일)를 호출하여, 대문자로 된 이름 열의 값을 반환합니다.

DataFrame df = session.table("sample_product_data");
df.select(Functions.upper(Functions.col("name"))).show();
Copy

Functions 클래스에서 시스템 정의 SQL 함수를 사용할 수 없는 경우, Functions.callUDF 정적 메서드를 사용하여 시스템 정의 함수를 호출할 수 있습니다.

callUDF 의 경우, 시스템 정의 함수의 이름을 첫 번째 인자로 전달합니다. 열 값을 시스템 정의 함수에 전달해야 하는 경우, Column 오브젝트를 정의하고 callUDF 메서드에 대한 추가 인자로서 전달하십시오.

다음 예에서는 시스템 정의 함수 RADIANS 를 호출하여 degrees 열의 값을 전달합니다.

// Call the system-defined function RADIANS() on degrees.
DataFrame dfDegrees = session.range(0, 360, 45).rename("degrees", Functions.col("id"));
dfDegrees.select(Functions.col("degrees"), Functions.callUDF("radians", Functions.col("degrees"))).show();
Copy

callUDF 메서드는 DataFrame 변환 메서드 (예: filter, select 등)에 전달할 수 있는 Column 을 반환합니다.

스칼라 사용자 정의 함수(UDF) 호출하기

UDF를 호출하는 방법은 UDF가 생성된 방식에 따라 다릅니다.

  • 익명 UDF 를 호출하려면 UDF 생성 시 반환된 UserDefinedFunction 오브젝트의 apply 메서드를 호출합니다.

    UDF에 전달하는 인자는 Column 오브젝트여야 합니다. 리터럴을 전달해야 하는 경우 리터럴을 열 오브젝트로 사용하기 에 설명된 대로 Functions.lit() 를 사용하십시오.

  • 이름으로 등록한 UDF, 그리고 CREATE FUNCTION 을 실행하여 만든 UDF를 호출하려면 Functions.callUDF 정적 메서드를 사용하십시오.

    UDF의 이름을 첫 번째 인자로서 전달하고 모든 UDF 매개 변수를 추가 인자로서 전달하십시오.

UDF를 호출하면 UDF의 반환 값이 포함된 Column 오브젝트가 반환됩니다.

다음 예에서는 UDF 함수 doubleUdf 을 호출하여 quantity 열의 값을 전달합니다. 이 예에서는 doubleUdf 의 반환 값을 DataFrame의 select 메서드로 전달합니다.

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

테이블 함수(시스템 함수 및 UDTF) 호출하기

테이블 함수 또는 사용자 정의 테이블 함수(UDTF) 를 호출하려면 다음을 수행합니다.

  1. 테이블 함수의 이름을 전달하여 TableFunction 오브젝트를 생성합니다.

  2. Session 오브젝트의 tableFunction 메서드 를 호출하고 TableFunction 오브젝트와 입력 인자 이름 및 값의 Map 을 전달합니다.

table?Function 은 테이블 함수의 출력을 포함하는 DataFrame을 반환합니다.

예를 들어 다음 명령을 실행하여 SQL UDTF를 생성했다고 가정합니다.

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

다음 코드는 이 UDTF를 호출하고, UDTF의 출력을 위한 DataFrame을 생성합니다. 이 예에서는 콘솔에 출력의 처음 10개 행을 출력합니다.

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> arguments = new HashMap<>();
arguments.put("cat_id", Functions.lit(10));
DataFrame dfTableFunctionOutput = session.tableFunction(new TableFunction("product_by_category_id"), arguments);
dfTableFunctionOutput.show();
Copy

테이블 함수의 출력을 DataFrame과 조인해야 하는 경우, TableFunction을 전달하는 join 메서드 를 호출합니다.

저장 프로시저 호출하기

(Snowflake 환경에서) 서버 측 또는 로컬에서 프로시저를 실행할 수 있습니다. 두 환경이 다르므로 프로시저 실행의 조건과 결과도 두 환경에서 서로 다를 수 있다는 점을 명심하십시오.

다음 방식 중 하나로 Snowpark API를 사용하여 프로시저를 호출할 수 있습니다.

  • SProcRegistration.runLocally 메서드를 사용하여 테스트 및 디버깅을 위해 로컬에서 함수를 실행합니다.

  • Session.storedProcedure 메서드 중 하나를 사용하여 서버 측 Snowflake 환경에서 프로시저를 실행합니다. 여기에는 현재 세션으로 범위가 지정된 프로시저 또는 Snowflake에 저장된 영구 프로시저가 포함됩니다.

SQL 코드에서 Snowpark API로 생성한 영구 저장 프로시저를 호출할 수도 있습니다. 자세한 내용은 저장 프로시저 호출하기 섹션을 참조하십시오.

Snowpark API로 프로시저를 생성하는 방법에 대한 자세한 내용은 Java로 DataFrames용 저장 프로시저 만들기 섹션을 참조하십시오.

프로시저의 논리를 로컬에서 실행하기

SProcRegistration.runLocally 메서드를 사용하여 로컬 환경에서 프로시저에 대한 람다 함수를 실행할 수 있습니다. 메서드는 함수를 실행하고 그 결과를 함수에서 반환되는 형식으로 반환합니다.

예를 들어 Snowflake에서는 프로시저에서 사용하려는 람다 함수를 호출한 후에 그 함수에서 프로시저를 등록할 수 있습니다. 먼저 람다 코드를 유형이 com.snowflake.snowpark_java.sproc.JavaSProc 인터페이스 중 하나인 변수에 값으로 할당합니다. 해당 변수를 사용하여 SProcRegistration.runLocally 메서드로 함수 호출을 테스트할 수 있습니다. 프로시저를 등록할 때 변수를 사용하여 함수를 나타낼 수도 있습니다.

다음 예제의 코드는 프로시저의 논리가 될 람다 함수에서 JavaSProc 변수를 초기화합니다. 그런 다음 함수의 인자와 함께 변수를 SProcRegistration.runLocally 메서드에 전달하여 함수를 테스트합니다. 변수는 함수를 등록하는 데도 사용됩니다.

Session session = Session.builder().configFile("my_config.properties").create();

// Assign the lambda function to a variable.
JavaSProc1<Integer, Integer> func =
  (Session session, Integer num) -> num + 1;

// Execute the function locally.
int result = (Integer)session.sproc().runLocally(func, 1);
System.out.println("\nResult: " + result);

// Register the procedure.
StoredProcedure sp =
  session.sproc().registerTemporary(
    func,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server.
session.storedProcedure(sp, 1).show();
Copy

서버에서 프로시저 실행하기

서버의 Snowflake 환경에서 프로시저를 실행하려면 Session.storedProcedure 메서드를 사용하십시오. 이 메서드는 DataFrame 오브젝트를 반환합니다.

예를 들어 다음을 실행할 수 있습니다.

다음 예제의 코드는 서버에서 실행되도록 설계된 임시 프로시저를 생성하지만, 현재 Snowpark 세션 동안만 지속됩니다. 그런 다음 프로시저의 이름과 이를 나타내는 com.snowflake.snowpark_java.StoredProcedure 변수를 모두 사용하여 프로시저를 실행합니다.

Session session = Session.builder().configFile("my_config.properties").create();

String incrementProc = "increment";

// Register the procedure.
StoredProcedure tempSP =
  session.sproc().registerTemporary(
    incrementProc,
    (Session session, Integer num) -> num + 1,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy