로컬 테스트 프레임워크¶
이 항목에서는 Snowpark 라이브러리로 작업할 때 로컬에서 코드를 테스트하는 방법을 설명합니다.
이 항목의 내용:
Snowpark Python 로컬 테스트 프레임워크를 사용하면 Snowflake 계정에 연결하지 않고도 Snowpark Python DataFrames에서 로컬로 생성하고 작동할 수 있습니다. 코드 변경 사항을 계정에 배포하기 전에 로컬 테스트 프레임워크를 사용하여 개발 시스템 또는 CI(지속적 통합) 파이프라인에서 로컬로 DataFrame 작업을 테스트할 수 있습니다. API는 동일하므로 코드를 변경하지 않고도 로컬에서 테스트를 실행하거나 Snowflake 계정에 대한 테스트를 실행할 수 있습니다.
전제 조건¶
로컬 테스트 프레임워크를 사용하려면 다음을 수행하십시오.
선택적 종속성
pandas
와 함께 Snowpark Python 라이브러리 버전 1.11.1 이상을 사용해야 합니다.pip install "snowflake-snowpark-python[pandas]"
를 실행하여 설치하기지원되는 Python 버전은 다음과 같습니다.
3.8
3.9
3.10
3.11
세션 만들기 및 로컬 테스트 활성화하기¶
시작하려면 Snowpark Session
을 만들고 로컬 테스트 구성을 True
로 설정하십시오.
from snowflake.snowpark import Session
session = Session.builder.config('local_testing', True).create()
세션이 생성되면 세션을 사용하여 DataFrames에서 생성하고 작업할 수 있습니다.
df = session.create_dataframe([[1,2],[3,4]],['a','b'])
df.with_column('c', df['a']+df['b']).show()
데이터 로드하기¶
Python primitive, 파일, Pandas DataFrames에서 Snowpark DataFrames를 생성할 수 있습니다. 이는 테스트 사례의 입력 및 예상 출력을 지정하는 데 유용합니다. 이런 식으로 그 작업을 수행하면 데이터가 원본 제어 상태가 되므로, 테스트 데이터와 테스트 사례의 동기화를 더 쉽게 유지할 수 있습니다.
CSV 데이터 로드하기¶
먼저 Session.file.put()
을 호출하여 파일을 인메모리 스테이지에 로드한 다음 Session.read()
를 사용하여 그 내용을 읽는 방식으로 CSV 파일을 Snowpark DataFrame에 로드할 수 있습니다. data.csv
파일이 있고 파일 내용이 다음과 같다고 가정합니다.
col1,col2,col3,col4
1,a,true,1.23
2,b,false,4.56
다음 코드를 사용하여 data.csv
를 Snowpark DataFrame에 로드할 수 있습니다. 먼저 파일을 스테이지에 배치해야 합니다. 그렇지 않으면 파일을 찾을 수 없다는 오류가 발생합니다.
from snowflake.snowpark.types import StructType, StructField, IntegerType, BooleanType, StringType, DoubleType
# Put file onto stage
session.file.put("data.csv", "@mystage", auto_compress=False)
schema = StructType(
[
StructField("col1", IntegerType()),
StructField("col2", StringType()),
StructField("col3", BooleanType()),
StructField("col4", DoubleType()),
]
)
# with option SKIP_HEADER set to 1, the header will be skipped when the csv file is loaded
dataframe = session.read.schema(schema).option("SKIP_HEADER", 1).csv("@mystage/data.csv")
dataframe.show()
dataframe.show()
의 출력은 다음과 같습니다.
-------------------------------------
|"COL1" |"COL2" |"COL3" |"COL4" |
-------------------------------------
|1 |a |True |1.23 |
|2 |b |False |4.56 |
-------------------------------------
Pandas 데이터 로드하기¶
create_dataframe
메서드를 호출하고 데이터를 Pandas DataFrame로 전달하여 Pandas DataFrame에서 Snowpark Python DataFrame을 생성할 수 있습니다.
import pandas as pd
pandas_df = pd.DataFrame(
data={
"col1": pd.Series(["value1", "value2"]),
"col2": pd.Series([1.23, 4.56]),
"col3": pd.Series([123, 456]),
"col4": pd.Series([True, False]),
}
)
dataframe = session.create_dataframe(data=pandas_df)
dataframe.show()
dataframe.show()
는 다음을 출력합니다.
-------------------------------------
|"col1" |"col2" |"col3" |"col4" |
-------------------------------------
|value1 |1.23 |123 |True |
|value2 |4.56 |456 |False |
-------------------------------------
Snowpark Python DataFrame은 DataFrame에서 to_pandas
메서드를 호출하여 Pandas DataFrame로 변환할 수도 있습니다.
from snowflake.snowpark.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType
dataframe = session.create_dataframe(
data=[
["value1", 1.23, 123, True],
["value2", 4.56, 456, False],
],
schema=StructType([
StructField("col1", StringType()),
StructField("col2", DoubleType()),
StructField("col3", LongType()),
StructField("col4", BooleanType()),
])
)
pandas_dataframe = dataframe.to_pandas()
print(pandas_dataframe.to_string())
print(pandas_dataframe.to_string())
에 대한 호출은 다음을 출력합니다.
COL1 COL2 COL3 COL4
0 value1 1.23 123 True
1 value2 4.56 456 False
세션을 위한 PyTest 픽스쳐 만들기¶
PyTest 픽스쳐 는 테스트(또는 테스트 모듈) 전에 실행되는 함수로, 일반적으로 테스트에 데이터나 연결을 제공합니다. 이 경우 Snowpark Session
오브젝트를 반환하는 픽스쳐를 만듭니다. 먼저 test
디렉터리가 아직 없다면 하나 만드십시오. 그런 다음 test
디렉터리에서 다음 내용으로 conftest.py
파일을 만듭니다. 여기서 connection_parameters
는 Snowflake 계정 자격 증명이 포함된 사전입니다. 사전 형식에 대한 자세한 내용은 세션 만들기 섹션을 참조하십시오.
# test/conftest.py
import pytest
from snowflake.snowpark.session import Session
def pytest_addoption(parser):
parser.addoption("--snowflake-session", action="store", default="live")
@pytest.fixture(scope='module')
def session(request) -> Session:
if request.config.getoption('--snowflake-session') == 'local':
return Session.builder.config('local_testing', True).create()
else:
return Session.builder.configs(CONNECTION_PARAMETERS).create()
pytest_addoption
을 호출하면 snowflake-session
이라는 명령줄 옵션이 pytest
명령에 추가됩니다. Session
픽스쳐는 이 명령줄 옵션을 확인하고 해당 값에 따라 로컬 또는 라이브 Session
을 생성합니다. 이를 통해 테스트를 위해 로컬 모드와 라이브 모드 간에 쉽게 전환할 수 있습니다.
# Using local mode:
pytest --snowflake-session local
# Using live mode
pytest
SQL 작업¶
Session.sql(...)
은 로컬 테스트 프레임워크에서는 지원되지 않습니다. 가능하면 항상 Snowpark의 DataFrame API를 사용하고, Session.sql(...)
을 사용해야 하는 경우 Python의 unittest.mock.patch
를 사용하여 테이블 형식 반환 값을 모의하여 주어진 Session.sql()
호출에서 예상되는 응답을 패치할 수 있습니다.
아래 예제에서 mock_sql()
은 SQL 쿼리 텍스트를 원하는 DataFrame 응답에 매핑합니다. 다음 조건문은 현재 세션이 로컬 테스트를 사용 중인지 확인하고, 그렇다면 Session.sql()
메서드에 패치를 적용합니다.
from unittest import mock
from functools import partial
def test_something(pytestconfig, session):
def mock_sql(session, sql_string): # patch for SQL operations
if sql_string == "select 1,2,3":
return session.create_dataframe([[1,2,3]])
else:
raise RuntimeError(f"Unexpected query execution: {sql_string}")
if pytestconfig.getoption('--snowflake-session') == 'local':
with mock.patch.object(session, 'sql', wraps=partial(mock_sql, session)): # apply patch for SQL operations
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
else:
assert session.sql("select 1,2,3").collect() == [Row(1,2,3)]
로컬 테스트가 활성화되면 DataFrame.save_as_table()
에서 생성된 모든 테이블이 인메모리 상태로 임시 테이블로 저장되며 Session.table()
을 사용하여 검색할 수 있습니다. 평소와 같이 테이블에서 지원되는 DataFrame 작업을 사용할 수 있습니다.
기본 제공 함수 패치 적용¶
snowflake.snowpark.functions
아래의 기본 제공 함수가 모두 로컬 테스트 프레임워크에서 지원되는 것은 아닙니다. 지원되지 않는 함수를 사용하는 경우 snowflake.snowpark.mock
의 @patch
데코레이터를 사용하여 패치를 생성해야 합니다.
패치된 함수를 정의하고 구현하려면 시그니처(매개 변수 목록)가 기본 제공 함수의 매개 변수와 일치해야 합니다. 로컬 테스트 프레임워크는 다음 규칙을 사용하여 패치된 함수에 매개 변수를 전달합니다.
기본 제공 함수의 시그니처에 있는
ColumnOrName
유형 매개 변수의 경우ColumnEmulator
가 패치된 함수의 매개 변수로 전달됩니다.ColumnEmulator
는 열 데이터가 포함된pandas.Series
오브젝트와 유사합니다.기본 제공 함수의 시그니처에 있는
LiteralType
유형 매개 변수의 경우 리터럴 값이 패치된 함수의 매개 변수로 전달됩니다.그렇지 않으면 원시 값이 패치된 함수의 매개 변수로 전달됩니다.
패치된 함수의 반환 유형에 관해서는 기본 제공 함수의 Column
반환 유형에 맞춰 ColumnEmulator
인스턴스가 반환될 것으로 예상됩니다.
예를 들어 기본 제공 함수 to_timestamp()
는 다음과 같이 패치될 수 있습니다.
import datetime
from snowflake.snowpark.mock import patch, ColumnEmulator, ColumnType
from snowflake.snowpark.functions import to_timestamp
from snowflake.snowpark.types import TimestampType
@patch(to_timestamp)
def mock_to_timestamp(column: ColumnEmulator, format = None) -> ColumnEmulator:
ret_column = ColumnEmulator(data=[datetime.datetime.strptime(row, '%Y-%m-%dT%H:%M:%S%z') for row in column])
ret_column.sf_type = ColumnType(TimestampType(), True)
return ret_column
테스트 사례 건너뛰기¶
PyTest 테스트 모음에 로컬 테스트에서 제대로 지원되지 않는 테스트 사례가 포함된 경우 PyTest의 mark.skipif
데코레이터를 사용하여 해당 사례를 건너뛸 수 있습니다. 아래 예제에서는 앞서 설명한 대로 세션과 매개 변수를 구성했다고 가정합니다. 조건은 local_testing_mode
가 local
로 설정되어 있는지 확인하고, 그럴 경우 건너뛴 이유를 설명하는 메시지와 함께 테스트 사례를 건너뜁니다.
import pytest
@pytest.mark.skipif(
condition="config.getvalue('local_testing_mode') == 'local'",
reason="Test case disabled for local testing"
)
def test_case(session):
...
제한 사항¶
다음 기능은 지원되지 않습니다.
원시 SQL 문자열과 SQL 문자열 구문 분석이 필요한 작업. 예를 들어,
session.sql
및DataFrame.filter("col1 > 12")
는 지원되지 않습니다.UDF, UDTF, 저장 프로시저.
테이블 함수.
AsyncJobs.
웨어하우스, 스키마 및 기타 세션 속성 변경과 같은 세션 작업.
Geometry
및Geography
데이터 타입.윈도우 함수 집계.
# Selecting window function expressions is supported df.select("key", "value", sum_("value").over(), avg("value").over()) # Aggregating window function expressions is NOT supported df.group_by("key").agg([sum_("value"), sum_(sum_("value")).over(window) - sum_("value")])
기타 제한 사항은 다음과 같습니다.
Variant
,Array
및Object
데이터 타입은 표준 JSON 인코딩 및 디코딩에서만 지원됩니다. [1,2,,3,] 등의 식은 Snowflake에서는 유효한 JSON으로 간주되지만, Python의 기본 제공 JSON 기능이 사용되는 로컬 테스트에서는 그렇지 않습니다. 모듈 수준 변수snowflake.snowpark.mock.CUSTOM_JSON_ENCODER
및snowflake.snowpark.mock.CUSTOM_JSON_DECODER
를 지정하여 기본 설정을 재정의할 수 있습니다.Snowflake 함수(윈도우 함수 포함) 중 일부만 구현됩니다. 자신만의 함수 정의를 삽입하는 방법을 알아보려면 기본 제공 함수 패치 적용 섹션을 참조하십시오.
패치 순위 관련 함수는 현재 지원되지 않습니다.
동일한 이름의 열을 선택하면 하나의 열만 반환됩니다. 해결 방법으로,
Column.alias
를 사용하여 고유한 이름을 갖도록 열 이름을 바꾸십시오.df.select(lit(1), lit(1)).show() # col("a"), col("a") #--------- #|"'1'" | #--------- #|1 | #|... | #--------- # Workaround: Column.alias DataFrame.select(lit(1).alias("col1_1"), lit(1).alias("col1_2")) # "col1_1", "col1_2"
Column.cast
를 사용하는 명시적 유형 캐스팅에는 형식 문자열이 입력(to_decimal
,to_number
,to_numeric
,to_double
,to_date
,to_time
,to_timestamp
)과 출력(to_char
,to_varchar
,to_binary
)에 지원되지 않는다는 제한이 있습니다.VariantType
에 저장된 JSON 문자열은Datetime
유형으로 변환될 수 없습니다.Table.merge
및Table.update
의 경우 구현은 세션 매개 변수ERROR_ON_NONDETERMINISTIC_UPDATE
및ERROR_ON_NONDETERMINISTIC_MERGE
가False
로 설정된 경우에만 동작을 지원합니다. 이는 다중 조인의 경우 일치하는 행 중 하나를 업데이트한다는 의미입니다.
지원되는 API 목록¶
Snowpark 세션¶
Session.createDataFrame
Session.create_dataframe
Session.flatten
Session.range
Session.table
입력/출력¶
DataFrameReader.csv
DataFrameReader.table
DataFrameWriter.saveAsTable
DataFrameWriter.save_as_table
DataFrame¶
DataFrame.agg
DataFrame.cache_result
DataFrame.col
DataFrame.collect
DataFrame.collect_nowait
DataFrame.copy_into_table
DataFrame.count
DataFrame.createOrReplaceTempView
DataFrame.createOrReplaceView
DataFrame.create_or_replace_temp_view
DataFrame.create_or_replace_view
DataFrame.crossJoin
DataFrame.cross_join
DataFrame.distinct
DataFrame.drop
DataFrame.dropDuplicates
DataFrame.drop_duplicates
DataFrame.dropna
DataFrame.except_
DataFrame.explain
DataFrame.fillna
DataFrame.filter
DataFrame.first
DataFrame.groupBy
DataFrame.group_by
DataFrame.intersect
DataFrame.join
DataFrame.limit
DataFrame.minus
DataFrame.natural_join
DataFrame.orderBy
DataFrame.order_by
DataFrame.rename
DataFrame.replace
DataFrame.rollup
DataFrame.sample
DataFrame.select
DataFrame.show
DataFrame.sort
DataFrame.subtract
DataFrame.take
DataFrame.toDF
DataFrame.toLocalIterator
DataFrame.toPandas
DataFrame.to_df
DataFrame.to_local_iterator
DataFrame.to_pandas
DataFrame.to_pandas_batches
DataFrame.union
DataFrame.unionAll
DataFrame.unionAllByName
DataFrame.unionByName
DataFrame.union_all
DataFrame.union_all_by_name
DataFrame.union_by_name
DataFrame.unpivot
DataFrame.where
DataFrame.withColumn
DataFrame.withColumnRenamed
DataFrame.with_column
DataFrame.with_column_renamed
DataFrame.with_columns
DataFrameNaFunctions.drop
DataFrameNaFunctions.fill
DataFrameNaFunctions.replace
열¶
Column.alias
Column.as_
Column.asc
Column.asc_nulls_first
Column.asc_nulls_last
Column.astype
Column.between
Column.bitand
Column.bitor
Column.bitwiseAnd
Column.bitwiseOR
Column.bitwiseXOR
Column.bitxor
Column.cast
Column.collate
Column.desc
Column.desc_nulls_first
Column.desc_nulls_last
Column.endswith
Column.eqNullSafe
Column.equal_nan
Column.equal_null
Column.getItem
Column.getName
Column.get_name
Column.in_
Column.isNotNull
Column.isNull
Column.is_not_null
Column.is_null
Column.isin
Column.like
Column.name
Column.over
Column.regexp
Column.rlike
Column.startswith
Column.substr
Column.substring
Column.try_cast
Column.within_group
CaseExpr.when
CaseExpr.otherwise
데이터 타입¶
ArrayType
BinaryType
BooleanType
ByteType
ColumnIdentifier
DataType
DateType
DecimalType
DoubleType
FloatType
IntegerType
LongType
MapType
NullType
ShortType
StringType
StructField
StructType
Timestamp
TimestampType
TimeType
Variant
VariantType
행¶
Row.asDict
Row.as_dict
Row.count
Row.index
함수¶
abs
avg
coalesce
contains
count
count_distinct
covar_pop
endswith
first_value
iff
lag
last_value
lead
list_agg
max
median
min
parse_json
row_number
startswith
substring
sum
to_array
to_binary
to_boolean
to_char
to_date
to_decimal
to_double
to_object
to_time
to_timestamp
to_variant
Window¶
Window.orderBy
Window.order_by
Window.partitionBy
Window.partition_by
Window.rangeBetween
Window.range_between
Window.rowsBetween
Window.rows_between
WindowSpec.orderBy
WindowSpec.order_by
WindowSpec.partitionBy
WindowSpec.partition_by
WindowSpec.rangeBetween
WindowSpec.range_between
WindowSpec.rowsBetween
WindowSpec.rows_between
그룹화¶
RelationalGroupedDataFrame.agg
RelationalGroupedDataFrame.apply_in_pandas
RelationalGroupedDataFrame.applyInPandas
RelationalGroupedDataFrame.avg
RelationalGroupedDataFrame.builtin
RelationalGroupedDataFrame.count
RelationalGroupedDataFrame.function
RelationalGroupedDataFrame.max
RelationalGroupedDataFrame.mean
RelationalGroupedDataFrame.median
RelationalGroupedDataFrame.min
RelationalGroupedDataFrame.sum
테이블¶
Table.delete
Table.drop_table
Table.merge
Table.sample
Table.update
WhenMatchedClause.delete
WhenMatchedClause.update
WhenNotMatchedClause.insert