Snowpark Scala에서 함수 및 저장 프로시저 호출하기

DataFrame에서 데이터를 처리하기 위해 시스템 정의 SQL 함수, 사용자 정의 함수, 저장 프로시저를 호출할 수 있습니다. 이 항목에서는 Snowpark에서 이를 호출하는 방법에 대해 설명합니다.

이 항목의 내용:

시스템 정의 함수 호출하기

시스템 정의 SQL 함수 를 호출해야 하는 경우, com.snowflake.snowpark.functions 오브젝트 에서 이와 동일한 함수를 사용하십시오.

다음 예에서는 functions 오브젝트의 upper 함수(시스템 정의 UPPER 함수와 동일)를 호출하여 대문자로 된 이름 열의 값을 반환합니다.

// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Copy

함수 오브젝트에서 시스템 정의 SQL 함수를 사용할 수 없는 경우, 다음 접근 방식 중 하나를 사용할 수 있습니다.

  • callBuiltin 함수를 사용하여 시스템 정의 함수를 호출합니다.

  • builtin 함수를 사용하여 시스템 정의 함수를 호출하는 데 사용할 수 있는 함수 오브젝트를 만듭니다.

callBuiltinbuiltincom.snowflake.snowpark.functions 오브젝트에 정의되어 있습니다.

callBuiltin 의 경우, 시스템 정의 함수의 이름을 첫 번째 인자로 전달합니다. 열 값을 시스템 정의 함수에 전달해야 하는 경우, Column 오브젝트를 정의하고 callBuiltin 함수에 대한 추가 인자로서 전달하십시오.

다음 예에서는 시스템 정의 함수 RADIANS 를 호출하여 col1 열의 값을 전달합니다.

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
Copy

callBuiltin 함수는 DataFrame 변환 메서드 (예: filter, select 등)에 전달할 수 있는 Column 을 반환합니다.

builtin 의 경우, 시스템 정의 함수의 이름을 전달하고, 반환된 함수 오브젝트를 사용하여 시스템 정의 함수를 호출합니다. 예:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Copy

스칼라 사용자 정의 함수(UDF) 호출하기

UDF를 호출하는 방법은 UDF가 생성된 방식에 따라 다릅니다.

  • 익명 UDF 를 호출하려면 UDF 생성 시 반환된 UserDefinedFunction 오브젝트의 apply 메서드를 호출합니다.

    UDF에 전달하는 인자는 Column 오브젝트여야 합니다. 리터럴을 전달해야 하는 경우 리터럴을 열 오브젝트로 사용하기 에 설명된 대로 lit() 를 사용하십시오.

  • 이름으로 등록한 UDF, 그리고 CREATE FUNCTION 을 실행하여 만든 UDF를 호출하려면 com.snowflake.snowpark.functions 오브젝트에서 callUDF 함수를 사용하십시오.

    UDF의 이름을 첫 번째 인자로서 전달하고 모든 UDF 매개 변수를 추가 인자로서 전달하십시오.

UDF를 호출하면 UDF의 반환 값이 포함된 Column 오브젝트가 반환됩니다.

다음 예에서는 UDF 함수 myFunction 을 호출하여 col1col2 열의 값을 전달합니다. 이 예에서는 myFunction 의 반환 값을 DataFrame의 select 메서드로 전달합니다.

// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
    df.select(
        callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
    ).collect()
Copy

테이블 함수(시스템 함수 및 UDTF) 호출하기

테이블 함수 또는 사용자 정의 테이블 함수(UDTF) 를 호출하려면 다음을 수행합니다.

  1. 테이블 함수의 이름을 전달하여 TableFunction 오브젝트를 생성합니다.

    Snowpark에서 UDTF를 만드는 경우, 단순히 UDTFRegistration.registerTemporary 또는 UDTFRegistration.registerPermanent 메서드에 의해 반환된 TableFunction 오브젝트를 사용할 수 있습니다. 사용자 정의 테이블 함수(UDTF) 만들기 섹션을 참조하십시오.

  2. session.tableFunction 을 호출하여 TableFunction 오브젝트와 입력 인자 이름 및 값의 Map 을 전달합니다.

table?Function 은 테이블 함수의 출력을 포함하는 DataFrame을 반환합니다.

예를 들어 다음 명령을 실행하여 SQL UDTF를 생성했다고 가정합니다.

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

다음 코드는 이 UDTF를 호출하고, UDTF의 출력을 위한 DataFrame을 생성합니다. 이 예에서는 콘솔에 출력의 처음 10개 행을 출력합니다.

val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Copy

테이블 함수의 출력을 DataFrame과 조인해야 하는 경우, TableFunction을 전달하는 DataFrame.join 메서드 를 호출합니다.

저장 프로시저 호출하기

(Snowflake 환경에서) 서버 측 또는 로컬에서 프로시저를 실행할 수 있습니다. 두 환경이 다르므로 프로시저 실행의 조건과 결과도 두 환경에서 서로 다를 수 있다는 점을 명심하십시오.

다음 방식 중 하나로 Snowpark API를 사용하여 프로시저를 호출할 수 있습니다.

  • SProcRegistration.runLocally 메서드를 사용하여 테스트 및 디버깅을 위해 로컬에서 함수를 실행합니다.

  • Session.storedProcedure 메서드를 사용하여 서버 측 Snowflake 환경에서 프로시저를 실행합니다. 여기에는 현재 세션으로 범위가 지정된 프로시저 또는 Snowflake에 저장된 영구 프로시저가 포함됩니다.

Snowflake 워크시트에서 Snowpark API로 생성한 영구 저장 프로시저를 호출할 수도 있습니다. 자세한 내용은 저장 프로시저 호출하기 섹션을 참조하십시오.

Snowpark API로 프로시저를 생성하는 방법에 대한 자세한 내용은 Scala로 DataFrames용 저장 프로시저 만들기 섹션을 참조하십시오.

프로시저의 논리를 로컬에서 실행하기

SProcRegistration.runLocally 메서드를 사용하여 로컬 환경에서 프로시저에 대한 람다 함수를 실행할 수 있습니다. 메서드는 함수를 실행하고 그 결과를 함수에서 반환되는 형식으로 반환합니다.

예를 들어 Snowflake에서는 프로시저에서 사용하려는 람다 함수를 클라이언트 측에서 로컬로 호출한 후에 그 함수에서 프로시저를 등록할 수 있습니다. 먼저 변수에 람다 코드를 값으로 할당합니다. 해당 변수를 SProcRegistration.runLocally 메서드에 전달하여 클라이언트 측에서 실행합니다. 프로시저를 등록할 때 변수를 사용하여 함수를 나타낼 수도 있습니다.

다음 예의 코드에서는 함수를 func 변수에 할당합니다. 그런 다음 함수의 인자 값과 함께 변수를 SProcRegistration.runLocally 메서드에 전달하여 함수를 로컬에서 테스트합니다. 변수는 프로시저를 등록하는 데도 사용됩니다.

val session = Session.builder.configFile("my_config.properties").create

// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1

// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Copy

서버에서 프로시저 실행하기

서버의 Snowflake 환경에서 프로시저를 실행하려면 Session.storedProcedure 메서드를 사용하십시오. 이 메서드는 DataFrame 오브젝트를 반환합니다.

예를 들어 다음을 실행할 수 있습니다.

다음 예제의 코드는 서버에서 실행되도록 설계된 임시 프로시저를 생성하지만, 현재 Snowpark 세션 동안만 지속됩니다. 그런 다음 프로시저의 이름과 이를 나타내는 StoredProcedure 변수를 모두 사용하여 프로시저를 실행합니다.

val session = Session.builder.configFile("my_config.properties").create

val name: String = "add_two"

val tempSP: StoredProcedure =
  session.sproc.registerTemporary(
    name,
    (session: Session, num: Int) => num + 2
  )

session.storedProcedure(name, 1).show()

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy