Snowpark Python용 테스트 작성하기

이 항목에서는 Snowflake에 연결된 동안 Snowpark 코드를 테스트하는 방법을 설명합니다. PyTest와 같은 표준 테스트 유틸리티를 사용하여 Snowpark Python UDF, DataFrame 변환, 저장 프로시저를 테스트할 수 있습니다.

이 항목의 내용:

철저한 테스트를 통해 의도치 않게 중대한 사항이 변경되지 않도록 방지할 수 있습니다. 단위 테스트에서는 코드 섹션이 예상대로 작동하는지 확인합니다. 통합 테스트는 포괄적인 사용 사례에 대해 구성 요소들이 함께 올바르게 작동하는지 확인하는 데 도움이 됩니다.

이 문서의 예제에서는 Python용으로 가장 널리 사용되는 테스트 프레임워크 중 하나인 PyTest를 사용합니다. 추가 안내 및 모범 사례는 PyTest 설명서 를 참조하십시오.

또는 Snowpark Python 로컬 테스트 프레임워크를 사용하면 Snowflake 계정에 연결하지 않고도 Snowpark Python DataFrames에서 로컬로 생성하고 작동할 수 있습니다. 자세한 내용은 로컬 테스트 프레임워크 섹션을 참조하십시오.

테스트 설정하기

pip install pytest 또는 conda install pytest 를 실행하여 프로젝트에 PyTest를 설치합니다. requirements.txt 또는 conda 환경 파일에 추가할 수도 있습니다.

소스 코드 디렉터리 옆에 test 디렉터리를 만들고 이 디렉터리에 단위 및 통합 테스트를 추가합니다. 예시를 보려면 Snowpark Python 프로젝트 템플릿 을 참조하십시오.

Snowpark 세션을 위한 PyTest 픽스쳐 만들기

PyTest 픽스쳐는 테스트에 데이터나 연결을 제공하기 위해 테스트(또는 테스트 모듈) 전에 실행되는 함수입니다. 이 시나리오에서는 Snowpark Session 오브젝트를 반환하는 PyTest 픽스쳐를 만듭니다.

  1. test 디렉터리가 아직 없다면 하나 만듭니다.

  2. 다음 내용으로 test 에서 conftest.py 를 만듭니다. 여기서 connection_parameters 는 Snowflake 계정 자격 증명이 포함된 사전입니다. 사전 형식에 대한 자세한 내용은 세션 만들기 섹션을 참조하십시오.

  3. 여러 세션이 생성되고 세션 오브젝트 충돌로 인해 문제가 발생하는 것을 방지하려면 파일 범위의 픽스쳐 대신 모듈 범위의 픽스쳐로 Session 픽스쳐를 생성합니다.

from snowflake.snowpark.session import Session

@pytest.fixture(scope='module')
def session(request) -> Session:
connection_parameters = {}
return Session.builder.configs(...).create()
Copy

UDF의 단위 테스트

UDF 처리기를 일반 Python 메서드로 테스트하여 Python UDF 논리를 테스트할 수 있습니다.

  1. UDF 단위 테스트를 위해 test 디렉터리 아래에 파일을 만듭니다. 예를 들어 파일 이름을 test_functions.py 로 지정합니다.

  2. 테스트할 Python 메서드를 가져옵니다.

  3. 각 테스트 시나리오에 대해 test_<scenario_to_test> 라는 Python 메서드를 만듭니다.

예를 들어 다음은 Python UDF 처리기입니다.

def fahrenheit_to_celsius(temp_f: float) -> float:
    """
    Converts fahrenheit to celsius
    """
    return (float(temp_f) - 32) * (5/9)
Copy

이 메서드를 테스트 파일(test/test_functions.py)로 가져와 일반 Python 메서드로 테스트할 수 있습니다.

import farenheit_to_celsius

def test_farenheit_to_celsius():
    expected = 0.0
    actual = farenheit_to_celsius(32)
    assert expected == actual
Copy

DataFrame 변환의 단위 테스트

DataFrame 변환에 대한 단위 테스트를 추가하면 예상치 못한 버그와 회귀로부터 보호하는 데 도움이 됩니다. DataFrame 논리를 쉽게 테스트할 수 있도록 하려면 변환할 DataFrames를 입력으로 사용하고 변환된 DataFrames를 반환하는 Python 메서드로 변환을 캡슐화하십시오.

아래 예제에서 mf_df_transformer 에는 변환 논리가 포함되어 있습니다. Python 프로젝트의 다른 모듈로 가져와서 쉽게 테스트할 수 있습니다.

from snowflake.snowpark.dataframe import DataFrame, col

def my_df_tranformer(df: DataFrame) -> DataFrame:
    return df \
        .with_column('c', df['a']+df['b']) \
        .filter(col('c') > 3)
Copy

이 변환을 테스트하려면 다음 단계를 따르십시오.

  1. test 디렉터리(test/test_transformers.py) 아래에 DataFrame 테스트용 파일 test_transformers.py 를 만듭니다.

  2. 테스트할 변환기의 테스트 메서드 test_my_df_transformer(session) 을 만듭니다. 여기서 session 매개 변수는 이전 섹션에서 생성된 세션 픽스쳐를 가리킵니다.

  3. 세션 픽스쳐를 사용하여 테스트 메서드 내에서 입력 및 예상 출력 DataFrames를 생성합니다.

  4. 입력 DataFrame을 변환기에 전달하고 예상된 DataFrame을 변환기에서 반환된 실제 DataFrame과 비교합니다.

# test/test_transformers.py

import my_df_transformer

def test_my_df_transformer(session):
input_df = session.create_dataframe([[1,2],[3,4]], ['a', 'b'])
expected_df = session.create_dataframe([3,4,7], ['a','b','c'])
actual_df = my_df_transformer(input_df)
assert input_df.collect() == actual_df.collect()
Copy

저장 프로시저의 통합 테스트

저장 프로시저 처리기를 테스트하려면 세션 픽스쳐를 사용하여 저장 프로시저 처리기를 호출하십시오. 저장 프로시저가 테이블(예: ETL 파이프라인의 테이블)에서 읽는 경우 아래 예와 같이 저장 프로시저 처리기를 호출하기 전에 해당 테이블을 생성할 수 있습니다. 이 패턴을 사용하면 입력 데이터가 원본 제어에서 추적되고 테스트 실행 간에 예기치 않게 변경되지 않습니다.

from project import my_sproc_handler  # import stored proc handler

def test_my_sproc_handler(session: Session):

    # Create input table
    input_tbl = session.create_dataframe(
        data=[...],
        schema=[...],
    )

    input_tbl.write.mode('overwrite').save_as_table(['DB', 'SCHEMA', 'INPUT_TBL'], mode='overwrite')

    # Create expected output dataframe
    expected_df = session.create_dataframe(
        data=[...],
        schema=[...],
    ).collect()

    # Call the stored procedure
    my_sproc_handler()

    # Get actual table
    actual_tbl = session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).collect()

    # Clean up tables
    session.table(['DB', 'SCHEMA', 'OUTPUT_TBL']).delete()
    session.table(['DB', 'SCHEMA', 'INPUT_TBL']).delete()

    # Compare the actual and expected tables
    assert expected_df == actual_tbl
Copy