로컬 테스트 프레임워크

이 항목에서는 Snowpark 라이브러리로 작업할 때 로컬에서 코드를 테스트하는 방법을 설명합니다.

이 항목의 내용:

Snowpark Python 로컬 테스트 프레임워크를 사용하면 Snowflake 계정에 연결하지 않고도 Snowpark Python DataFrames에서 로컬로 생성하고 작동할 수 있습니다. 코드 변경 사항을 계정에 배포하기 전에 로컬 테스트 프레임워크를 사용하여 개발 시스템 또는 CI(지속적 통합) 파이프라인에서 로컬로 DataFrame 작업을 테스트할 수 있습니다. API는 동일하므로 코드를 변경하지 않고도 로컬에서 테스트를 실행하거나 Snowflake 계정에 대한 테스트를 실행할 수 있습니다.

전제 조건

로컬 테스트 프레임워크를 사용하려면 다음을 수행하십시오.

  • 선택적 종속성 pandas 와 함께 Snowpark Python 라이브러리 버전 1.11.1 이상을 사용해야 합니다. pip install "snowflake-snowpark-python[pandas]" 를 실행하여 설치하기

  • 지원되는 Python 버전은 다음과 같습니다.

    • 3.8

    • 3.9

    • 3.10

    • 3.11

세션 만들기 및 로컬 테스트 활성화하기

시작하려면 Snowpark Session 을 만들고 로컬 테스트 구성을 True 로 설정하십시오.

from snowflake.snowpark import Session

session = Session.builder.config('local_testing', True).create()
Copy

세션이 생성되면 세션을 사용하여 DataFrames에서 생성하고 작업할 수 있습니다.

df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
Copy

데이터 로드하기

Python primitive, 파일, Pandas DataFrames에서 Snowpark DataFrames를 생성할 수 있습니다. 이는 테스트 사례의 입력 및 예상 출력을 지정하는 데 유용합니다. 이런 식으로 그 작업을 수행하면 데이터가 원본 제어 상태가 되므로, 테스트 데이터와 테스트 사례의 동기화를 더 쉽게 유지할 수 있습니다.

CSV 데이터 로드하기

먼저 Session.file.put() 을 호출하여 파일을 인메모리 스테이지에 로드한 다음 Session.read() 를 사용하여 그 내용을 읽는 방식으로 CSV 파일을 Snowpark DataFrame에 로드할 수 있습니다. data.csv 파일이 있고 파일 내용이 다음과 같다고 가정합니다.

col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
Copy

다음 코드를 사용하여 data.csv 를 Snowpark DataFrame에 로드할 수 있습니다. 먼저 파일을 스테이지에 배치해야 합니다. 그렇지 않으면 파일을 찾을 수 없다는 오류가 발생합니다.

from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType


# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
    [
        StructField("col1", IntegerType()),
        StructField("col2", StringType()),
        StructField("col3", BooleanType()),
        StructField("col4", DoubleType()),
    ]
)

# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
Copy

dataframe.show() 의 출력은 다음과 같습니다.

-------------------------------------
|"COL1"  |"COL2"  |"COL3"  |"COL4"  |
-------------------------------------
|1       |a       |True    |1.23    |
|2       |b       |False   |4.56    |
-------------------------------------

Pandas 데이터 로드하기

create_dataframe 메서드를 호출하고 데이터를 Pandas DataFrame로 전달하여 Pandas DataFrame에서 Snowpark Python DataFrame을 생성할 수 있습니다.

import pandas as pd

pandas_df = pd.DataFrame(
    data={
        "col1": pd.Series(["value1", "value2"]),
        "col2": pd.Series([1.23, 4.56]),
        "col3": pd.Series([123, 456]),
        "col4": pd.Series([True, False]),
    }
)

dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
Copy

dataframe.show() 는 다음을 출력합니다.

-------------------------------------
|"col1"  |"col2"  |"col3"  |"col4"  |
-------------------------------------
|value1  |1.23    |123     |True    |
|value2  |4.56    |456     |False   |
-------------------------------------

Snowpark Python DataFrame은 DataFrame에서 to_pandas 메서드를 호출하여 Pandas DataFrame로 변환할 수도 있습니다.

from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType

dataframe = session.create_dataframe(
    data=[
        ["value1", 1.23, 123, True],
        ["value2", 4.56, 456, False],
    ],
    schema=StructType([
        StructField("col1", StringType()),
        StructField("col2", DoubleType()),
        StructField("col3", LongType()),
        StructField("col4", BooleanType()),
    ])
)

pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
Copy

print(pandas_dataframe.to_string()) 에 대한 호출은 다음을 출력합니다.

    COL1  COL2  COL3   COL4
0  value1  1.23   123   True
1  value2  4.56   456  False

세션을 위한 PyTest 픽스쳐 만들기

PyTest 픽스쳐 는 테스트(또는 테스트 모듈) 전에 실행되는 함수로, 일반적으로 테스트에 데이터나 연결을 제공합니다. 이 경우 Snowpark Session 오브젝트를 반환하는 픽스쳐를 만듭니다. 먼저 test 디렉터리가 아직 없다면 하나 만드십시오. 그런 다음 test 디렉터리에서 다음 내용으로 conftest.py 파일을 만듭니다. 여기서 connection_parameters 는 Snowflake 계정 자격 증명이 포함된 사전입니다. 사전 형식에 대한 자세한 내용은 세션 만들기 섹션을 참조하십시오.

# test/conftest.py
import pytest
from snowflake.snowpark.session import Session

def pytest_addoption(parser):
    parser.addoption("--snowflake-session", action="store", default="live")

@pytest.fixture(scope='module')
def session(request) -> Session:
    if request.config.getoption('--snowflake-session') == 'local':
        return Session.builder.config('local_testing', True).create()
    else:
        return Session.builder.configs(CONNECTION_PARAMETERS).create()
Copy

pytest_addoption 을 호출하면 snowflake-session 이라는 명령줄 옵션이 pytest 명령에 추가됩니다. Session 픽스쳐는 이 명령줄 옵션을 확인하고 해당 값에 따라 로컬 또는 라이브 Session 을 생성합니다. 이를 통해 테스트를 위해 로컬 모드와 라이브 모드 간에 쉽게 전환할 수 있습니다.

# Using local mode:
pytest --snowflake-session local

# Using live mode
pytest
Copy

SQL 작업

Session.sql(...) 은 로컬 테스트 프레임워크에서는 지원되지 않습니다. 가능하면 항상 Snowpark의 DataFrame API를 사용하고, Session.sql(...) 을 사용해야 하는 경우 Python의 unittest.mock.patch 를 사용하여 테이블 형식 반환 값을 모의하여 주어진 Session.sql() 호출에서 예상되는 응답을 패치할 수 있습니다.

아래 예제에서 mock_sql() 은 SQL 쿼리 텍스트를 원하는 DataFrame 응답에 매핑합니다. 다음 조건문은 현재 세션이 로컬 테스트를 사용 중인지 확인하고, 그렇다면 Session.sql() 메서드에 패치를 적용합니다.

from unittest import mock
from functools import partial

def test_something(pytestconfig, session):

    def mock_sql(session, sql_string):  # patch for SQL operations
        if sql_string == "select 1,2,3":
            return session.create_dataframe([[1,2,3]])
        else:
            raise RuntimeError(f"Unexpected query execution: {sql_string}")

    if pytestconfig.getoption('--snowflake-session') == 'local':
        with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
            assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
    else:
        assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
Copy

로컬 테스트가 활성화되면 DataFrame.save_as_table() 에서 생성된 모든 테이블이 인메모리 상태로 임시 테이블로 저장되며 Session.table() 을 사용하여 검색할 수 있습니다. 평소와 같이 테이블에서 지원되는 DataFrame 작업을 사용할 수 있습니다.

기본 제공 함수 패치 적용

snowflake.snowpark.functions 아래의 기본 제공 함수가 모두 로컬 테스트 프레임워크에서 지원되는 것은 아닙니다. 지원되지 않는 함수를 사용하는 경우 snowflake.snowpark.mock@patch 데코레이터를 사용하여 패치를 생성해야 합니다.

패치된 함수를 정의하고 구현하려면 시그니처(매개 변수 목록)가 기본 제공 함수의 매개 변수와 일치해야 합니다. 로컬 테스트 프레임워크는 다음 규칙을 사용하여 패치된 함수에 매개 변수를 전달합니다.

  • 기본 제공 함수의 시그니처에 있는 ColumnOrName 유형 매개 변수의 경우 ColumnEmulator 가 패치된 함수의 매개 변수로 전달됩니다. ColumnEmulator 는 열 데이터가 포함된 pandas.Series 오브젝트와 유사합니다.

  • 기본 제공 함수의 시그니처에 있는 LiteralType 유형 매개 변수의 경우 리터럴 값이 패치된 함수의 매개 변수로 전달됩니다.

  • 그렇지 않으면 원시 값이 패치된 함수의 매개 변수로 전달됩니다.

패치된 함수의 반환 유형에 관해서는 기본 제공 함수의 Column 반환 유형에 맞춰 ColumnEmulator 인스턴스가 반환될 것으로 예상됩니다.

예를 들어 기본 제공 함수 to_timestamp() 는 다음과 같이 패치될 수 있습니다.

import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType

@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
    ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
    ret_column.sf_type = ColumnType(TimestampType(), True)
    return ret_column
Copy

테스트 사례 건너뛰기

PyTest 테스트 모음에 로컬 테스트에서 제대로 지원되지 않는 테스트 사례가 포함된 경우 PyTest의 mark.skipif 데코레이터를 사용하여 해당 사례를 건너뛸 수 있습니다. 아래 예제에서는 앞서 설명한 대로 세션과 매개 변수를 구성했다고 가정합니다. 조건은 local_testing_modelocal 로 설정되어 있는지 확인하고, 그럴 경우 건너뛴 이유를 설명하는 메시지와 함께 테스트 사례를 건너뜁니다.

import pytest

@pytest.mark.skipif(
    condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
    ...
Copy

제한 사항

다음 기능은 지원되지 않습니다.

  • 원시 SQL 문자열과 SQL 문자열 구문 분석이 필요한 작업. 예를 들어, session.sqlDataFrame.filter("col1 > 12") 는 지원되지 않습니다.

  • UDF, UDTF, 저장 프로시저.

  • 테이블 함수.

  • AsyncJobs.

  • 웨어하우스, 스키마 및 기타 세션 속성 변경과 같은 세션 작업.

  • GeometryGeography 데이터 타입.

  • 윈도우 함수 집계.

    # Selecting window function expressions is supported
    df.select("key", "value", sum_("value").over(), avg("value").over())
    
    # Aggregating window function expressions is NOT supported
    df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
    
    Copy

기타 제한 사항은 다음과 같습니다.

  • Variant, ArrayObject 데이터 타입은 표준 JSON 인코딩 및 디코딩에서만 지원됩니다. [1,2,,3,] 등의 식은 Snowflake에서는 유효한 JSON으로 간주되지만, Python의 기본 제공 JSON 기능이 사용되는 로컬 테스트에서는 그렇지 않습니다. 모듈 수준 변수 snowflake.snowpark.mock.CUSTOM_JSON_ENCODERsnowflake.snowpark.mock.CUSTOM_JSON_DECODER 를 지정하여 기본 설정을 재정의할 수 있습니다.

  • Snowflake 함수(윈도우 함수 포함) 중 일부만 구현됩니다. 자신만의 함수 정의를 삽입하는 방법을 알아보려면 기본 제공 함수 패치 적용 섹션을 참조하십시오.

  • 패치 순위 관련 함수는 현재 지원되지 않습니다.

  • 동일한 이름의 열을 선택하면 하나의 열만 반환됩니다. 해결 방법으로, Column.alias 를 사용하여 고유한 이름을 갖도록 열 이름을 바꾸십시오.

    df.select(lit(1), lit(1)).show() # col("a"), col("a")
    #---------
    #|"'1'"  |
    #---------
    #|1      |
    #|...    |
    #---------
    
    # Workaround: Column.alias
    DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2"))
    # "col1_1", "col1_2"
    
    Copy
  • Column.cast 를 사용하는 명시적 유형 캐스팅에는 형식 문자열이 입력(to_decimal, to_number, to_numeric, to_double, to_date, to_time, to_timestamp)과 출력(to_char, to_varchar, to_binary)에 지원되지 않는다는 제한이 있습니다.

  • VariantType 에 저장된 JSON 문자열은 Datetime 유형으로 변환될 수 없습니다.

  • Table.mergeTable.update 의 경우 구현은 세션 매개 변수 ERROR_ON_NONDETERMINISTIC_UPDATEERROR_ON_NONDETERMINISTIC_MERGEFalse 로 설정된 경우에만 동작을 지원합니다. 이는 다중 조인의 경우 일치하는 행 중 하나를 업데이트한다는 의미입니다.

지원되는 API 목록

Snowpark 세션

Session.createDataFrame

Session.create_dataframe

Session.flatten

Session.range

Session.table

입력/출력

DataFrameReader.csv

DataFrameReader.table

DataFrameWriter.saveAsTable

DataFrameWriter.save_as_table

DataFrame

DataFrame.agg

DataFrame.cache_result

DataFrame.col

DataFrame.collect

DataFrame.collect_nowait

DataFrame.copy_into_table

DataFrame.count

DataFrame.createOrReplaceTempView

DataFrame.createOrReplaceView

DataFrame.create_or_replace_temp_view

DataFrame.create_or_replace_view

DataFrame.crossJoin

DataFrame.cross_join

DataFrame.distinct

DataFrame.drop

DataFrame.dropDuplicates

DataFrame.drop_duplicates

DataFrame.dropna

DataFrame.except_

DataFrame.explain

DataFrame.fillna

DataFrame.filter

DataFrame.first

DataFrame.groupBy

DataFrame.group_by

DataFrame.intersect

DataFrame.join

DataFrame.limit

DataFrame.minus

DataFrame.natural_join

DataFrame.orderBy

DataFrame.order_by

DataFrame.rename

DataFrame.replace

DataFrame.rollup

DataFrame.sample

DataFrame.select

DataFrame.show

DataFrame.sort

DataFrame.subtract

DataFrame.take

DataFrame.toDF

DataFrame.toLocalIterator

DataFrame.toPandas

DataFrame.to_df

DataFrame.to_local_iterator

DataFrame.to_pandas

DataFrame.to_pandas_batches

DataFrame.union

DataFrame.unionAll

DataFrame.unionAllByName

DataFrame.unionByName

DataFrame.union_all

DataFrame.union_all_by_name

DataFrame.union_by_name

DataFrame.unpivot

DataFrame.where

DataFrame.withColumn

DataFrame.withColumnRenamed

DataFrame.with_column

DataFrame.with_column_renamed

DataFrame.with_columns

DataFrameNaFunctions.drop

DataFrameNaFunctions.fill

DataFrameNaFunctions.replace

Column.alias

Column.as_

Column.asc

Column.asc_nulls_first

Column.asc_nulls_last

Column.astype

Column.between

Column.bitand

Column.bitor

Column.bitwiseAnd

Column.bitwiseOR

Column.bitwiseXOR

Column.bitxor

Column.cast

Column.collate

Column.desc

Column.desc_nulls_first

Column.desc_nulls_last

Column.endswith

Column.eqNullSafe

Column.equal_nan

Column.equal_null

Column.getItem

Column.getName

Column.get_name

Column.in_

Column.isNotNull

Column.isNull

Column.is_not_null

Column.is_null

Column.isin

Column.like

Column.name

Column.over

Column.regexp

Column.rlike

Column.startswith

Column.substr

Column.substring

Column.try_cast

Column.within_group

CaseExpr.when

CaseExpr.otherwise

데이터 타입

ArrayType

BinaryType

BooleanType

ByteType

ColumnIdentifier

DataType

DateType

DecimalType

DoubleType

FloatType

IntegerType

LongType

MapType

NullType

ShortType

StringType

StructField

StructType

Timestamp

TimestampType

TimeType

Variant

VariantType

Row.asDict

Row.as_dict

Row.count

Row.index

함수

abs

avg

coalesce

contains

count

count_distinct

covar_pop

endswith

first_value

iff

lag

last_value

lead

list_agg

max

median

min

parse_json

row_number

startswith

substring

sum

to_array

to_binary

to_boolean

to_char

to_date

to_decimal

to_double

to_object

to_time

to_timestamp

to_variant

Window

Window.orderBy

Window.order_by

Window.partitionBy

Window.partition_by

Window.rangeBetween

Window.range_between

Window.rowsBetween

Window.rows_between

WindowSpec.orderBy

WindowSpec.order_by

WindowSpec.partitionBy

WindowSpec.partition_by

WindowSpec.rangeBetween

WindowSpec.range_between

WindowSpec.rowsBetween

WindowSpec.rows_between

그룹화

RelationalGroupedDataFrame.agg

RelationalGroupedDataFrame.apply_in_pandas

RelationalGroupedDataFrame.applyInPandas

RelationalGroupedDataFrame.avg

RelationalGroupedDataFrame.builtin

RelationalGroupedDataFrame.count

RelationalGroupedDataFrame.function

RelationalGroupedDataFrame.max

RelationalGroupedDataFrame.mean

RelationalGroupedDataFrame.median

RelationalGroupedDataFrame.min

RelationalGroupedDataFrame.sum

테이블

Table.delete

Table.drop_table

Table.merge

Table.sample

Table.update

WhenMatchedClause.delete

WhenMatchedClause.update

WhenNotMatchedClause.insert