Python에서 DataFrames용 사용자 정의 함수(UDAFs) 만들기

Snowpark Python APIs를 사용하여 사용자 정의 집계 함수(UDAFs)를 생성하고 호출할 수 있습니다. UDAF는 하나 이상의 행을 입력으로 받아 단일 행의 출력을 생성합니다. UDAF는 여러 행의 값을 연산하여 합계, 평균, 계산, 최소값 또는 최대값 찾기, 표준 편차, 추정과 같은 수학적 계산과 일부 비수학적 연산을 수행합니다.

Snowpark로 UDAF를 만들고 등록하려면 다음을 수행해야 합니다.

  • UDAF 핸들러를 구현합니다.

    이 핸들러는 UDAF의 논리를 포함합니다. UDAF 핸들러는 UDAF가 호출될 때 Snowflake가 런타임에 호출할 함수를 구현해야 합니다. 자세한 내용은 핸들러 구현하기 섹션을 참조하십시오.

  • Snowflake 데이터베이스에 UDAF와 해당 핸들러를 등록합니다.

    UDAF를 등록했으면 SQL에서 호출하거나 Snowpark API를 사용하여 호출할 수 있습니다. Snowpark API를 사용하여 UDAF와 해당 핸들러를 등록할 수 있습니다. 등록에 대한 자세한 내용은 UDAF 등록하기 섹션을 참조하십시오.

Python 사용자 정의 집계 함수 에 설명된 대로 SQL을 사용하여 자체 UDAFs를 생성할 수도 있습니다.

핸들러 구현하기

집계 함수 핸들러를 위한 인터페이스 의 설명과 같이, UDAF 핸들러 클래스는 UDAF가 호출될 때 Snowflake가 호출하는 메서드를 구현해야 합니다. Snowpark API에 UDAF를 등록하든 CREATE FUNCTION 문을 사용하여 SQL로 UDTF를 만들든 , 작성하는 클래스를 핸들러로 사용할 수 있습니다.

UDAF 핸들러 클래스는 다음 표에 나열된 메서드를 구현하며, 이 메서드는 런타임에 Snowflake가 호출합니다. 이 항목에 있는 예제 를 참조하십시오.

메서드

요구 사항

설명

__init__

필수

집계의 내부 상태를 초기화합니다.

aggregate_state

필수

집계의 내부 상태를 반환합니다.

  • 메서드에는 @property decorator 가 있어야 합니다.

  • 집계 상태 오브젝트는 Python 피클 라이브러리 에서 직렬화할 수 있는 모든 Python 데이터 타입이 될 수 있습니다.

  • 단순 집계 상태의 경우 기본 Python 데이터 타입을 사용합니다. 더 복잡한 집계 상태의 경우 Python 데이터 클래스 를 사용합니다.

accumulate

필수

새로운 입력 행을 기준으로 집계 상태를 누적합니다.

merge

필수

두 개의 중간 집계 상태를 결합합니다.

finish

필수

집계된 상태를 기반으로 최종 결과를 생성합니다.

UDAF 등록하기

UDAF 핸들러를 구현했으면 Snowpark API를 사용하여 Snowflake 데이터베이스에서 UDAF를 등록할 수 있습니다. UDAF를 등록하면 호출할 수 있도록 UDAF가 생성됩니다.

스칼라 UDF에 대해 등록할 수 있는 것처럼 UDAF를 명명된 함수나 익명의 함수로 등록할 수 있습니다. 스칼라 UDF 등록에 대한 관련 정보는 익명 UDF 만들기명명된 UDF 만들기 및 등록 섹션을 참조하십시오. UDAF를 등록할 때 Snowflake가 UDAF를 만드는 데 필요한 매개 변수 값을 지정합니다.

다음 함수와 메서드를 사용하여 함수를 등록할 수 있습니다.

반환 값과 단일 매개 변수가 있는 UDAF 만들기

다음 핸들러 예제의 Python 코드는 단일 정수 인자를 받아 행 전체에서 값을 더하고 결과를 반환하는 sum_int UDAF를 지원합니다.

함수 등록

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
  def __init__(self):
    # This aggregate state is a primitive Python data type.
    self._partial_sum = 0

  @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value):
    self._partial_sum += input_value

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Copy

함수 호출

다음 예제의 Python 코드는 DataFrame을 사용하여 sum_int UDAF를 호출합니다.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
Copy

반환 값과 두 개의 매개 변수가 있는 UDAF 만들기

함수 등록

다음 핸들러 예제의 Python 코드는 두 개의 정수 인자를 받아 행 전체에서 인자 값을 합산한 후 결과를 반환하는 sum_int UDAF를 지원합니다.

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
  class PythonSumUDAF:
    def __init__(self):
      self._partial_sum = 0

    @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value, input_value2):
    self._partial_sum += input_value + input_value2

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Copy

함수 호출

다음 예제의 Python 코드는 DataFrame을 사용하여 sum_int UDAF를 호출합니다.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())
Copy