자습서: Python Snowpark 테스트¶
소개¶
이 자습서에서는 Snowpark Python 코드 테스트의 기본 사항을 소개합니다.
알아볼 내용¶
이 자습서에서는 다음에 대해 설명합니다.
Snowflake에 연결되어 있는 동안 Snowpark 코드를 테스트합니다.
PyTest와 같은 표준 테스트 유틸리티를 사용하여 Snowpark Python UDF, DataFrame 변환, 저장 프로시저를 테스트할 수 있습니다.
로컬 테스트 프레임워크를 사용하여 Snowflake 계정에 연결하지 않고 로컬에서 Snowpark Python DataFrames를 테스트합니다.
코드 변경 사항을 배포하기 전에 로컬 테스트 프레임워크를 사용하여 개발 컴퓨터에서 로컬 테스트를 수행할 수 있습니다.
전제 조건¶
로컬 테스트 프레임워크를 사용하려면 다음을 수행하십시오.
Snowpark Python 라이브러리 버전 1.11.1 이상을 사용해야 합니다.
지원되는 Python 버전은 다음과 같습니다.
3.8
3.9
3.10
3.11
프로젝트 설정하기¶
이 섹션에서는 프로젝트 리포지토리를 복제하고 자습서에 필요한 환경을 설정합니다.
프로젝트 리포지토리를 복제합니다.
git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
Git이 설치되어 있지 않은 경우 리포지토리 페이지로 이동하여 Code » Download Contents 를 클릭하여 콘텐츠를 다운로드하십시오.
계정 자격 증명으로 환경 변수를 설정합니다. Snowpark API는 이런 환경 변수를 사용하여 Snowflake 계정을 인증합니다.
# Linux/MacOS export SNOWSQL_ACCOUNT=<replace with your account identifer> export SNOWSQL_USER=<replace with your username> export SNOWSQL_ROLE=<replace with your role> export SNOWSQL_PWD=<replace with your password> export SNOWSQL_DATABASE=<replace with your database> export SNOWSQL_SCHEMA=<replace with your schema> export SNOWSQL_WAREHOUSE=<replace with your warehouse>
# Windows/PowerShell $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>" $env:SNOWSQL_USER = "<replace with your username>" $env:SNOWSQL_ROLE = "<replace with your role>" $env:SNOWSQL_PWD = "<replace with your password>" $env:SNOWSQL_DATABASE = "<replace with your database>" $env:SNOWSQL_SCHEMA = "<replace with your schema>" $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
선택 사항: (Linux/MacOS에서) bash 프로필을 편집하거나 (Windows에서) System Properties 메뉴를 사용하여 이 환경 변수를 영구적으로 설정할 수 있습니다.
Anaconda를 사용하여 conda 환경을 만들고 활성화합니다.
conda env create --file environment.yml conda activate snowpark-testing
setup/create_table.py
를 실행하여 계정에 샘플 테이블을 만듭니다. 이 Python 스크립트는 CITIBIKE라는 데이터베이스, PUBLIC이라는 스키마, TRIPS라는 작은 테이블을 생성합니다.python setup/create_table.py
이제 다음 섹션으로 이동할 준비가 되었습니다. 이 섹션에서는 다음을 수행했습니다.
자습서 리포지토리를 복제했습니다.
계정 정보로 환경 변수를 생성했습니다.
프로젝트에 대한 conda 환경을 생성했습니다.
Snowpark API를 사용하여 Snowflake에 연결하고 샘플 데이터베이스, 스키마, 테이블을 생성했습니다.
저장 프로시저 사용해보기¶
샘플 프로젝트에는 저장 프로시저 처리기(sproc.py
)와 3개의 DataFrames 변환기 메서드(transformers.py
)가 포함됩니다. 저장 프로시저 처리기는 UDF 및 DataFrame 변환기를 사용하여 원본 테이블 CITIBIKE.PUBLIC.TRIPS
에서 읽고 두 개의 팩트 테이블 MONTH_FACTS
및 BIKE_FACTS
를 생성합니다.
이 명령을 실행하여 명령줄에서 저장 프로시저를 실행할 수 있습니다.
python project/sproc.py
이제 프로젝트에 익숙해졌으므로 다음 섹션에서는 테스트 디렉터리를 설정하고 Snowflake 세션을 위한 PyTest 픽스쳐를 생성해 보겠습니다.
Snowflake 세션을 위한 PyTest 픽스쳐 만들기¶
PyTest 픽스쳐 는 일반적으로 테스트에 데이터나 연결을 제공하기 위해 테스트(또는 테스트 모듈) 전에 실행되는 함수입니다. 이 프로젝트의 경우 Snowpark Session
오브젝트를 반환하는 PyTest 픽스쳐를 생성합니다. 테스트 사례에서는 이 세션을 사용하여 Snowflake에 연결합니다.
프로젝트 루트 디렉터리 아래에
test
디렉터리를 만듭니다.mkdir test
test
디렉터리 아래에conftest.py
라는 새 Python 파일을 만듭니다.conftest.py
내에서Session
오브젝트의 PyTest 픽스쳐를 만듭니다.import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session @pytest.fixture def session() -> Session: return Session.builder.configs(get_env_var_config()).create()
DataFrame 변환기의 단위 테스트 추가하기¶
test
디렉터리에서test_transformers.py
라는 새 Python 파일을 만듭니다.test_transformers.py
파일에서 변환기 메서드를 가져옵니다.# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
다음으로, 이러한 변환기의 단위 테스트를 만듭니다. 일반적인 규칙은
test_<메서드 이름>
이라는 이름으로 각 테스트에 대한 메서드를 만드는 것입니다. 이 사례의 경우 테스트는 다음과 같습니다.# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts def test_add_rider_age(session): ... def test_calc_bike_facts(session): ... def test_calc_month_facts(session): ...
각 테스트 사례의
session
매개 변수는 이전 섹션에서 생성한 PyTest 픽스쳐를 참조합니다.이제 각 변환기의 테스트 사례를 구현합니다. 다음 패턴을 사용합니다.
입력 DataFrame을 생성합니다.
예상 출력 DataFrame을 생성합니다.
1단계의 입력 DataFrame을 변환기 메서드에 전달합니다.
3단계의 출력을 2단계의 예상 출력과 비교합니다.
# test/test_transformers.py from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType def test_add_rider_age(session: Session): input = session.create_dataframe( [ [1980], [1995], [2000] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType())]) ) expected = session.create_dataframe( [ [1980, 43], [1995, 28], [2000, 23] ], schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())]) ) actual = add_rider_age(input) assert expected.collect() == actual.collect() def test_calc_bike_facts(session: Session): input = session.create_dataframe([ [1, 10, 20], [1, 5, 30], [2, 20, 50], [2, 10, 60] ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("TRIPDURATION", IntegerType()), StructField("RIDER_AGE", IntegerType()) ]) ) expected = session.create_dataframe([ [1, 2, 7.5, 25.0], [2, 2, 15.0, 55.0], ], schema=StructType([ StructField("BIKEID", IntegerType()), StructField("COUNT", IntegerType()), StructField("AVG_TRIPDURATION", FloatType()), StructField("AVG_RIDER_AGE", FloatType()) ]) ) actual = calc_bike_facts(input) assert expected.collect() == actual.collect() def test_calc_month_facts(session: Session): from patches import patch_to_timestamp input = session.create_dataframe( data=[ ['2018-03-01 09:47:00.000 +0000', 1, 10, 15], ['2018-03-01 09:47:14.000 +0000', 2, 20, 12], ['2018-04-01 09:47:04.000 +0000', 3, 6, 30] ], schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE'] ) expected = session.create_dataframe( data=[ ['Mar', 2, 15, 13.5], ['Apr', 1, 6, 30.0] ], schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE'] ) actual = calc_month_facts(input) assert expected.collect() == actual.collect()
이제 PyTest를 실행하여 모든 단위 테스트를 실행할 수 있습니다.
pytest test/test_transformers.py
저장 프로시저의 통합 테스트 추가하기¶
이제 DataFrame 변환기 메서드의 단위 테스트가 있으므로 저장 프로시저의 통합 테스트를 추가해 보겠습니다. 이 테스트 사례에서는 다음 패턴을 따릅니다.
저장 프로시저에 대한 입력 데이터를 나타내는 테이블을 만듭니다.
저장 프로시저의 두 출력 테이블에 있어야 하는 내용으로 DataFrames를 2개 만듭니다.
저장 프로시저를 호출합니다.
실제 출력 테이블을 2단계의 DataFrames와 비교합니다.
정리: 1단계의 입력 테이블과 3단계의 출력 테이블을 삭제합니다.
test
디렉터리에 test_sproc.py
라는 Python 파일을 만듭니다.
프로젝트 디렉터리에서 저장 프로시저 처리기를 가져오고 테스트 사례를 만듭니다.
# test/test_sproc.py
from project.sproc import create_fact_tables
def test_create_fact_tables(session):
...
입력 테이블 생성부터 시작하여 테스트 사례를 구현합니다.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
다음으로, 예상 출력 테이블에 대해 DataFrames를 생성합니다.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
마지막으로, 저장 프로시저를 호출하고 출력 테이블을 읽습니다. 실제 테이블을 DataFrame 콘텐츠와 비교합니다.
# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *
def test_create_fact_tables(session):
DB = 'CITIBIKE'
SCHEMA = 'TEST'
# Set up source table
tbl = session.create_dataframe(
data=[
[1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
[1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
[1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
[1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
[1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
[1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
[1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
[1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
[1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
[1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
[1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
],
schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION', 'BIKEID'],
)
tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
# Expected values
n_rows_expected = 12
bike_facts_expected = session.create_dataframe(
data=[
[30958, 1, 551.0, 40.0],
[19278, 1, 242.0, 35.0],
[18461, 1, 768.0, 31.0],
[15533, 1, 690.0, 43.0],
[32449, 1, 490.0, 32.0],
[29411, 1, 457.0, 64.0],
[28015, 1, 279.0, 52.0],
[15148, 1, 546.0, 59.0],
[16967, 1, 358.0, 40.0],
[20644, 1, 848.0, 38.0],
[16365, 1, 295.0, 39.0]
],
schema=StructType([
StructField("BIKEID", IntegerType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", FloatType()),
StructField("AVG_RIDER_AGE", FloatType())
])
).collect()
month_facts_expected = session.create_dataframe(
data=[['Mar', 11, 502.18182, 43.00000]],
schema=StructType([
StructField("MONTH", StringType()),
StructField("COUNT", IntegerType()),
StructField("AVG_TRIPDURATION", DecimalType()),
StructField("AVG_RIDER_AGE", DecimalType())
])
).collect()
# Call sproc, get actual values
n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()
# Comparisons
assert n_rows_expected == n_rows_actual
assert bike_facts_expected == bike_facts_actual
assert month_facts_expected == month_facts_actual
테스트 사례를 실행하려면 터미널에서 pytest
를 실행하십시오.
pytest test/test_sproc.py
프로젝트의 모든 테스트를 실행하려면 다른 옵션 없이 pytest
를 실행하십시오.
pytest
로컬 테스트 구성하기¶
이제 DataFrame 변환기 및 저장 프로시저에 대한 PyTest 테스트 모음이 생겼습니다. 각 테스트 사례에서 Session
픽스쳐는 Snowflake 계정에 연결하고 Snowpark Python API에서 SQL을 전송하고 응답을 검색하는 데 사용됩니다.
또는 로컬 테스트 프레임워크를 사용하여 Snowflake에 연결하지 않고 로컬에서 변환을 실행할 수 있습니다. 대규모 테스트 모음에서 이를 통해 테스트 실행 속도가 훨씬 더 빨라질 수 있습니다. 이 섹션에서는 로컬 테스트 프레임워크 기능을 사용하도록 테스트 모음을 업데이트하는 방법을 보여줍니다.
먼저 PyTest
Session
픽스쳐를 업데이트합니다. 로컬 테스트 모드와 라이브 테스트 모드 간을 전환하기 위해 명령줄 옵션을 PyTest에 추가하겠습니다.# test/conftest.py import pytest from project.utils import get_env_var_config from snowflake.snowpark.session import Session def pytest_addoption(parser): parser.addoption("--snowflake-session", action="store", default="live") @pytest.fixture(scope='module') def session(request) -> Session: if request.config.getoption('--snowflake-session') == 'local': return Session.builder.configs({'local_testing': True}).create() else: return Session.builder.configs(get_env_var_config()).create()
모든 기본 제공 함수가 로컬 테스트 프레임워크에서 지원되는 것은 아니므로 먼저 이 메서드를 패치해야 합니다(예:
calc_month_facts()
변환기에 사용되는monthname()
함수). 테스트 디렉터리 아래에patches.py
라는 파일을 만듭니다. 이 파일에 다음 코드를 붙여넣습니다.from snowflake.snowpark.mock.functions import patch from snowflake.snowpark.functions import monthname from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType from snowflake.snowpark.types import StringType import datetime import calendar @patch(monthname) def patch_monthname(column: ColumnEmulator) -> ColumnEmulator: ret_column = ColumnEmulator(data=[ calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month] for row in column]) ret_column.sf_type = ColumnType(StringType(), True) return ret_column
위의 패치는 열 내에 데이터 행을 포함하는
pandas.Series
와 유사한 오브젝트인 단일 매개 변수column
을 허용합니다. 그런 다음 Python 모듈datetime
및calendar
의 메서드 조합을 사용하여 기본 제공되는monthname()
열의 기능을 에뮬레이트합니다. 마지막으로, 기본 제공 메서드가 월(“Jan”, “Feb”, “Mar” 등)에 해당하는 문자열을 반환하므로 반환 유형을String
으로 설정합니다.다음으로, 이 메서드를 DataFrame 변환기 및 저장 프로시저에 대한 테스트로 가져옵니다.
# test/test_transformers.py # No changes to the other unit test methods def test_calc_month_facts(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes
로컬 플래그로
pytest
를 다시 실행합니다.pytest test/test_transformers.py --snowflake-session local
이제 저장 프로시저 테스트에 동일한 패치를 적용합니다.
#test/test_sproc.py def test_create_fact_tables(request, session): # Add conditional to include the patch if local testing is being used if request.config.getoption('--snowflake-session') == 'local': from patches import patch_monthname # No other changes required
로컬 플래그로 pytest를 다시 실행합니다.
pytest test/test_sproc.py --snowflake-session local
마무리하기 위해 로컬에서 전체 테스트 모음의 실행 소요 시간과 라이브 연결 시의 소요 시간을 비교해 보겠습니다.
time
명령을 사용하여 두 명령에 걸린 시간을 측정하겠습니다. 라이브 연결부터 시작해 보겠습니다.time pytest
이 경우 테스트 도구를 실행하는 데 7.89초가 걸렸습니다. (정확한 시간은 컴퓨터, 네트워크 연결 및 기타 요인에 따라 다를 수 있습니다.)
=================================== test session starts ========================== platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 6.86s ================================= pytest 1.63s user 1.86s system 44% cpu 7.893 total
이제 로컬 테스트 프레임워크로 실행해 보겠습니다.
time pytest --snowflake-session local
로컬 테스트 프레임워크인 테스트 모음을 사용하면 실행 시간이 1초밖에 걸리지 않았습니다!
================================== test session starts ================================ platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0 rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial configfile: pytest.ini collected 4 items test/test_sproc.py . [ 25%] test/test_transformers.py ... [100%] =================================== 4 passed in 0.10s ================================== pytest --snowflake-session local 1.37s user 1.70s system 281% cpu 1.093 total
자세히 알아보기¶
끝났습니다! 잘하셨습니다.
이 자습서에서는 Python Snowpark 코드를 테스트하는 방법을 전체적으로 살펴보았습니다. 그 과정에서 다음 작업을 수행했습니다.
PyTest 픽스쳐를 생성하고 단위 테스트와 통합 테스트를 추가했습니다.
자세한 내용은 Snowpark Python용 테스트 작성하기 섹션을 참조하십시오.
구성된 로컬 테스트
자세한 내용은 로컬 테스트 프레임워크 섹션을 참조하십시오.