Python에서 DataFrame용 사용자 정의 함수(UDF) 만들기

Snowpark API는 람다 또는 Python의 함수에서 사용자 정의 함수를 만드는 데 사용할 수 있는 메서드를 제공합니다. 이 항목에서는 이러한 형식의 함수를 만드는 방법에 대해 설명합니다.

이 항목의 내용:

소개

Snowpark를 사용하면 사용자 지정 람다 및 함수에 대한 사용자 정의 함수(UDF)를 만들 수 있으며 이러한 UDF를 호출하여 DataFrame의 데이터를 처리할 수 있습니다.

Snowpark API를 사용하여 UDF를 만들면 Snowpark 라이브러리는 함수에 대한 코드를 내부 스테이지에 업로드합니다. UDF를 호출하면 Snowpark 라이브러리는 데이터가 있는 서버에서 함수를 실행합니다. 결과적으로, 함수가 데이터를 처리하기 위해 데이터를 클라이언트로 전송할 필요가 없습니다.

사용자 지정 코드에서 Python 파일 또는 서드 파티 패키지에서 모듈을 가져올 수도 있습니다.

다음 두 가지 방법 중 하나로 사용자 지정 코드에 대한 UDF를 만들 수 있습니다.

  • 익명 UDF를 만들고 함수를 변수에 할당할 수 있습니다. 이 변수가 범위 내에 있는 한 이 변수를 사용하여 UDF를 호출할 수 있습니다.

  • 명명된 UDF를 만들고 이름으로 UDF를 호출할 수 있습니다. 예를 들어, 이름으로 UDF를 호출해야 하거나 후속 세션에서 UDF를 사용해야 하는 경우 이를 사용할 수 있습니다.

다음 섹션에서는 로컬 개발 환경 또는 Python 워크시트 를 사용하여 이러한 UDF를 생성하는 방법을 설명합니다.

CREATE FUNCTION 명령을 실행하여 UDF를 정의한 경우, Snowpark에서 해당 UDF를 호출할 수 있습니다. 자세한 내용은 사용자 정의 함수(UDF) 호출하기 섹션을 참조하십시오.

참고

벡터화된 Python UDF를 사용하면 Pandas DataFrames로 입력 행의 배치를 수신하는 Python 함수를 정의할 수 있습니다. 따라서 머신 러닝 유추 시나리오에서 훨씬 더 나은 성능을 얻을 수 있습니다. 자세한 내용은 벡터화된 UDF 사용하기 섹션을 참조하십시오.

참고

Python 워크시트로 작업하는 경우 처리기 함수 내에서 다음 예제를 사용하십시오.

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

이러한 예제에서 Row 오브젝트의 list 와 같이 DataFrame 이외의 것을 반환하는 경우 반환 유형을 변경 하여 예제의 반환 유형과 일치시키십시오.

코드 예제를 실행한 후 Results 탭을 사용하여 반환된 출력을 확인합니다. 자세한 내용은 Python 워크시트 실행하기 섹션을 참조하십시오.

UDF에 대한 종속성 지정하기

Snowpark API를 사용하여 UDF를 정의하려면 Python 파일, zip 파일, 리소스 파일 등과 같이 UDF가 의존하는 모든 모듈을 포함하는 파일을 가져와야 합니다.

디렉터리를 지정할 수도 있으며 Snowpark 라이브러리는 디렉터리를 자동으로 압축해 zip 파일로 업로드합니다. (UDF에서 리소스를 읽는 것에 관한 자세한 내용은 UDF가 있는 파일 읽기 를 참조하십시오.)

Session.add_import() 를 호출할 경우 Snowpark 라이브러리는 지정된 파일을 내부 스테이지에 업로드하고 UDF를 실행할 때 이들 파일을 가져옵니다.

다음 예제에서는 스테이지에서 zip 파일을 코드에 종속성으로 추가하는 방법을 보여줍니다.

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  
Copy

다음 예제에서는 로컬 컴퓨터에서 Python 파일을 추가하는 방법을 보여줍니다.

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  
Copy

다음 예제에서는 다른 유형의 종속 항목을 추가하는 방법을 보여줍니다.

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  
Copy

참고

Python Snowpark 라이브러리는 자동으로 업로드되지 않습니다.

다음 종속성을 지정할 필요가 없습니다.

  • Python 기본 제공 라이브러리.

    이러한 라이브러리는 UDF가 실행되는 서버의 런타임 환경에서 이미 사용 가능합니다.

UDF에서 Anaconda의 서드 파티 패키지 사용하기

UDF에서 Snowflake Anaconda 채널의 서드 파티 패키지를 사용할 수 있습니다.

  • Python 워크시트에서 Python UDF를 생성하는 경우 이미 워크시트에서 Anaconda 패키지를 사용할 수 있습니다. 스테이지의 Python 파일을 워크시트에 추가하기 섹션을 참조하십시오.

  • 로컬 개발 환경에서 Python UDF를 생성하는 경우 설치할 Anaconda 패키지를 지정할 수 있습니다.

Python UDF를 호출하는 쿼리가 Snowflake 웨어하우스 내에서 실행되면 Anaconda 패키지가 원활하게 설치되고 사용자를 대신하여 가상 웨어하우스에 캐시됩니다.

모범 사례, 사용 가능한 패키지를 보는 방법, 로컬 개발 환경을 설정하는 방법에 대한 자세한 내용은 서드 파티 패키지 사용하기 섹션을 참조하십시오.

로컬 개발 환경에서 Python UDF를 작성하는 경우 session.add_packages 를 사용하여 세션 수준에서 패키지를 추가합니다.

이 코드 예제에서는 패키지를 가져오고 패키지의 버전을 반환하는 방법을 보여줍니다.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]
Copy

또한 session.add_requirements 를 사용하여 요구 사항 파일 이 있는 패키지를 지정할 수도 있습니다.

>>> session.add_requirements("mydir/requirements.txt")  
Copy

UDF 수준 패키지를 추가하여 이전에 추가했을 수 있는 세션 수준 패키지를 덮어쓸 수 있습니다.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
Copy

중요

패키지 버전을 지정하지 않으면 Snowflake는 종속성을 확인할 때 최신 버전을 사용합니다. UDF를 프로덕션 환경에 배포할 때 코드에서 항상 동일한 종속성 버전을 사용하도록 보장할 수 있습니다. 영구 및 임시 UDF 모두에 대해 그렇게 할 수 있습니다.

  • 영구 UDF를 만들 때 UDF는 한 번만 생성되고 등록됩니다. 이를 통해 종속성이 한 번 확인되고 선택한 버전이 프로덕션 워크로드에 사용됩니다. UDF가 실행될 때 항상 동일한 종속성 버전을 사용합니다.

  • 임시 UDF를 만들 때 종속성 버전을 버전 사양의 일부로 지정합니다. 그와 같이, UDF가 등록될 때 패키지 확인에서 지정된 버전을 사용합니다. 버전을 지정하지 않으면 새 버전이 제공될 때 종속성이 업데이트될 수 있습니다.

익명 UDF 만들기

익명 UDF를 만들려면 다음 중 하나를 수행할 수 있습니다.

  • snowflake.snowpark.functions 모듈에서 udf 함수를 호출하여 익명 함수의 정의를 전달합니다.

  • UDFRegistration 클래스에서 register 메서드를 호출하여 익명 함수의 정의를 전달합니다.

다음은 익명의 UDF의 예입니다.

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

참고

여러 세션에서 실행할 수 있는 코드를 작성할 때 udf 함수를 사용하기보다는 register 메서드를 사용하여 UDF를 등록합니다. 이렇게 하면 기본 Snowflake Session 오브젝트를 찾을 수 없는 오류를 방지할 수 있습니다.

명명된 UDF 만들기 및 등록

이름으로 UDF를 호출하려는 경우(예: functions 모듈에서 call_udf 함수를 사용하여 호출), 명명된 UDF를 만들고 등록할 수 있습니다. 이렇게 하려면 다음 중 하나를 사용하십시오.

  • UDFRegistration 클래스의 register 메서드(name 인자 포함).

  • snowflake.snowpark.functions 모듈의 udf 함수(name 인자 포함).

UDFRegistration 클래스의 특성 또는 메서드에 액세스하려면 Session 클래스의 udf 속성을 호출하십시오.

register 또는 udf 를 호출하면 현재 세션에서 사용할 수 있는 임시 UDF가 생성됩니다.

영구 UDF를 만들려면 register 메서드 또는 udf 함수를 호출하고 is_permanent 인자를 True 로 설정하십시오. 영구 UDF를 만들 때 stage_location 인자도 UDF를 위한 Python 파일과 그 종속 항목이 업로드되는 스테이지 위치로 설정해야 합니다.

다음은 명명된 임시 UDF를 등록하는 방법의 예입니다.

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

다음은 is_permanent 인자를 True 로 설정하여 명명된 영구 UDF를 등록하는 방법의 예입니다.

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1
Copy

다음은 이러한 UDF가 호출되는 예입니다.

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Copy

SQL을 사용하여 UDF를 호출할 수도 있습니다.

>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Python 원본 파일에서 UDF 만들기

로컬 개발 환경에서 UDF를 만들 때 Python 파일에서 UDF 처리기를 정의한 다음 UDFRegistration 클래스에 register_from_file 메서드를 사용하여 UDF를 만들 수 있습니다.

참고

Python 워크시트에서는 이 방법을 사용할 수 없습니다.

다음은 register_from_file 을 사용하는 예입니다.

다음을 포함하는 Python 파일 test_udf_file.py 가 있다고 가정합니다.

def mod5(x: int) -> int:
    return x % 5
Copy

그런 다음 파일 test_udf_file.py 의 이 함수에서 UDF를 만들 수 있습니다.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

파일을 스테이지 위치에 업로드한 다음 이를 사용하여 UDF를 만들 수도 있습니다.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

UDF가 있는 파일 읽기

Python 코드는 다음과 같은 방법으로 파일의 내용을 읽을 수 있습니다.

정적으로 지정된 파일 읽기

Snowpark 라이브러리는 서버에서 UDF를 업로드하고 실행합니다. UDF가 파일에서 데이터를 읽어야 하는 경우, 파일이 UDF와 함께 업로드되었는지 확인해야 합니다.

참고

Python 워크시트에 UDF를 작성하는 경우 UDF는 스테이지의 파일만 읽을 수 있습니다.

파일을 읽도록 UDF를 설정하려면 다음을 수행하십시오.

  1. 파일이 종속성임을 지정합니다. 이는 서버에 파일을 업로드합니다. 자세한 내용은 UDF에 대한 종속성 지정하기 섹션을 참조하십시오.

    예:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
    Copy
  2. UDF에서 파일을 읽습니다. 다음 예제에서, 이 파일은 UDF 생성 중에 한 번만 읽히고 UDF 실행 중에는 다시 읽히지 않습니다. 이는 서드 파티 라이브러리 cachetools 로 달성됩니다.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    
    Copy

SnowflakeFile 로 동적으로 지정된 파일 읽기

Snowpark snowflake.snowpark.files 모듈에서 SnowflakeFile 클래스를 사용하여 스테이지에서 파일을 읽을 수 있습니다. SnowflakeFile 클래스는 모든 크기의 파일을 스트리밍할 수 있는 동적 파일 액세스를 제공합니다. 동적 파일 액세스는 여러 파일에서 반복하려는 경우에도 유용합니다. 예를 들어 여러 파일 처리하기 섹션을 참조하십시오.

SnowflakeFile 을 사용하여 파일을 읽는 방법에 대한 자세한 내용과 예는 Python UDF 처리기에서 SnowflakeFile 클래스를 사용하여 파일 읽기 를 참조하십시오.

다음 예제에서는 SnowflakeFile 을 사용하여 스테이지에서 텍스트 파일을 읽고 파일 길이를 반환하는 임시 UDF를 등록합니다.

UDF를 등록합니다.

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

UDF를 호출합니다.

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

벡터화된 UDF 사용하기

벡터화된 Python UDF를 사용하면 입력 행 배치를 Pandas DataFrames 로 수신하고 결과 배치를 Pandas 배열 또는 Series 로 반환하는 Python 함수를 정의할 수 있습니다. Snowpark dataframe 의 열은 UDF 내부의 Pandas Series로 벡터화됩니다.

다음은 배치 인터페이스를 사용하는 방법의 예입니다.

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

다른 Python UDF를 호출하는 것과 같은 방식으로 벡터화된 Python UDF를 호출합니다. 자세한 내용은 SQL 문을 사용하여 벡터화된 UDF를 만드는 방법을 설명하는 벡터화된 Python UDF 섹션을 참조하십시오. 예를 들어 SQL 문에서 Python 코드를 지정할 때 vectorized 데코레이터를 사용할 수 있습니다. 이 문서에 설명된 Snowpark Python API를 사용하면 벡터화된 UDF를 만들려고 SQL 문을 사용하지 않아도 됩니다. 따라서 vectorized 데코레이터를 사용하지 않습니다.

배치당 행 수를 제한할 수 있습니다. 자세한 내용은 대상 배치 크기 설정하기 를 참조하십시오.

Snowpark Python API를 사용하여 벡터화된 UDF를 만드는 방법에 대한 자세한 설명과 예제는 Snowpark API Reference의 UDF 섹션 을 참조하십시오.