Python에서 DataFrame용 사용자 정의 테이블 함수(UDTF) 만들기

Snowpark API는 Python으로 작성된 핸들러를 사용하여 사용자 정의 테이블 함수를 만드는 데 사용할 수 있는 메서드를 제공합니다. 이 항목에서는 이러한 형식의 함수를 만드는 방법에 대해 설명합니다.

이 항목의 내용:

소개

Snowpark API를 사용하여 사용자 정의 테이블 함수(UDTF)를 만들 수 있습니다.

Python에서 DataFrame용 사용자 정의 함수(UDF) 만들기 에 설명된 것처럼, 이 API를 사용하여 스칼라 사용자 정의 함수(UDF)를 만드는 것과 유사한 방식으로 이 작업을 수행합니다. 주요 차이점에는 UDF 핸들러 요구 사항과 UDTF 등록 시 필요한 매개 변수 값이 포함됩니다.

Snowpark로 UDTF를 만들고 등록하려면 다음을 수행해야 합니다.

  • UDTF 핸들러를 구현합니다.

    이 핸들러는 UDTF의 논리를 포함합니다. UDTF 핸들러는 UDTF가 호출될 때 Snowflake가 런타임에 호출할 함수를 구현해야 합니다. 자세한 내용은 UDTF 핸들러 구현하기 섹션을 참조하십시오.

  • Snowflake 데이터베이스에 UDTF와 해당 핸들러를 등록합니다.

    Snowpark API를 사용하여 UDTF와 해당 핸들러를 등록할 수 있습니다. UDTF를 등록했으면 SQL에서 호출하거나 Snowpark API를 사용하여 호출할 수 있습니다. 등록에 대한 자세한 내용은 UDTF 등록하기 섹션을 참조하십시오.

UDTF 호출에 대한 정보는 사용자 정의 테이블 함수(UDTF) 호출하기 섹션을 참조하십시오.

UDTF 핸들러 구현하기

Python으로 UDTF 작성하기 에 자세히 설명되어 있듯이, UDTF 핸들러 클래스는 UDTF가 호출될 때 Snowflake가 호출하는 메서드를 구현해야 합니다. Snowpark API에 UDTF를 등록하든 CREATE FUNCTION 문을 사용하여 SQL로 UDTF를 만들든, 작성하는 클래스를 핸들러로 사용할 수 있습니다.

핸들러 클래스의 메서드는 UDTF가 수신한 행과 파티션을 처리하도록 설계되었습니다.

UDTF 핸들러 클래스는 다음을 구현하는데, Snowflake가 런타임에 호출합니다.

  • __init__ 메서드. 선택 사항입니다. 입력 파티션의 상태 저장 처리를 초기화하기 위해 호출됩니다.

  • process 메서드. 필수 항목입니다. 각 입력 행에 대해 호출됩니다. 이 메서드는 테이블 형식 값을 튜플로 반환합니다.

  • end_partition 메서드. 선택 사항입니다. 입력 파티션의 처리를 마무리하기 위해 호출됩니다.

    Snowflake는 성공적으로 처리하도록 시간 제한이 조정된 대형 파티션을 지원하지만, 특히 대형 파티션으로 인해 처리 시간이 초과될 수 있습니다(예: end_partition 이 완료하는 데 너무 오래 걸리는 경우). 특정 사용 시나리오에 맞게 시간 초과 임계값을 조정해야 하는 경우 Snowflake 지원 에 문의하십시오.

핸들러 세부 정보와 예는 Python으로 UDTF 작성하기 섹션을 참조하십시오.

UDTF 등록하기

UDTF 핸들러를 구현했으면 Snowpark API를 사용하여 Snowflake 데이터베이스에서 UDTF를 등록할 수 있습니다. UDTF를 등록하면 호출할 수 있도록 UDTF가 생성됩니다.

스칼라 UDF에 대해 등록할 수 있는 것처럼 UDTF를 명명된 함수나 익명의 함수로 등록할 수 있습니다. 스칼라 UDF 등록에 대한 관련 정보는 익명 UDF 만들기명명된 UDF 만들기 및 등록 섹션을 참조하십시오.

UDTF를 등록할 때 Snowflake가 UDTF를 만드는 데 필요한 매개 변수 값을 지정합니다. (이들 매개 변수 중 다수는 SQL에서 기능적으로 CREATE FUNCTION 문의 절에 해당합니다. 자세한 내용은 CREATE FUNCTION 섹션을 참조하십시오.)

이러한 매개 변수는 대부분 스칼라 UDF를 만들 때 지정하는 매개 변수와 동일합니다(자세한 내용은 Python에서 DataFrame용 사용자 정의 함수(UDF) 만들기 참조). 주요 차이점은 UDTF가 테이블 형식 값을 반환한다는 사실과 해당 핸들러가 함수가 아니라 클래스라는 사실로 인해 발생합니다. 매개 변수의 전체 목록은 아래에 링크로 연결된 API에 대한 설명서를 참조하십시오.

Snowpark에 UDTF를 등록하려면 다음 중 하나를 사용하여 데이터베이스에서 UDTF를 만드는 데 필요한 매개 변수 값을 지정하십시오. 이러한 옵션을 구별해주는 정보에 대해서는 스칼라 UDF 등록을 위한 유사한 옵션을 설명하는 UDFRegistration 섹션을 참조하십시오.

UDTF의 입력 유형 및 출력 스키마 정의하기

UDTF를 등록할 때 함수의 매개 변수와 출력 값에 대한 세부 정보를 지정합니다. 함수 자체가 함수의 기본 핸들러에 대한 타입에 정확하게 상응하는 타입을 선언하도록 이 작업을 수행합니다.

예를 들어, 이 항목과 snowflake.snowpark.udtf.UDTFRegistration 참조 항목의 섹션을 참조하십시오.

UDTF는 등록 시에 그에 대한 다음 사항을 지정합니다.

  • 등록하는 함수의 input_types 매개 변수의 값으로 사용하는 입력 매개 변수의 유형. process 메서드 선언에서 유형에 대한 힌트를 제공하는 경우 input_types 매개 변수는 선택 사항입니다.

    이 값을 snowflake.snowpark.types.DataType 을 기반으로 하는 유형 목록으로 지정합니다. 예를 들어 input_types=[StringType(), IntegerType()] 을 지정할 수 있습니다.

  • 등록하는 함수의 output_schema 매개 변수의 값으로 사용하는 테이블 형식 출력의 스키마.

    output_schema 값은 다음 중 하나일 수 있습니다.

    • UDTF의 반환 값에 있는 열 이름의 목록.

      이 목록은 열 이름만 포함하므로, process 메서드 선언에 유형에 대한 힌트도 제공해야 합니다.

    • 출력 테이블의 열 이름 유형을 나타내는 StructType.

      다음 예제의 코드는 output 변수에 대한 값으로 스키마를 할당한 다음, UDTF를 등록할 때 변수를 사용합니다.

      >>> from snowflake.snowpark.types import StructField, StructType, StringType, IntegerType, FloatType
      >>> from snowflake.snowpark.functions import udtf, table_function
      >>> schema = StructType([
      ...     StructField("symbol", StringType())
      ...     StructField("cost", IntegerType()),
      ... ])
      >>> @udtf(output_schema=schema,input_types=[StringType(), IntegerType(), FloatType()],stage_location="straut_udf",is_permanent=True,name="test_udtf",replace=True)
      ... class StockSale:
      ...     def process(self, symbol, quantity, price):
      ...         cost = quantity * price
      ...         yield (symbol, cost)
      
      Copy

다음은 간략한 예제 목록입니다. 더 많은 예제는 snowflake.snowpark.udtf.UDTFRegistration 섹션을 참조하십시오.

udtf 함수로 UDTF 등록하기

함수를 등록합니다.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
Copy

함수를 호출합니다.

>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy

레지스터 함수로 UDTF 등록하기

함수를 등록합니다.

>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
...     def __init__(self):
...         self._total_per_partition = 0
...
...     def process(self, s1: str) -> Iterable[Tuple[str, int]]:
...         words = s1.split()
...         self._total_per_partition = len(words)
...         counter = Counter(words)
...         yield from counter.items()
...
...     def end_partition(self):
...         yield ("partition_total", self._total_per_partition)
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
...     MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
Copy

함수를 호출합니다.

>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
Copy

register_from_file 함수로 UDTF 등록하기

함수를 등록합니다.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="@mystage/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
Copy

함수를 호출합니다.

>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy