Python에서 DataFrame용 사용자 정의 함수(UDF) 만들기

Snowpark API는 람다 또는 Python의 함수에서 사용자 정의 함수를 만드는 데 사용할 수 있는 메서드를 제공합니다. 이 항목에서는 이러한 형식의 함수를 만드는 방법에 대해 설명합니다.

이 항목의 내용:

소개

Snowpark를 사용하면 사용자 지정 람다 및 함수에 대한 사용자 정의 함수(UDF)를 만들 수 있으며 이러한 UDF를 호출하여 DataFrame의 데이터를 처리할 수 있습니다.

Snowpark API를 사용하여 UDF를 만들면 Snowpark 라이브러리는 함수에 대한 코드를 내부 스테이지에 업로드합니다. UDF를 호출하면 Snowpark 라이브러리는 데이터가 있는 서버에서 함수를 실행합니다. 결과적으로, 함수가 데이터를 처리하기 위해 데이터를 클라이언트로 전송할 필요가 없습니다.

사용자 지정 코드에서 Python 파일 또는 서드 파티 패키지에서 모듈을 가져올 수도 있습니다.

다음 두 가지 방법 중 하나로 사용자 지정 코드에 대한 UDF를 만들 수 있습니다.

  • 익명 UDF를 만들고 함수를 변수에 할당할 수 있습니다. 이 변수가 범위 내에 있는 한 이 변수를 사용하여 UDF를 호출할 수 있습니다.

  • 명명된 UDF를 만들고 이름으로 UDF를 호출할 수 있습니다. 예를 들어, 이름으로 UDF를 호출해야 하거나 후속 세션에서 UDF를 사용해야 하는 경우 이를 사용할 수 있습니다.

다음 섹션에서는 이러한 UDF를 만드는 방법에 대해 설명합니다.

CREATE FUNCTION 명령을 실행하여 UDF를 정의한 경우, Snowpark에서 해당 UDF를 호출할 수 있습니다. 자세한 내용은 사용자 정의 함수(UDF) 호출하기 섹션을 참조하십시오.

참고

Python UDF 배치 API가 있으며, 이를 사용하면 입력 행 배치를 Pandas DataFrames로 수신하는 Python 함수를 정의할 수 있습니다. 배치 인터페이스를 사용하면 머신 러닝 추론 시나리오에서 훨씬 더 나은 성능을 얻을 수 있습니다. 자세한 내용은 Python UDF 배치 API를 통해 벡터화된 UDF 사용하기 섹션을 참조하십시오.

UDF에 대한 종속성 지정하기

Snowpark API를 통해 UDF를 정의하려면 UDF가 의존하는 모듈(예: Python 파일, zip 파일, 리소스 파일 등)가 포함된 모든 파일에 대해 Session.add_import() 를 호출해야 합니다. 디렉터리를 지정할 수도 있으며 Snowpark 라이브러리는 디렉터리를 자동으로 압축해 zip 파일로 업로드합니다. (UDF에서 리소스를 읽는 것에 관한 자세한 내용은 Python 원본 파일에서 UDF 만들기 를 참조하십시오.)

Snowpark 라이브러리는 이러한 파일을 내부 스테이지에 업로드하고, UDF 실행 시 파일을 가져옵니다.

다음 예제에서는 스테이지에 zip 파일을 종속성으로 추가하는 방법을 보여줍니다.

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  

다음 예제에서는 로컬 컴퓨터에서 Python 파일을 추가하는 방법을 보여줍니다.

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  

다음 예제에서는 다른 유형의 종속 항목을 추가하는 방법을 보여줍니다.

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  

참고

Python Snowpark 라이브러리는 자동으로 업로드되지 않습니다.

다음 종속성을 지정할 필요가 없습니다.

  • Python 기본 제공 라이브러리.

    이러한 라이브러리는 UDF가 실행되는 서버의 런타임 환경에서 이미 사용 가능합니다.

UDF에서 Anaconda의 서드 파티 패키지 사용하기

Python UDF를 만들 때 설치할 Anaconda 패키지를 지정할 수 있습니다. Python UDF를 호출하는 쿼리가 Snowflake 웨어하우스 내에서 실행되면 Anaconda 패키지가 원활하게 설치되고 사용자를 대신하여 가상 웨어하우스에 캐시됩니다. 모범 사례, 사용 가능한 패키지를 보는 방법, 로컬 개발 환경을 설정하는 방법에 대한 자세한 내용은 서드 파티 패키지 사용하기 섹션을 참조하십시오.

session.add_packages 를 사용하여 세션 수준에서 패키지를 추가합니다.

이 코드 예제에서는 패키지를 가져오고 패키지의 버전을 반환하는 방법을 보여줍니다.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]

또한 session.add_requirements 를 사용하여 요구 사항 파일 이 있는 패키지를 지정할 수도 있습니다.

>>> session.add_requirements("mydir/requirements.txt")  

UDF 수준 패키지를 추가하여 이전에 추가했을 수 있는 세션 수준 패키지를 덮어쓸 수 있습니다.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]

중요

패키지 버전을 지정하지 않으면 Snowflake는 종속성을 확인할 때 최신 버전을 사용합니다. 하지만 UDF를 프로덕션 환경에 배포할 때 코드에서 항상 동일한 종속성 버전을 사용하도록 보장할 수 있습니다. 영구 및 임시 UDF 모두에 대해 그렇게 할 수 있습니다.

  • 영구 UDF를 만들 때 UDF는 한 번만 생성되고 등록됩니다. 이를 통해 종속성이 한 번 확인되고 선택한 버전이 프로덕션 워크로드에 사용됩니다. UDF가 실행될 때 항상 동일한 종속성 버전을 사용합니다.

  • 임시 UDF를 만들 때 종속성 버전을 버전 사양의 일부로 지정합니다. 그와 같이, UDF가 등록될 때 패키지 확인에서 지정된 버전을 사용합니다. 버전을 지정하지 않으면 새 버전이 제공될 때 종속성이 업데이트될 수 있습니다.

익명 UDF 만들기

익명 UDF를 만들려면 다음 중 하나를 수행할 수 있습니다.

  • snowflake.snowpark.functions 모듈에서 udf 함수를 호출하여 익명 함수의 정의를 전달합니다.

  • UDFRegistration 클래스에서 register 메서드를 호출하여 익명 함수의 정의를 전달합니다.

다음은 익명의 UDF의 예입니다.

>>> from snowflake.snowpark.types import IntegerType
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])

참고

여러 세션에서 실행할 수 있는 코드를 작성할 때 udf 함수를 사용하기보다는 register 메서드를 사용하여 UDF를 등록합니다. 이렇게 하면 기본 Snowflake Session 오브젝트를 찾을 수 없는 오류를 방지할 수 있습니다.

명명된 UDF 만들기 및 등록

이름으로 UDF를 호출하려는 경우(예: functions 모듈에서 call_udf 함수를 사용하여 호출), 명명된 UDF를 만들고 등록할 수 있습니다. 이렇게 하려면 다음 중 하나를 사용하십시오.

  • UDFRegistration 클래스의 register 메서드(name 인자 포함).

  • snowflake.snowpark.functions 모듈의 udf 함수(name 인자 포함).

UDFRegistration 클래스의 특성 또는 메서드에 액세스하려면 Session 클래스의 udf 속성을 호출하십시오.

register 또는 udf 를 호출하면 현재 세션에서 사용할 수 있는 임시 UDF가 생성됩니다.

영구 UDF를 만들려면 register 메서드 또는 udf 함수를 호출하고 is_permanent 인자를 True 로 설정하십시오. 영구 UDF를 만들 때 stage_location 인자도 UDF를 위한 Python 파일과 그 종속 항목이 업로드되는 스테이지 위치로 설정해야 합니다.

다음은 명명된 임시 UDF를 등록하는 방법의 예입니다.

>>> from snowflake.snowpark.types import IntegerType
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)

다음은 is_permanent 인자를 True 로 설정하여 명명된 영구 UDF를 등록하는 방법의 예입니다.

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1

다음은 이러한 UDF가 호출되는 예입니다.

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]

Python 원본 파일에서 UDF 만들기

Python 파일에서 UDF 처리기를 정의한 다음 UDFRegistration 클래스에 register_from_file 메서드를 사용하여 UDF를 만들 수도 있습니다.

다음은 register_from_file 을 사용하는 예입니다.

다음을 포함하는 Python 파일 test_udf_file.py 가 있다고 가정합니다.

def mod5(x: int) -> int:
    return x % 5

그런 다음 파일 test_udf_file.py 의 이 함수에서 UDF를 만들 수 있습니다.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

파일을 스테이지 위치에 업로드한 다음 이를 사용하여 UDF를 만들 수도 있습니다.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

UDF에서 파일 읽기

앞서 언급했듯이 Snowpark 라이브러리는 서버에서 UDF를 업로드하고 실행합니다. UDF가 파일에서 데이터를 읽어야 하는 경우, 파일이 UDF와 함께 업로드되었는지 확인해야 합니다.

파일을 읽도록 UDF를 설정하려면 다음을 수행하십시오.

  1. 파일이 종속성임을 지정합니다. 이는 서버에 파일을 업로드합니다. 자세한 내용은 UDF에 대한 종속성 지정하기 섹션을 참조하십시오.

    예:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
  2. UDF에서 파일을 읽습니다. 다음 예제에서, 이 파일은 UDF 생성 중에 한 번만 읽히고 UDF 실행 중에는 다시 읽히지 않습니다. 이는 서드 파티 라이브러리 cachetools 로 달성됩니다.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    

Python UDF 배치 API를 통해 벡터화된 UDF 사용하기

Python UDF 배치 API를 사용하면 입력 행 배치를 Pandas DataFrames 로 수신하고 결과 배치를 Pandas 배열 또는 Series 로 반환하는 Python 함수를 정의할 수 있습니다. Snowpark dataframe 의 열은 UDF 내부의 Pandas Series로 벡터화됩니다.

다음은 배치 인터페이스를 사용하는 방법의 예입니다.

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)

다른 Python UDF를 호출하는 것과 같은 방식으로 배치 API를 사용하는 벡터화된 Python UDF를 호출합니다. 자세한 내용은 SQL 문을 사용하여 벡터화된 UDF를 만드는 방법을 설명하는 Python UDF Batch API 섹션을 참조하십시오. 예를 들어 SQL 문에서 Python 코드를 지정할 때 vectorized 데코레이터를 사용할 수 있습니다. 이 문서에 설명된 Snowpark Python API를 사용하면 벡터화된 UDF를 만들려고 SQL 문을 사용하지 않아도 됩니다. 따라서 vectorized 데코레이터를 사용하지 않습니다.

배치당 행 수를 제한할 수 있습니다. 자세한 내용은 대상 배치 크기 설정하기 를 참조하십시오.

Snowpark Python API를 사용하여 벡터화된 UDF를 만드는 방법에 대한 자세한 설명과 예제는 Snowpark API Reference의 UDF 섹션 을 참조하십시오.