자습서: Python Snowpark 테스트

소개

이 자습서에서는 Snowpark Python 코드 테스트의 기본 사항을 소개합니다.

알아볼 내용

이 자습서에서는 다음에 대해 설명합니다.

  • Snowflake에 연결되어 있는 동안 Snowpark 코드를 테스트합니다.

    PyTest와 같은 표준 테스트 유틸리티를 사용하여 Snowpark Python UDF, DataFrame 변환, 저장 프로시저를 테스트할 수 있습니다.

  • 로컬 테스트 프레임워크를 사용하여 Snowflake 계정에 연결하지 않고 로컬에서 Snowpark Python DataFrames를 테스트합니다.

    코드 변경 사항을 배포하기 전에 로컬 테스트 프레임워크를 사용하여 개발 컴퓨터에서 로컬 테스트를 수행할 수 있습니다.

전제 조건

로컬 테스트 프레임워크를 사용하려면 다음을 수행하십시오.

  • Snowpark Python 라이브러리 버전 1.11.1 이상을 사용해야 합니다.

  • 지원되는 Python 버전은 다음과 같습니다.

    • 3.8

    • 3.9

    • 3.10

    • 3.11

프로젝트 설정하기

이 섹션에서는 프로젝트 리포지토리를 복제하고 자습서에 필요한 환경을 설정합니다.

  1. 프로젝트 리포지토리를 복제합니다.

    git clone https://github.com/Snowflake-Labs/sftutorial-snowpark-testing
    
    Copy

    Git이 설치되어 있지 않은 경우 리포지토리 페이지로 이동하여 Code » Download Contents 를 클릭하여 콘텐츠를 다운로드하십시오.

  2. 계정 자격 증명으로 환경 변수를 설정합니다. Snowpark API는 이런 환경 변수를 사용하여 Snowflake 계정을 인증합니다.

    # Linux/MacOS
    export SNOWSQL_ACCOUNT=<replace with your account identifer>
    export SNOWSQL_USER=<replace with your username>
    export SNOWSQL_ROLE=<replace with your role>
    export SNOWSQL_PWD=<replace with your password>
    export SNOWSQL_DATABASE=<replace with your database>
    export SNOWSQL_SCHEMA=<replace with your schema>
    export SNOWSQL_WAREHOUSE=<replace with your warehouse>
    
    Copy
    # Windows/PowerShell
    $env:SNOWSQL_ACCOUNT = "<replace with your account identifer>"
    $env:SNOWSQL_USER = "<replace with your username>"
    $env:SNOWSQL_ROLE = "<replace with your role>"
    $env:SNOWSQL_PWD = "<replace with your password>"
    $env:SNOWSQL_DATABASE = "<replace with your database>"
    $env:SNOWSQL_SCHEMA = "<replace with your schema>"
    $env:SNOWSQL_WAREHOUSE = "<replace with your warehouse>"
    
    Copy

    선택 사항: (Linux/MacOS에서) bash 프로필을 편집하거나 (Windows에서) System Properties 메뉴를 사용하여 이 환경 변수를 영구적으로 설정할 수 있습니다.

  3. Anaconda를 사용하여 conda 환경을 만들고 활성화합니다.

    conda env create --file environment.yml
    conda activate snowpark-testing
    
    Copy
  4. setup/create_table.py 를 실행하여 계정에 샘플 테이블을 만듭니다. 이 Python 스크립트는 CITIBIKE라는 데이터베이스, PUBLIC이라는 스키마, TRIPS라는 작은 테이블을 생성합니다.

    python setup/create_table.py
    
    Copy

이제 다음 섹션으로 이동할 준비가 되었습니다. 이 섹션에서는 다음을 수행했습니다.

  • 자습서 리포지토리를 복제했습니다.

  • 계정 정보로 환경 변수를 생성했습니다.

  • 프로젝트에 대한 conda 환경을 생성했습니다.

  • Snowpark API를 사용하여 Snowflake에 연결하고 샘플 데이터베이스, 스키마, 테이블을 생성했습니다.

저장 프로시저 사용해보기

샘플 프로젝트에는 저장 프로시저 처리기(sproc.py)와 3개의 DataFrames 변환기 메서드(transformers.py)가 포함됩니다. 저장 프로시저 처리기는 UDF 및 DataFrame 변환기를 사용하여 원본 테이블 CITIBIKE.PUBLIC.TRIPS 에서 읽고 두 개의 팩트 테이블 MONTH_FACTSBIKE_FACTS 를 생성합니다.

이 명령을 실행하여 명령줄에서 저장 프로시저를 실행할 수 있습니다.

python project/sproc.py
Copy

이제 프로젝트에 익숙해졌으므로 다음 섹션에서는 테스트 디렉터리를 설정하고 Snowflake 세션을 위한 PyTest 픽스쳐를 생성해 보겠습니다.

Snowflake 세션을 위한 PyTest 픽스쳐 만들기

PyTest 픽스쳐 는 일반적으로 테스트에 데이터나 연결을 제공하기 위해 테스트(또는 테스트 모듈) 전에 실행되는 함수입니다. 이 프로젝트의 경우 Snowpark Session 오브젝트를 반환하는 PyTest 픽스쳐를 생성합니다. 테스트 사례에서는 이 세션을 사용하여 Snowflake에 연결합니다.

  1. 프로젝트 루트 디렉터리 아래에 test 디렉터리를 만듭니다.

    mkdir test
    
    Copy
  2. test 디렉터리 아래에 conftest.py 라는 새 Python 파일을 만듭니다. conftest.py 내에서 Session 오브젝트의 PyTest 픽스쳐를 만듭니다.

    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    @pytest.fixture
    def session() -> Session:
        return Session.builder.configs(get_env_var_config()).create()
    
    Copy

DataFrame 변환기의 단위 테스트 추가하기

  1. test 디렉터리에서 test_transformers.py 라는 새 Python 파일을 만듭니다.

  2. test_transformers.py 파일에서 변환기 메서드를 가져옵니다.

    # test/test_transformers.py
    
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    
    Copy
  3. 다음으로, 이러한 변환기의 단위 테스트를 만듭니다. 일반적인 규칙은 test_<메서드 이름> 이라는 이름으로 각 테스트에 대한 메서드를 만드는 것입니다. 이 사례의 경우 테스트는 다음과 같습니다.

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    def test_add_rider_age(session):
        ...
    
    def test_calc_bike_facts(session):
        ...
    
    
    def test_calc_month_facts(session):
        ...
    
    Copy

    각 테스트 사례의 session 매개 변수는 이전 섹션에서 생성한 PyTest 픽스쳐를 참조합니다.

  4. 이제 각 변환기의 테스트 사례를 구현합니다. 다음 패턴을 사용합니다.

    1. 입력 DataFrame을 생성합니다.

    2. 예상 출력 DataFrame을 생성합니다.

    3. 1단계의 입력 DataFrame을 변환기 메서드에 전달합니다.

    4. 3단계의 출력을 2단계의 예상 출력과 비교합니다.

    # test/test_transformers.py
    from project.transformers import add_rider_age, calc_bike_facts, calc_month_facts
    from snowflake.snowpark.types import StructType, StructField, IntegerType, FloatType
    
    def test_add_rider_age(session: Session):
        input = session.create_dataframe(
            [
                [1980],
                [1995],
                [2000]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType())])
        )
    
        expected = session.create_dataframe(
            [
                [1980, 43],
                [1995, 28],
                [2000, 23]
            ],
            schema=StructType([StructField("BIRTH_YEAR", IntegerType()), StructField("RIDER_AGE", IntegerType())])
        )
    
        actual = add_rider_age(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_bike_facts(session: Session):
        input = session.create_dataframe([
                [1, 10, 20],
                [1, 5, 30],
                [2, 20, 50],
                [2, 10, 60]
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("TRIPDURATION", IntegerType()),
                StructField("RIDER_AGE", IntegerType())
            ])
        )
    
        expected = session.create_dataframe([
                [1, 2, 7.5, 25.0],
                [2, 2, 15.0, 55.0],
            ],
            schema=StructType([
                StructField("BIKEID", IntegerType()),
                StructField("COUNT", IntegerType()),
                StructField("AVG_TRIPDURATION", FloatType()),
                StructField("AVG_RIDER_AGE", FloatType())
            ])
        )
    
        actual = calc_bike_facts(input)
        assert expected.collect() == actual.collect()
    
    
    def test_calc_month_facts(session: Session):
        from patches import patch_to_timestamp
    
        input = session.create_dataframe(
            data=[
                ['2018-03-01 09:47:00.000 +0000', 1, 10,  15],
                ['2018-03-01 09:47:14.000 +0000', 2, 20, 12],
                ['2018-04-01 09:47:04.000 +0000', 3, 6,  30]
            ],
            schema=['STARTTIME', 'BIKE_ID', 'TRIPDURATION', 'RIDER_AGE']
        )
    
        expected = session.create_dataframe(
            data=[
                ['Mar', 2, 15, 13.5],
                ['Apr', 1, 6, 30.0]
            ],
            schema=['MONTH', 'COUNT', 'AVG_TRIPDURATION', 'AVG_RIDER_AGE']
        )
    
        actual = calc_month_facts(input)
    
        assert expected.collect() == actual.collect()
    
    Copy
  5. 이제 PyTest를 실행하여 모든 단위 테스트를 실행할 수 있습니다.

    pytest test/test_transformers.py
    
    Copy

저장 프로시저의 통합 테스트 추가하기

이제 DataFrame 변환기 메서드의 단위 테스트가 있으므로 저장 프로시저의 통합 테스트를 추가해 보겠습니다. 이 테스트 사례에서는 다음 패턴을 따릅니다.

  1. 저장 프로시저에 대한 입력 데이터를 나타내는 테이블을 만듭니다.

  2. 저장 프로시저의 두 출력 테이블에 있어야 하는 내용으로 DataFrames를 2개 만듭니다.

  3. 저장 프로시저를 호출합니다.

  4. 실제 출력 테이블을 2단계의 DataFrames와 비교합니다.

  5. 정리: 1단계의 입력 테이블과 3단계의 출력 테이블을 삭제합니다.

test 디렉터리에 test_sproc.py 라는 Python 파일을 만듭니다.

프로젝트 디렉터리에서 저장 프로시저 처리기를 가져오고 테스트 사례를 만듭니다.

# test/test_sproc.py
from project.sproc import create_fact_tables

def test_create_fact_tables(session):
    ...
Copy

입력 테이블 생성부터 시작하여 테스트 사례를 구현합니다.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')
Copy

다음으로, 예상 출력 테이블에 대해 DataFrames를 생성합니다.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()
Copy

마지막으로, 저장 프로시저를 호출하고 출력 테이블을 읽습니다. 실제 테이블을 DataFrame 콘텐츠와 비교합니다.

# test/test_sproc.py
from project.sproc import create_fact_tables
from snowflake.snowpark.types import *

def test_create_fact_tables(session):
    DB = 'CITIBIKE'
    SCHEMA = 'TEST'

    # Set up source table
    tbl = session.create_dataframe(
        data=[
            [1983, '2018-03-01 09:47:00.000 +0000', 551, 30958],
            [1988, '2018-03-01 09:47:01.000 +0000', 242, 19278],
            [1992, '2018-03-01 09:47:01.000 +0000', 768, 18461],
            [1980, '2018-03-01 09:47:03.000 +0000', 690, 15533],
            [1991, '2018-03-01 09:47:03.000 +0000', 490, 32449],
            [1959, '2018-03-01 09:47:04.000 +0000', 457, 29411],
            [1971, '2018-03-01 09:47:08.000 +0000', 279, 28015],
            [1964, '2018-03-01 09:47:09.000 +0000', 546, 15148],
            [1983, '2018-03-01 09:47:11.000 +0000', 358, 16967],
            [1985, '2018-03-01 09:47:12.000 +0000', 848, 20644],
            [1984, '2018-03-01 09:47:14.000 +0000', 295, 16365]
        ],
        schema=['BIRTH_YEAR', 'STARTTIME', 'TRIPDURATION',    'BIKEID'],
    )

    tbl.write.mode('overwrite').save_as_table([DB, SCHEMA, 'TRIPS_TEST'], mode='overwrite')

    # Expected values
    n_rows_expected = 12
    bike_facts_expected = session.create_dataframe(
        data=[
            [30958, 1, 551.0, 40.0],
            [19278, 1, 242.0, 35.0],
            [18461, 1, 768.0, 31.0],
            [15533, 1, 690.0, 43.0],
            [32449, 1, 490.0, 32.0],
            [29411, 1, 457.0, 64.0],
            [28015, 1, 279.0, 52.0],
            [15148, 1, 546.0, 59.0],
            [16967, 1, 358.0, 40.0],
            [20644, 1, 848.0, 38.0],
            [16365, 1, 295.0, 39.0]
        ],
        schema=StructType([
            StructField("BIKEID", IntegerType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", FloatType()),
            StructField("AVG_RIDER_AGE", FloatType())
        ])
    ).collect()

    month_facts_expected = session.create_dataframe(
        data=[['Mar', 11, 502.18182, 43.00000]],
        schema=StructType([
            StructField("MONTH", StringType()),
            StructField("COUNT", IntegerType()),
            StructField("AVG_TRIPDURATION", DecimalType()),
            StructField("AVG_RIDER_AGE", DecimalType())
        ])
    ).collect()

    # Call sproc, get actual values
    n_rows_actual = create_fact_tables(session, 'TRIPS_TEST')
    bike_facts_actual = session.table([DB, SCHEMA, 'bike_facts']).collect()
    month_facts_actual = session.table([DB, SCHEMA, 'month_facts']).collect()

    # Comparisons
    assert n_rows_expected == n_rows_actual
    assert bike_facts_expected == bike_facts_actual
    assert month_facts_expected ==  month_facts_actual
Copy

테스트 사례를 실행하려면 터미널에서 pytest 를 실행하십시오.

pytest test/test_sproc.py
Copy

프로젝트의 모든 테스트를 실행하려면 다른 옵션 없이 pytest 를 실행하십시오.

pytest
Copy

로컬 테스트 구성하기

이제 DataFrame 변환기 및 저장 프로시저에 대한 PyTest 테스트 모음이 생겼습니다. 각 테스트 사례에서 Session 픽스쳐는 Snowflake 계정에 연결하고 Snowpark Python API에서 SQL을 전송하고 응답을 검색하는 데 사용됩니다.

또는 로컬 테스트 프레임워크를 사용하여 Snowflake에 연결하지 않고 로컬에서 변환을 실행할 수 있습니다. 대규모 테스트 모음에서 이를 통해 테스트 실행 속도가 훨씬 더 빨라질 수 있습니다. 이 섹션에서는 로컬 테스트 프레임워크 기능을 사용하도록 테스트 모음을 업데이트하는 방법을 보여줍니다.

  1. 먼저 PyTest Session 픽스쳐를 업데이트합니다. 로컬 테스트 모드와 라이브 테스트 모드 간을 전환하기 위해 명령줄 옵션을 PyTest에 추가하겠습니다.

    # test/conftest.py
    
    import pytest
    from project.utils import get_env_var_config
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.configs('local_testing', True).create()
        else:
            return Session.builder.configs(get_env_var_config()).create()
    
    Copy
  2. 모든 기본 제공 함수가 로컬 테스트 프레임워크에서 지원되는 것은 아니므로 먼저 이 메서드를 패치해야 합니다(예: calc_month_facts() 변환기에 사용되는 monthname() 함수). 테스트 디렉터리 아래에 patches.py 라는 파일을 만듭니다. 이 파일에 다음 코드를 붙여넣습니다.

    from snowflake.snowpark.mock.functions import patch
    from snowflake.snowpark.functions import monthname
    from snowflake.snowpark.mock.snowflake_data_type import ColumnEmulator, ColumnType
    from snowflake.snowpark.types import StringType
    import datetime
    import calendar
    
    @patch(monthname)
    def patch_monthname(column: ColumnEmulator) -> ColumnEmulator:
        ret_column = ColumnEmulator(data=[
            calendar.month_abbr[datetime.datetime.strptime(row, '%Y-%m-%d %H:%M:%S.%f %z').month]
            for row in column])
        ret_column.sf_type = ColumnType(StringType(), True)
        return ret_column
    
    Copy

    위의 패치는 열 내에 데이터 행을 포함하는 pandas.Series 와 유사한 오브젝트인 단일 매개 변수 column 을 허용합니다. 그런 다음 Python 모듈 datetimecalendar 의 메서드 조합을 사용하여 기본 제공되는 monthname() 열의 기능을 에뮬레이트합니다. 마지막으로, 기본 제공 메서드가 월(《Jan》, 《Feb》, 《Mar》 등)에 해당하는 문자열을 반환하므로 반환 유형을 String 으로 설정합니다.

  3. 다음으로, 이 메서드를 DataFrame 변환기 및 저장 프로시저에 대한 테스트로 가져옵니다.

    # test/test_transformers.py
    
    # No changes to the other unit test methods
    
    def test_calc_month_facts(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes
    
    Copy
  4. 로컬 플래그로 pytest 를 다시 실행합니다.

    pytest test/test_transformers.py --snowflake-session local
    
    Copy
  5. 이제 저장 프로시저 테스트에 동일한 패치를 적용합니다.

    #test/test_sproc.py
    
    def test_create_fact_tables(request, session):
        # Add conditional to include the patch if local testing is being used
        if request.config.getoption('--snowflake-session') == 'local':
            from patches import patch_monthname
    
        # No other changes required
    
    Copy
  6. 로컬 플래그로 pytest를 다시 실행합니다.

    pytest test/test_sproc.py --snowflake-session local
    
    Copy
  7. 마무리하기 위해 로컬에서 전체 테스트 모음의 실행 소요 시간과 라이브 연결 시의 소요 시간을 비교해 보겠습니다. time 명령을 사용하여 두 명령에 걸린 시간을 측정하겠습니다. 라이브 연결부터 시작해 보겠습니다.

    time pytest
    
    Copy

    이 경우 테스트 도구를 실행하는 데 7.89초가 걸렸습니다. (정확한 시간은 컴퓨터, 네트워크 연결 및 기타 요인에 따라 다를 수 있습니다.)

    =================================== test session starts ==========================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 6.86s =================================
    pytest  1.63s user 1.86s system 44% cpu 7.893 total
    

    이제 로컬 테스트 프레임워크로 실행해 보겠습니다.

    time pytest --snowflake-session local
    
    Copy

    로컬 테스트 프레임워크인 테스트 모음을 사용하면 실행 시간이 1초밖에 걸리지 않았습니다!

    ================================== test session starts ================================
    platform darwin -- Python 3.9.18, pytest-7.4.3, pluggy-1.3.0
    rootdir: /Users/jfreeberg/Desktop/snowpark-testing-tutorial
    configfile: pytest.ini
    collected 4 items
    
    test/test_sproc.py .                                                             [ 25%]
    test/test_transformers.py ...                                                    [100%]
    
    =================================== 4 passed in 0.10s ==================================
    pytest --snowflake-session local  1.37s user 1.70s system 281% cpu 1.093 total
    

자세히 알아보기

끝났습니다! 잘하셨습니다.

이 자습서에서는 Python Snowpark 코드를 테스트하는 방법을 전체적으로 살펴보았습니다. 그 과정에서 다음 작업을 수행했습니다.