로컬 테스트 프레임워크

이 항목에서는 Snowpark Python 라이브러리로 작업할 때 로컬에서 코드를 테스트하는 방법을 설명합니다.

이 항목의 내용:

Snowpark Python 로컬 테스트 프레임워크는 Snowflake 계정에 연결하지 않고도 로컬에서 Snowpark Python DataFrames를 생성하고 작동할 수 있는 에뮬레이터입니다. 코드 변경 사항을 계정에 배포하기 전에 테스트 프레임워크를 사용하여 개발 시스템 또는 CI(지속적 통합) 파이프라인에서 로컬로 DataFrame 작업을 테스트할 수 있습니다. API는 동일하므로 코드를 변경하지 않고도 로컬 또는 Snowflake 계정에 대해 테스트를 실행할 수 있습니다.

전제 조건

로컬 테스트 프레임워크를 사용하려면 다음을 수행하십시오.

  • 선택적 종속성 localtest 와 함께 버전 1.18.0 이상의 Snowpark Python 라이브러리를 사용해야 합니다.

  • 지원되는 Python 버전은 다음과 같습니다.

    • 3.9

    • 3.10

    • 3.11

    • 3.12

Snowpark Python 라이브러리 설치

  • 선택적 종속성을 포함하여 라이브러리를 설치하려면 다음 명령을 실행합니다.

    pip install "snowflake-snowpark-python[localtest]"
    
    Copy

세션을 생성하고 로컬 테스트를 활성화합니다

  1. Snowpark Session 을 생성하고 로컬 테스트 구성을 True 로 설정합니다.

    from snowflake.snowpark import Session
    
    session = Session.builder.config('local_testing', True).create()
    
    Copy
  2. 세션을 사용하여 DataFrames에서 만들고 작업을 수행합니다.

    df = session.create_dataframe([[1,2],[3,4]],['a','b'])
    df.with_column('c', df['a']+df['b']).show()
    
    Copy

데이터 로딩하기

Python primitive, 파일, Pandas DataFrames에서 Snowpark DataFrames를 생성할 수 있습니다. 이는 테스트 사례의 입력 및 예상 출력을 지정하는 데 유용합니다. 이 메서드를 사용하면 데이터가 소스 제어에 있으므로 테스트 데이터를 테스트 케이스와 동기화하기가 더 쉬워집니다.

CSV 데이터 로드

  • CSV 파일을 Snowpark DataFrame에 로드하려면 먼저 Session.file.put() 을 호출하여 파일을 인메모리 스테이지로 로드한 다음 Session.read() 를 사용하여 내용을 읽습니다.

다음과 같은 내용의 data.csv 파일이 있다고 가정해 보겠습니다.

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

다음 코드를 사용하여 data.csv 를 Snowpark DataFrame에 로드할 수 있습니다. 먼저 파일을 스테이지에 올려야 합니다. 그렇지 않으면 “file cannot be found” 오류가 발생합니다.

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

예상 출력:

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Load pandas data

  • pandas DataFrame에서 Snowpark Python DataFrame을 생성하려면 create_dataframe 메서드를 호출하고 데이터를 pandas DataFrame으로 전달합니다.

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

예상 출력:

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------
  • pandas DataFrame에서 Snowpark Python DataFrame을 생성하려면 to_pandas 메서드를 호출하고 데이터를 pandas DataFrame으로 전달합니다.

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

예상 출력:

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

세션에 대한 PyTest 픽스쳐 만들기

PyTest 픽스쳐 는 테스트(또는 테스트 모듈) 전에 실행되는 함수로, 일반적으로 테스트에 데이터나 연결을 제공합니다. 이 프로시저에서는 Snowpark Session 오브젝트를 반환하는 픽스처를 생성합니다.

  1. test 디렉터리가 아직 없는 경우 디렉터리를 생성합니다.

  2. 그런 다음 test 디렉터리에서 다음 내용으로 conftest.py 파일을 생성합니다. 여기서 connection_parameters 는 Snowflake 계정 자격 증명이 포함된 사전입니다.

    # test/conftest.py
    import pytest
    from snowflake.snowpark.session import Session
    
    def pytest_addoption(parser):
        parser.addoption("--snowflake-session", action="store", default="live")
    
    @pytest.fixture(scope='module')
    def session(request) -> Session:
        if request.config.getoption('--snowflake-session') == 'local':
            return Session.builder.config('local_testing', True).create()
        else:
            return Session.builder.configs(CONNECTION_PARAMETERS).create()
    
    Copy

사전 형식에 대한 자세한 내용은 세션 만들기 섹션을 참조하십시오.

pytest_addoption 을 호출하면 snowflake-session 이라는 명령줄 옵션이 pytest 명령에 추가됩니다. Session 픽스쳐는 이 명령줄 옵션을 확인하고 해당 값에 따라 로컬 또는 라이브 Session 을 생성합니다. 다음 명령줄 예제에서 보듯이, 이를 통해 테스트를 위해 로컬 모드와 라이브 모드 사이를 쉽게 변환할 수 있습니다.

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

SQL 작업

Session.sql(...) 은 로컬 테스트 프레임워크에서는 지원되지 않습니다. 가능하면 항상 Snowpark의 DataFrame APIs를 사용하고, Session.sql(...) 을 사용해야 하는 경우 Python의 unittest.mock.patch 를 사용하여 테이블 형식 반환 값을 모의하여 주어진 Session.sql() 호출에서 예상되는 응답을 패치할 수 있습니다.

아래 예제에서 mock_sql() 은 SQL 쿼리 텍스트를 원하는 DataFrame 응답에 매핑합니다. 다음 조건문은 현재 세션이 로컬 테스트를 사용 중인지 확인하고, 그렇다면 Session.sql() 메서드에 패치를 적용합니다.

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

로컬 테스트가 활성화되면 DataFrame.save_as_table() 에서 생성된 모든 테이블이 인메모리 상태로 임시 테이블로 저장되며 Session.table() 을 사용하여 검색할 수 있습니다. 평소와 같이 테이블에서 지원되는 DataFrame 작업을 사용할 수 있습니다.

기본 제공 함수 패치 적용하기

snowflake.snowpark.functions 아래의 기본 제공 함수가 모두 로컬 테스트 프레임워크에서 지원되는 것은 아닙니다. 지원되지 않는 함수를 사용하는 경우 snowflake.snowpark.mock@patch 데코레이터를 사용하여 패치를 생성해야 합니다.

패치된 함수를 정의하고 구현하려면 서명(매개 변수 목록)이 기본 제공 함수의 매개 변수와 일치해야 합니다. 로컬 테스트 프레임워크는 다음 규칙을 사용하여 패치된 함수에 매개 변수를 전달합니다.

  • 기본 제공 함수의 시그니처에 있는 ColumnOrName 유형 매개 변수의 경우 ColumnEmulator 가 패치된 함수의 매개 변수로 전달됩니다. ColumnEmulator 는 열 데이터가 포함된 pandas.Series 오브젝트와 유사합니다.

  • 기본 제공 함수의 시그니처에 있는 LiteralType 유형 매개 변수의 경우 리터럴 값이 패치된 함수의 매개 변수로 전달됩니다.

  • 그렇지 않으면 원시 값이 패치된 함수의 매개 변수로 전달됩니다.

패치된 함수의 반환 유형에 관해서는 기본 제공 함수의 Column 반환 유형에 맞춰 ColumnEmulator 인스턴스가 반환될 것으로 예상됩니다.

예를 들어 기본 제공 함수 to_timestamp() 는 다음과 같이 패치될 수 있습니다.

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

테스트 사례 건너뛰기

PyTest 테스트 모음에 로컬 테스트에서 제대로 지원되지 않는 테스트 사례가 포함된 경우 PyTest의 mark.skipif 데코레이터를 사용하여 해당 사례를 건너뛸 수 있습니다. 아래 예제에서는 앞서 설명한 대로 세션과 매개 변수를 구성했다고 가정합니다. 이 조건은 local_testing_modelocal 로 설정되어 있는지 확인하고, 설정되어 있으면 설명 메시지와 함께 테스트 케이스를 건너뜁니다.

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

UDFs 및 저장 프로시저 등록하기

로컬 테스트 프레임워크에서 사용자 정의 함수(UDFs)와 저장 프로시저를 생성하고 호출할 수 있습니다. 오브젝트를 생성하려면 다음 구문 옵션을 사용할 수 있습니다.

구문

UDF

저장 프로시저

데코레이터

@udf

@sproc

등록 메서드

udf.register()

sproc.register()

파일에서 등록 메서드

udf.register_from_file()

sproc.register_from_file()

다음 코드 예제는 데코레이터를 사용하여 UDF 및 저장 프로시저를 생성한 다음 두 프로시저를 이름으로 호출합니다.

from snowflake.snowpark.session import Session
from snowflake.snowpark.dataframe import col, DataFrame
from snowflake.snowpark.functions import udf, sproc, call_udf
from snowflake.snowpark.types import IntegerType, StringType

# Create local session
session = Session.builder.config('local_testing', True).create()

# Create local table
table = 'example'
session.create_dataframe([[1,2],[3,4]],['a','b']).write.save_as_table(table)

# Register a UDF, which is called from the stored procedure
@udf(name='example_udf', return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
def example_udf(a, b):
    return a + b

# Register stored procedure
@sproc(name='example_proc', return_type=IntegerType(), input_types=[StringType()])
def example_proc(session, table_name):
    return session.table(table_name)\
        .with_column('c', call_udf('example_udf', col('a'), col('b')))\
        .count()

# Call the stored procedure by name
output = session.call('example_proc', table)

print(output)
Copy

제한 사항

다음 목록에는 로컬 테스트 프레임워크의 알려진 제한 사항과 동작 격차가 포함되어 있습니다. Snowflake는 현재 이러한 항목을 해결할 계획이 없습니다.

  • 원시 SQL 문자열과 SQL 문자열을 구문 분석해야 하는 작업(예: session.sqlDataFrame.filter("col1 > 12"))은 지원되지 않습니다.

  • 비동기 작업은 지원되지 않습니다.

  • 테이블, 저장 프로시저, UDFs와 같은 데이터베이스 오브젝트는 세션 수준 이상으로 유지되지 않으며 모든 작업은 메모리에서 수행됩니다. 예를 들어, 한 모의 세션에 등록된 영구 저장 프로시저는 다른 모의 세션에 표시되지 않습니다.

  • Column.collate 와 같은 문자열 데이터 정렬 관련 기능은 지원되지 않습니다.

  • Variant, ArrayObject 데이터 타입은 표준 JSON 인코딩 및 디코딩에서만 지원됩니다. [1,2,,3,] 등의 식은 Snowflake에서는 유효한 JSON으로 간주되지만, Python의 기본 제공 JSON 기능이 사용되는 로컬 테스트에서는 그렇지 않습니다. 모듈 수준 변수 snowflake.snowpark.mock.CUSTOM_JSON_ENCODERsnowflake.snowpark.mock.CUSTOM_JSON_DECODER 를 지정하여 기본 설정을 재정의할 수 있습니다.

  • Snowflake 함수(윈도우 함수 포함) 중 일부만 구현됩니다. 자신만의 함수 정의를 삽입하는 방법을 알아보려면 기본 제공 함수 패치 적용하기 섹션을 참조하십시오.

    • 현재 순위 관련 함수 가져오기는 지원되지 않습니다.

  • SQL 형식 모델 는 지원되지 않습니다. 예를 들어, to_decimal 의 모의 구현은 선택적 매개 변수 format 을 처리하지 않습니다.

  • Snowpark Python 라이브러리에는 스테이지를 생성하거나 드롭하는 Python API가 기본으로 포함되어 않으므로 로컬 테스트 프레임워크는 수신되는 모든 스테이지가 이미 생성되어 있다고 가정합니다.

  • 현재 구현된 UDFs 및 저장 프로시저는 패키지 유효성 검사를 수행하지 않습니다. 프로그램을 실행하기 전에 코드에서 참조되는 모든 패키지를 설치해야 합니다.

  • 쿼리 태그는 지원되지 않습니다.

  • 쿼리 기록은 지원되지 않습니다.

  • 계보는 지원되지 않습니다.

  • UDF 또는 저장 프로시저가 등록되면 parallel, execute_as, statement_params, source_code_display, external_access_integrations, secrets, comment 등의 선택적 매개 변수는 무시됩니다.

  • Table.sample 의 경우, SYSTEM 또는 BLOCK 샘플링은 ROW 샘플링과 동일합니다.

  • Snowflake는 공식적으로 저장 프로시저 내에서 로컬 테스트 프레임워크를 실행하는 기능을 지원하지 않습니다. 저장 프로시저 내의 로컬 테스트 모드 세션에서 예기치 않은 오류가 발생하거나 발생할 수 있습니다.

지원되지 않는 기능

다음은 현재 로컬 테스트 프레임워크에서 구현되지 않은 기능 목록입니다. Snowflake는 이러한 문제를 해결하기 위해 적극적으로 노력하고 있습니다.

일반적으로 이러한 함수에 대한 참조로 인해 NotImplementedError 가 발생해야 합니다.

  • UDTFs(사용자 정의 테이블 함수)

  • UDAFs(사용자 정의 집계 함수)

  • 벡터화된 UDFs 및 UDTFs

  • 기본 제공 테이블 함수

  • 테이블 저장 프로시저

  • Geometry, GeographyVector 데이터 타입

  • 간격 식

  • JSON 및 CSV 이외의 파일 형식 읽기

    • 지원되는 파일 형식의 경우 일부 읽기 옵션이 지원되지 않습니다. 예를 들어, infer_schema 는 CSV 형식에서는 지원되지 않습니다.

여기에 지원되지 않거나 알려진 제한 사항으로 나열되지 않은 기능은 로컬 테스트를 위한 최신 기능 요청 목록 을 확인하거나 snowpark-python GitHub 리포지토리에서 기능 요청을 생성 하십시오.

알려진 문제

다음은 로컬 테스트 프레임워크에 존재하는 알려진 문제 또는 동작 차이의 목록입니다. Snowflake는 이러한 문제를 적극적으로 해결하기 위해 계획하고 있습니다.

  • DataFrame.groupby 또는 기타 집계 작업 내에서 윈도우 함수를 사용하는 기능은 지원되지 않습니다.

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy
  • 동일한 이름의 열을 선택하면 하나의 열만 반환됩니다. 해결 방법으로, Column.alias 를 사용하여 고유한 이름을 갖도록 열 이름을 바꾸십시오.

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • Table.mergeTable.update 의 경우 세션 매개 변수 ERROR_ON_NONDETERMINISTIC_UPDATEERROR_ON_NONDETERMINISTIC_MERGEFalse 로 설정해야 합니다. 이는 다중 조인의 경우 일치하는 행 중 1개가 업데이트됨을 의미합니다.

  • 정규화된 스테이지 이름은 GET 및 PUT 파일 작업에서 지원되지 않습니다. 데이터베이스와 스키마 이름은 스테이지 이름의 일부로 처리됩니다.

  • mock_to_char 구현은 서로 다른 시간 부분 사이에 구분 기호가 있는 형식의 타임스탬프만 지원합니다.

  • DataFrame.pivot 에는 피벗을 특정 값으로 제한할 수 있는 values 매개 변수가 있습니다. 현재는 통계적으로 정의된 값만 사용할 수 있습니다. 하위 쿼리를 사용하여 제공된 값을 사용하면 오류가 발생합니다.

  • 시간대 정보가 포함된 타임스탬프가 포함된 pandas DataFrame 에서 DataFrame 을 생성하는 기능은 지원되지 않습니다.

이 목록에 언급되지 않은 문제에 대해서는 최신 미해결 문제 목록 을 확인하거나 snowpark-python GitHub 리포지토리에서 버그 리포트를 작성 하십시오.