Python으로 DataFrames용 저장 프로시저 만들기¶
Snowpark API는 Python으로 저장 프로시저를 만드는 데 사용할 수 있는 메서드를 제공합니다. 이 항목에서는 저장 프로시저를 만드는 방법을 설명합니다.
이 항목의 내용:
소개¶
Snowpark를 사용하면 사용자 지정 람다 및 함수에 대한 저장 프로시저를 만들 수 있으며 이러한 저장 프로시저를 호출하여 DataFrame의 데이터를 처리할 수 있습니다.
현재 세션 내에만 존재하는 저장 프로시저(임시 저장 프로시저)뿐 아니라 다른 세션에서 사용할 수 있는 저장 프로시저(영구 저장 프로시저)도 만들 수 있습니다.
저장 프로시저에서 Anaconda의 서드 파티 패키지 사용하기¶
Python 저장 프로시저를 만들 때 설치할 Anaconda 패키지를 지정할 수 있습니다. Snowflake 웨어하우스 내에서 Python 저장 프로시저를 호출하면 Anaconda 패키지가 원활하게 설치되고 사용자를 대신하여 가상 웨어하우스에 캐시됩니다. 모범 사례, 사용 가능한 패키지를 보는 방법, 로컬 개발 환경을 설정하는 방법에 대한 자세한 내용은 서드 파티 패키지 사용하기 섹션을 참조하십시오.
session.add_packages
를 사용하여 세션 수준에서 패키지를 추가합니다.
이 코드 예제에서는 패키지를 가져오고 패키지의 버전을 반환하는 방법을 보여줍니다.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")
>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
또한 session.add_requirements
를 사용하여 요구 사항 파일 이 있는 패키지를 지정할 수도 있습니다.
>>> session.add_requirements("mydir/requirements.txt")
저장 프로시저 수준 패키지를 추가하여 이전에 추가했을 수 있는 세션 수준 패키지를 덮어쓸 수 있습니다.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
중요
패키지 버전을 지정하지 않으면 Snowflake는 종속성을 확인할 때 최신 버전을 사용합니다. 하지만 저장 프로시저를 프로덕션 환경에 배포할 때 코드에서 항상 동일한 종속성 버전을 사용하도록 보장할 수 있습니다. 영구 저장 프로시저와 임시 저장 프로시저 모두에 대해 그렇게 할 수 있습니다.
영구 저장 프로시저를 만들 때 저장 프로시저는 한 번만 생성되고 등록됩니다. 이를 통해 종속성이 한 번 확인되고 선택한 버전이 프로덕션 워크로드에 사용됩니다. 저장 프로시저가 실행될 때 항상 동일한 종속성 버전을 사용합니다.
임시 저장 프로시저를 만들 때 종속성 버전을 버전 사양의 일부로 지정합니다. 그와 같이, 저장 프로시저가 등록될 때 패키지 확인에서 지정된 버전을 사용합니다. 버전을 지정하지 않으면 새 버전이 제공될 때 종속성이 업데이트될 수 있습니다.
익명 저장 프로시저 만들기¶
익명 저장 프로시저를 만들려면 다음 중 하나를 수행할 수 있습니다.
snowflake.snowpark.functions
모듈에서sproc
함수를 호출하여 익명 함수의 정의를 전달합니다.StoredProcedureRegistration
클래스에서register
메서드를 호출하여 익명 함수의 정의를 전달합니다.StoredProcedureRegistration
클래스의 특성 또는 메서드에 액세스하려면Session
클래스의sproc
속성을 호출하십시오.
다음은 익명 저장 프로시저의 예입니다.
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
참고
여러 세션에서 실행할 수 있는 코드를 작성할 때 sproc
함수를 사용하기보다는 register
메서드를 사용하여 저장 프로시저를 등록합니다. 이렇게 하면 기본 Snowflake Session
오브젝트를 찾을 수 없는 오류를 방지할 수 있습니다.
명명된 저장 프로시저 만들기 및 등록하기¶
이름으로 저장 프로시저를 호출하려는 경우(예: Session
오브젝트에서 call
함수를 사용하여 호출), 명명된 저장 프로시저를 만들고 등록할 수 있습니다. 다음 중 한 가지 방법으로 이 작업을 수행할 수 있습니다.
snowflake.snowpark.functions
모듈에서sproc
함수를 호출하여name
인자와 익명 함수의 정의를 전달합니다.StoredProcedureRegistration
클래스에서register
메서드를 호출하여name
인자와 익명 함수의 정의를 전달합니다.StoredProcedureRegistration
클래스의 특성 또는 메서드에 액세스하려면Session
클래스의sproc
속성을 호출하십시오.
register
또는 sproc
를 호출하면 현재 세션에서 사용할 수 있는 임시 저장 프로시저가 생성됩니다.
영구 저장 프로시저를 만들려면 register
메서드 또는 sproc
함수를 호출하고 is_permanent
인자를 True
로 설정하십시오. 영구 저장 프로시저를 만들 때 stage_location
인자도 Snowpark에서 사용하는 Python 커넥터가 저장 프로시저와 그 종속 항목에 대한 Python 파일을 업로드하는 스테이지 위치로 설정해야 합니다.
다음은 명명된 임시 저장 프로시저를 등록하는 방법의 예입니다.
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
다음은 is_permanent
인자를 True
로 설정하여 명명된 영구 저장 프로시저를 등록하는 방법의 예입니다.
>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
... return session.sql(f"select {x} - 1").collect()[0][0]
다음은 이러한 저장 프로시저가 호출되는 예입니다.
>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
저장 프로시저가 있는 파일 읽기¶
저장 프로시저가 있는 파일의 내용은 다음과 같은 방법으로 읽을 수 있습니다.
파일을 가져온 다음 저장 프로시저의 홈 디렉터리에서 읽어서 정적으로 지정된 파일을 읽습니다.
SnowflakeFile로 동적으로 지정된 파일을 읽습니다. 계산 중에 파일에 액세스해야 하는 경우 이렇게 할 수 있습니다.
정적으로 지정된 파일 읽기¶
파일이 종속성임을 지정합니다. 이는 서버에 파일을 업로드합니다. 이 작업은 UDF의 경우와 같은 방법으로 수행됩니다. 자세한 내용은 UDF에 대한 종속성 지정하기 섹션을 참조하십시오.
예:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
저장 프로시저에서 파일을 읽습니다.
>>> def read_file(name: str) -> str: ... import sys ... IMPORT_DIRECTORY_NAME = "snowflake_import_directory" ... import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME] ... ... with open(import_dir + 'my_file.txt', 'r') as file: ... return file.read()
SnowflakeFile
로 동적으로 지정된 파일 읽기¶
Snowpark snowflake.snowpark.files
모듈에서 SnowflakeFile
클래스를 사용하여 스테이지에서 파일을 읽을 수 있습니다. SnowflakeFile
클래스는 모든 크기의 파일을 스트리밍할 수 있는 동적 파일 액세스를 제공합니다. 동적 파일 액세스는 여러 파일에서 반복하려는 경우에도 유용합니다. 예를 들어 여러 파일 처리하기 섹션을 참조하십시오.
SnowflakeFile
을 사용하여 파일을 읽는 방법에 대한 자세한 내용과 예는 Python UDF 처리기에서 SnowflakeFile 클래스를 사용하여 파일 읽기 를 참조하십시오.
다음 예에서는 SnowflakeFile
을 사용하여 스테이지에서 파일을 읽고 파일 길이를 반환하는 영구 저장 프로시저를 만듭니다.
저장 프로시저를 만듭니다.
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
저장 프로시저를 호출합니다.
file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")