Snowpark Checkpoints 라이브러리¶
Snowpark Checkpoints는 Apache PySpark 에서 Snowpark Python으로 마이그레이션한 코드를 검증하는 테스트 라이브러리입니다.
전제 조건¶
Snowpark Checkpoints를 사용하려면 Python 개발 환경을 설정하십시오. 지원되는 Python 버전은 다음과 같습니다.
3.9
3.10
3.11
참고
Python 3.9는 Snowpark 클라이언트 버전 1.5.0에 종속됩니다. Python 3.10은 Snowpark 클라이언트 버전 1.5.1에 종속됩니다. Python 3.11은 Snowpark 클라이언트 버전 1.9.0에 종속됩니다.
Anaconda, Miniconda 또는 virtualenv 와 같은 도구를 사용하여 특정 Python 버전에 대한 Python 가상 환경을 만들 수 있습니다.
Snowpark Checkpoints 설치하기¶
conda 또는 pip 를 사용하여 Python 가상 환경에 Snowpark Checkpoints 패키지를 설치합니다.
Conda 사용:
conda install snowpark-checkpoints
Pip 사용:
pip install snowpark-checkpoints
원하는 경우 패키지를 개별적으로 설치할 수도 있습니다.
snowpark-checkpoints-collectors - 이 패키지를 사용하여 PySpark DataFrames 에 대한 정보를 수집합니다.
Conda 사용:
conda install snowpark-checkpoints-collectors
Pip 사용:
pip install snowpark-checkpoints-collectors
snowpark-checkpoints-hypothesis - 이 패키지를 사용하여 원본 PySpark 코드에서 수집한 DataFrame 스키마에 따라 자동으로 생성된 합성 데이터를 기반으로 Snowpark 코드에 대한 단위 테스트를 생성하십시오.
Conda 사용:
conda install snowpark-checkpoints-hypothesis
Pip 사용:
pip install snowpark-checkpoints-hypothesis
snowpark-checkpoints-validators - 이 패키지를 사용하여 수집된 스키마 또는 수집기 기능으로 생성된 내보낸 DataFrames 에 대해 변환된 Snowpark DataFrames 를 검증합니다.
Conda 사용:
conda install snowpark-checkpoints-validators
Pip 사용:
pip install snowpark-checkpoints-validators
snowpark-checkpoints-configuration - 이 패키지를 사용하면
snowpark-checkpoints-collectors
및snowpark-checkpoints-validators
가 검사점 구성을 자동으로 로드할 수 있습니다.Conda 사용:
conda install snowpark-checkpoints-configuration
Pip 사용:
pip install snowpark-checkpoints-configuration
프레임워크 사용하기¶
PySpark Code에 대한 정보 수집하기¶
snowpark-checkpoints-collectors
패키지는 PySpark DataFrames 에서 정보를 추출하는 함수를 제공합니다. 그런 다음 해당 데이터를 사용하여 변환된 Snowpark DataFrames 에 대해 검증하여 동작 동등성을 보장할 수 있습니다.
다음 함수를 사용하여 새 검사점 수집 지점을 삽입할 수 있습니다.
함수 서명:
def collect_dataframe_checkpoint(df: SparkDataFrame,
checkpoint_name: str,
sample: Optional[float],
mode: Optional[CheckpointMode],
output_path: Optional[str]) -> None:
함수 매개 변수:
df: PySpark DataFrame.
checkpoint_name: 검사점의 이름입니다. 문자(A-Z, a-z) 또는 밑줄(_)로 시작하고 문자, 밑줄, 소수 자릿수(0-9)만 포함합니다.
sample: (선택 사항) 샘플 크기입니다. 기본값은 0에서 1.0 사이의 범위에서 1.0(전체 PySpark DataFrame)입니다.
mode: (선택 사항) 실행 모드입니다. 옵션은
SCHEMA
및DATAFRAME
입니다. 기본값은SCHEMA
입니다.output_path: (선택 사항) 검사점을 저장할 출력 경로입니다. 기본값은 현재 작업 디렉터리입니다.
수집 프로세스는 각 수집 지점에 대한 결과 정보가 포함된 checkpoint_collection_result.json
이라는 출력 파일을 생성합니다. JSON 파일이며 다음 정보가 포함되어 있습니다.
수집 지점이 시작된 타임스탬프입니다.
수집 지점이 있는 파일의 상대 경로입니다.
수집 지점이 있는 파일의 코드 줄입니다.
수집 지점 검사점의 이름입니다.
수집 지점 결과(불합격 또는 합격)입니다.
스키마 추론 수집 데이터 모드(스키마)¶
이 모드는 기본 모드로, Pandera 스키마 추론을 활용하여 지정된 DataFrame 에 대해 평가할 메타데이터와 검사를 가져옵니다. 이 모드는 PySpark 유형에 따라 DataFrame 열에서 사용자 지정 데이터를 수집합니다.
열 데이터 및 확인은 열의 PySpark 타입에 따라 수집됩니다(아래 테이블 참조). 열의 유형에 관계없이 모든 열에 대해 수집되는 사용자 지정 데이터에는 열 이름, 열 유형, null 가능 여부, 행 수, null이 아닌 행 수, null 행 수가 포함됩니다.
열 유형 |
수집된 사용자 지정 데이터 |
---|---|
Numeric( |
최소값. 최대값. 평균값. 소수점 전체 자릿수(정수형인 경우 값은 0임). 표준 편차. |
날짜 |
최소값. 최대값. 날짜 형식: %Y-%m-%d |
DayTimeIntervalType 및 YearMonthIntervalType |
최소값. 최대값. |
타임스탬프 |
최소값. 최대값. 날짜 형식: %Y-%m-%dH:%M:%S |
Timestamp ntz |
최소값. 최대값. 날짜 형식: %Y-%m-%dT%H:%M:%S%z |
String |
최소 길이 값. 최대 길이 값. |
Char |
PySpark 는 리터럴을 문자열 유형으로 처리하므로 char 는 유효한 유형이 아닙니다. |
Varchar |
PySpark 는 리터럴을 문자열 유형으로 처리하므로 Varchar 는 유효한 유형이 아닙니다. |
Decimal |
최소값. 최대값. 평균값. 소수점 이하 전체 자릿수. |
배열 |
값의 유형. 허용되는 경우 null을 요소로 사용합니다. Null 값의 비율. 최대 배열 크기. 최소 배열 크기. 배열의 평균 크기. 모든 배열의 크기가 같은 경우. |
바이너리 |
최대 크기. 최소 크기. 평균 크기. 모든 요소의 크기가 같은 경우. |
Map |
키의 유형. 값의 유형. 허용되는 경우 값으로 null을 사용합니다. Null 값의 비율. 최대 맵 크기. 최소 매핑 크기. 평균 맵 크기. 모든 맵의 크기가 같은 경우. |
Null |
NullType 은 데이터 타입을 확인할 수 없으므로 이 타입에서 정보를 가져올 수 없으므로 None을 나타냅니다. |
Struct |
구조체의 메타데이터는 각 structField에 대한 것입니다. |
또한 다음 표에 자세히 설명된 각 데이터 타입에 대해 미리 정의된 검증 세트를 정의합니다.
타입 |
Pandera 검사 항목 |
추가 검사 항목 |
---|---|---|
부울 |
각 값은 True 또는 False입니다. |
True 값과 False 값의 개수입니다. |
Numeric( |
각 값은 최소값과 최대값의 범위 내에 있습니다. |
소수점 이하 전체 자릿수. 평균값. 표준 편차. |
날짜 |
N/A |
최소값 및 최대값 |
타임스탬프 |
각 값은 최소값과 최대값의 범위 내에 있습니다. |
값의 형식. |
Timestamp ntz |
각 값은 최소값과 최대값의 범위 내에 있습니다. |
값의 형식. |
String |
각 값의 길이는 최소 및 최대 길이 범위 내에 있습니다. |
없음 |
Char |
PySpark 는 리터럴을 문자열 유형으로 처리하므로 |
|
Varchar |
PySpark 는 리터럴을 문자열 유형으로 처리하므로 |
|
Decimal |
N/A |
N/A |
배열 |
N/A |
없음 |
바이너리 |
N/A |
없음 |
Map |
N/A |
없음 |
Null |
N/A |
N/A |
Struct |
N/A |
없음 |
이 모드를 사용하면 사용자가 수집할 DataFrame 샘플을 정의할 수 있지만 선택 사항입니다. 기본적으로 컬렉션은 전체 DataFrame 에서 작동합니다. 샘플의 크기는 모집단을 통계적으로 대표할 수 있어야 합니다.
Pandera는 Pandas DataFrame 의 스키마만 유추할 수 있으므로 PySpark DataFrame 을 Pandas DataFrame 으로 변환해야 하며, 이는 열의 유형 확인에 영향을 줄 수 있습니다. 특히 Pandera는 string
, array
, map
, null
, struct
및 binary
PySpark 유형을 오브젝트 유형으로 유추합니다.
이 모드의 출력은 수집된 각 DataFrame 파일에 대해 JSON 파일이며, 파일 이름은 검사점과 동일합니다. 이 파일에는 스키마와 관련된 정보가 포함되어 있으며 두 개의 섹션으로 구성되어 있습니다.
Pandera 스키마 섹션에는 이름, 타입(Pandera), 열의 null 값 허용 여부, 기타 열별 정보 등 Pandera가 추론한 데이터와 PySpark 타입에 따른 열의 검사 항목이 포함되어 있습니다. Pandera의
DataFrameSchema
오브젝트입니다.사용자 지정 데이터 섹션은 PySpark 타입에 따라 각 열에서 수집한 사용자 지정 데이터의 배열입니다.
참고
컬렉션 패키지는 큰 PySpark DataFrames 를 처리할 때 메모리 문제가 있을 수 있습니다. 이 문제를 해결하기 위해 컬렉션 함수의 샘플 매개 변수를 0.0에서 1.0 사이의 값으로 설정하여 전체 PySpark DataFrame 대신 데이터의 하위 세트로 작업할 수 있습니다.
DataFrame 수집 데이터 모드(DataFrame)¶
이 모드는 PySpark DataFrame 의 데이터를 수집합니다. 이 경우 메커니즘은 주어진 DataFrame 의 모든 데이터를 Parquet 형식으로 저장합니다. 기본 사용자 Snowflake 연결을 사용하여 Parquet 파일을 Snowflake 임시 스테이지에 업로드하고 스테이지의 정보를 기반으로 테이블을 생성하려고 시도합니다. 파일과 테이블의 이름은 검사점과 동일합니다.
이 모드의 출력은 DataFrame 에 저장된 Parquet 파일 결과와 기본 Snowflake 구성 연결의 DataFrame 데이터가 포함된 테이블입니다.
Snowpark 변환 코드 검증하기¶
Snowpark Checkpoints 패키지는 PySpark 코드에 대한 동작 동등성을 보장하기 위해 Snowpark 코드에 적용할 수 있는 검증 세트를 제공합니다.
프레임워크에서 제공하는 함수¶
check_with_spark: 모든 Snowpark DataFrame 인자를 함수나 샘플로 변환한 다음 PySpark DataFrames 로 변환하는 데코레이터입니다. 그런 다음 새로운 Snowpark 함수의 기능을 미러링하는 제공된 spark 함수를 실행하고 두 구현 간의 출력을 비교합니다. Spark 함수와 Snowpark 함수가 의미적으로 동일하다고 가정하면 실제 샘플 데이터에서 해당 함수를 확인할 수 있습니다.
- 매개 변수:
job_context
(SnowparkJobContext): 검증을 위한 구성 및 세부 정보가 포함된 작업 컨텍스트입니다.spark_function
(fn): Snowpark 구현과 비교할 수 있는 PySpark 함수입니다.checkpoint_name
(str): 검사점의 이름입니다. 기본값은 None입니다.sample_number
(Optional[int], 선택 사항): 검증을 위한 행 수입니다. 기본값은 100입니다.sampling_strategy
(Optional[SamplingStrategy], 선택 사항): 데이터 샘플링에 사용되는 전략입니다. 기본값은SamplingStrategy.RANDOM_SAMPLE
입니다.output_path
(Optional[str], 선택 사항): 검증 결과를 저장할 경로입니다. 기본값은 None입니다.
다음은 그 예입니다.
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint: 이 함수는 인자 모드에 따라 특정 검사점 스키마 파일 또는 가져온 데이터프레임에 대해 Snowpark Dataframe을 검증합니다. 이는 해당 DataFrame 에 대해 수집된 정보와 함수에 전달되는 DataFrame 이 동일한지 확인합니다.
- 매개 변수:
df
(SnowparkDataFrame): 검증할 DataFrame 입니다.checkpoint_name
(str): 검증할 검사점의 이름입니다.job_context
(SnowparkJobContext, 선택 사항) (str): 검증을 위한 작업 컨텍스트입니다. PARQUET 모드에 필요합니다.mode
(CheckpointMode): 검증 모드(예: SCHEMA, PARQUET)입니다. 기본값은 SCHEMA 입니다.custom_checks
(Optional[dict[Any, Any]], 선택 사항): 검증 중에 적용할 사용자 지정 검사 항목입니다.skip_checks
(Optional[dict[Any, Any]], 선택 사항): 검증 중 건너뛸 검사 항목입니다.sample_frac
(Optional[float], 선택 사항): 검증을 위해 샘플링할 DataFrame 의 일부입니다. 기본값은 0.1입니다.sample_number
(Optional[int], 선택 사항): 검증을 위해 샘플링할 행 수입니다.sampling_strategy
(Optional[SamplingStrategy], 선택 사항): 샘플링에 사용할 전략입니다.output_path
(Optional[str], 선택 사항): 검증 결과의 출력 경로입니다.
다음은 그 예입니다.
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
선택한 모드에 따라 검증은 수집된 스키마 파일 또는 Parquet에 로딩된 데이터프레임(Snowflake)을 사용하여 PySpark 버전과 동등성을 확인합니다.
check-ouput_schema: 이 데코레이터는 Snowpark 함수 출력의 스키마를 검증하고 출력 DataFrame 이 지정된 Pandera 스키마를 준수하는지 확인합니다. 특히 Snowpark 파이프라인에서 데이터 통합과 일관성을 강화하는 데 유용합니다. 이 데코레이터는 검증할 Pandera 스키마, 검사점 이름, 샘플 매개 변수, 선택적 작업 컨텍스트 등 여러 매개 변수를 사용합니다. 이 함수는 Snowpark 함수를 래핑하고 출력 DataFrame 에 대해 스키마 검증을 수행한 후 결과를 반환합니다.
다음은 그 예입니다.
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema: 이 데코레이터는 Snowpark 함수의 입력 인자에 대한 스키마를 검증합니다. 이 데코레이터는 함수가 실행되기 전에 입력 DataFrame 이 지정된 Pandera 스키마를 준수하는지 확인합니다. 특히 Snowpark 파이프라인에서 데이터 통합과 일관성을 강화하는 데 유용합니다. 이 데코레이터는 검증할 Pandera 스키마, 검사점 이름, 샘플 매개 변수, 선택적 작업 컨텍스트 등 여러 매개 변수를 사용합니다. 함수를 실행하기 전에 Snowpark 함수를 래핑하고 입력 DataFrame 에 대해 스키마 검증을 수행합니다.
다음은 그 예입니다.
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
통계 검사 항목¶
Schema
모드에서 검증을 실행하면 기본적으로 특정 열 유형에 통계 검증이 적용되며, skip_checks
를 사용하면 이러한 검사를 건너뛸 수 있습니다.
열 유형 |
기본 검사 |
---|---|
Numeric: |
between: 값이 최소값과 최대값 사이에 있는 경우 최소값과 최대값을 포함합니다. decimal_precision: 값이 소수점인 경우 소수점 이하 전체 자릿수를 확인합니다. mean: 열의 평균이 특정 범위 내에 있는지 확인합니다. |
부울 |
isin: 값이 True인지 False인지 확인합니다. True_proportion: True 값의 비율이 특정 범위 내에 있는지 확인합니다. False_proportion: False 값의 비율이 특정 범위 내에 속하는지 확인합니다. |
Date: |
between: 값이 최소값과 최대값 사이에 있는 경우 최소값과 최대값을 포함합니다. |
Nullable: 지원되는 모든 유형 |
Null_proportion: Null 비율을 적절히 검증합니다. |
검사 건너뛰기¶
열 검증 또는 열에 대한 특정 검사를 건너뛸 수 있는 세분화된 검사 제어 기능이 있습니다. 매개 변수 skip_checks
를 사용하여 특정 열과 건너뛸 검증 유형을 지정할 수 있습니다. 건너뛰는 데 사용되는 검사 이름은 검사와 연결된 이름입니다.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
사용자 지정 검사 항목¶
custom_checks
속성을 사용하여 JSON 파일에서 생성된 스키마에 추가 검사를 추가할 수 있습니다. 이렇게 하면 Pandera 스키마에 검사가 추가됩니다.
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
샘플링 전략¶
제공된 코드의 샘플링 프로세스는 데이터의 대표 샘플을 가져와서 대규모 DataFrames 를 효율적으로 검증하도록 설계되었습니다. 이 접근 방식은 계산 비용과 시간이 많이 소요될 수 있는 전체 데이터 세트를 처리할 필요 없이 스키마 검증을 수행하는 데 도움이 됩니다.
- 매개 변수:
sample_frac
: 이 매개 변수는 샘플링할 DataFrame 의 비율을 지정합니다. 예를 들어sample_frac
을 0.1로 설정하면 DataFrame 행의 10%가 샘플링됩니다. 이는 컴퓨팅 리소스를 절약하기 위해 데이터의 하위 세트를 검증하려는 경우에 유용합니다.sample_number
: 이 매개 변수는 DataFrame 에서 샘플링할 정확한 행 수를 지정합니다. 예를 들어sample_number
가 100으로 설정된 경우 DataFrame 에서 100개의 행이 샘플링됩니다. DataFrame 크기에 관계없이 수정된 행 수를 검증하려는 경우에 유용합니다.
검증 결과¶
모든 유형의 검증이 실행되면 합격 또는 불합격 여부에 관계없이 결과가 checkpoint_validation_results.json
에 저장됩니다. 이 파일은 주로 VSCode 확장 프로그램의 기능에 사용됩니다. 여기에는 검증 상태, 타임스탬프, 검사점 이름, 함수 실행이 발생한 줄 번호, 파일에 대한 정보가 포함됩니다.
또한 검증 결과에 대한 정보가 포함된 SNOWPARK_CHECKPOINTS_REPORT 라는 테이블에 기본 Snowflake 계정에 결과를 기록합니다.
DATE
: 검증의 타임스탬프를 실행합니다.JOB
: SnowparkJobContext 의 이름입니다.STATUS
: 검증 상태입니다.CHECKPOINT
: 검증된 검사점의 이름입니다.MESSAGE
: 오류 메시지입니다.DATA
: 검증 실행의 데이터입니다.EXECUTION_MODE
: 검증 모드가 실행되었습니다.
검사점 환경 변수¶
checkpoints.json
파일을 찾기 위한 프레임워크의 기본 동작은 SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR
라는 환경 변수를 찾는 것입니다. 이 변수에는 checkpoint.json
의 상대 경로가 포함됩니다. 코드의 코드 렌즈로 검사점을 실행할 때 VSCode 확장 프로그램에 의해 할당됩니다. 환경 변수가 할당되지 않은 경우 프레임워크는 현재 작업 디렉터리에서 파일을 찾으려고 시도합니다.
Hypothesis 단위 테스트¶
Hypothesis는 다양한 입력 데이터를 자동으로 생성하여 기존 단위 테스트를 개선하도록 설계된 강력한 Python용 테스트 라이브러리입니다. 개별 테스트 케이스를 지정하는 대신 속성 또는 조건으로 코드의 예상 동작을 설명하면 Hypothesis가 예를 생성하여 해당 속성을 철저하게 테스트하는 속성 기반 테스트를 사용합니다. 이 접근 방식은 에지 케이스와 예기치 않은 동작을 발견하는 데 도움이 되므로 복잡한 함수에 특히 효과적입니다. 자세한 내용은 Hypothesis 섹션을 참조하십시오.
snowpark-checkpoints-hypothesis
패키지는 Hypothesis 라이브러리를 확장하여 테스트 목적으로 합성 Snowpark DataFrames 를 생성합니다. 다양하고 무작위화된 테스트 데이터를 생성하는 Hypothesis의 기능을 활용하여 다양한 스키마와 값을 사용하여 실제 시나리오를 시뮬레이션하고 에지 케이스를 발견하여 강력한 코드를 보장하고 복잡한 변환의 정확성을 검증할 수 있는 Snowpark DataFrames 를 만들 수 있습니다.
Snowpark의 Hypothesis 전략은 합성 데이터 생성을 위해 Pandera에 의존합니다. dataframe_strategy
함수는 지정된 스키마를 사용하여 이를 준수하는 Pandas DataFrame 을 생성한 다음 이를 Snowpark DataFrame 으로 변환합니다.
함수 서명:
def dataframe_strategy(
schema: Union[str, DataFrameSchema],
session: Session,
size: Optional[int] = None
) -> SearchStrategy[DataFrame]
함수 매개 변수:
schema
: 열, 데이터 타입을 정의하고 생성된 Snowpark 데이터 프레임이 일치해야 하는지를 검사하는 스키마입니다. 스키마는 다음이 될 수 있습니다.snowpark-checkpoints-collectors
패키지의collect_dataframe_checkpoint
함수에 의해 생성된 JSON 스키마 파일의 경로.
session
: Snowpark DataFrames 를 생성하는 데 사용되는 snowflake.snowpark.Session 의 인스턴스입니다.size
: 각 Snowpark DataFrame 에 대해 생성할 행 수입니다. 이 매개 변수를 제공하지 않으면 전략은 크기가 다른 DataFrames 를 생성합니다.
함수 출력:
Snowpark DataFrames 를 생성하는 Hypothesis SearchStrategy 를 반환합니다.
지원 및 지원되지 않는 데이터 타입¶
dataframe_strategy
함수는 다양한 데이터 타입의 Snowpark DataFrames 생성을 지원합니다. 함수에 전달된 스키마 인자의 유형에 따라 전략에서 지원하는 데이터 타입이 달라집니다. 전략에서 지원되지 않는 데이터 타입을 발견하면 예외가 발생한다는 점에 유의하십시오.
다음 표는 JSON 파일을 schema
인자로 전달할 때 dataframe_strategy
함수에 의해 지원되는 PySpark 데이터 타입과 지원되지 않는 타입을 보여줍니다.
PySpark 데이터 타입 |
지원됨 |
---|---|
예 |
|
예 |
|
아니요 |
|
예 |
|
아니요 |
|
아니요 |
|
아니요 |
|
아니요 |
|
예 |
|
예 |
|
아니요 |
|
예 |
|
예 |
|
아니요 |
|
아니요 |
다음 표는 DataFrameSchema 오브젝트를 schema
인자로 전달할 때 dataframe_strategy
함수가 지원하는 Pandera 데이터 타입과 이들이 매핑되는 Snowpark 데이터 타입을 보여줍니다.
Pandera 데이터 타입 |
Snowpark 데이터 타입 |
---|---|
int8 |
|
int16 |
|
int32 |
|
int64 |
|
float32 |
|
float64 |
|
문자열 |
|
bool |
|
datetime64[ns, tz] |
|
datetime64[ns] |
|
날짜 |
예¶
Hypothesis 라이브러리를 사용하여 Snowpark DataFrames 를 생성하는 일반적인 워크플로는 다음과 같습니다.
코드가 모든 입력에 대해 충족해야 하는 다양한 어설션 또는 조건이 포함된 표준 Python 테스트 함수를 만듭니다.
테스트 함수에 Hypothesis
@given
데코레이터를 추가하고dataframe_strategy
함수를 인자로 전달합니다.@given
데코레이터에 대한 자세한 내용은 hypothesis.given 섹션을 참조하십시오.테스트 함수를 실행합니다. 테스트가 실행되면 Hypothesis는 생성된 입력을 자동으로 테스트의 인자로 제공합니다.
예 1: JSON 파일에서 Snowpark DataFrames 생성하기
다음은 snowpark-checkpoints-collectors
패키지의 collect_dataframe_checkpoint
함수에 의해 생성된 JSON 스키마 파일에서 Snowpark DataFrames 를 생성하는 예입니다.
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_json_file(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
예 2: Pandera DataFrameSchema 오브젝트에서 Snowpark DataFrame 생성하기
아래는 Pandera DataFrameSchema 인스턴스에서 Snowpark DataFrames 를 생성하는 방법의 예시입니다. 자세한 내용은 Pandera DataFrameSchema 섹션을 참조하십시오.
import pandera as pa
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema=pa.DataFrameSchema(
{
"boolean_column": pa.Column(bool),
"integer_column": pa.Column("int64", pa.Check.in_range(0, 9)),
"float_column": pa.Column(pa.Float32, pa.Check.in_range(10.5, 20.5)),
}
),
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_dataframeschema_object(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
예 3: Hypothesis 동작 사용자 지정하기
Hypothesis @settings
데코레이터를 사용하여 테스트의 동작을 사용자 지정할 수도 있습니다. 이 데코레이터를 사용하면 다양한 구성 변수를 사용자 지정하여 테스트 동작을 필요에 맞게 조정할 수 있습니다. @settings
데코레이터를 사용하면 최대 테스트 케이스 수, 각 테스트 실행 기한, 상세도 수준 등 다양한 측면을 제어할 수 있습니다. 자세한 내용은 Hypothesis 설정 섹션을 참조하십시오.
from datetime import timedelta
from hypothesis import given, settings
from snowflake.snowpark import DataFrame, Session
from snowflake.hypothesis_snowpark import dataframe_strategy
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
)
)
@settings(
deadline=timedelta(milliseconds=800),
max_examples=25,
)
def test_my_function(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Snowpark Checkpoints용 IDE 설정하기¶
Snowflake Extension for Visual Studio Code 는 프레임워크 사용 환경을 개선하기 위해 Snowpark Checkpoints 라이브러리에 대한 지원을 제공합니다. 코드에 삽입된 collect
및 validate
문을 세밀하게 제어할 수 있을 뿐만 아니라 변환된 코드의 동작 동등성 어설션의 상태를 검토할 수 있습니다.
Snowpark Checkpoints 활성화하기¶
Snowpark Checkpoints를 활성화하려면 Snowflake의 확장 프로그램 설정으로 이동하여 Snowpark Checkpoints: Enabled 를 확인하십시오.

뷰¶
앞서 설명한 대로 Snowpark Checkpoints 속성을 Enabled 로 설정하면 확장 프로그램에서 SNOWPARK CHECKPOINTS 라는 새 탭이 열립니다. 작업 공간의 모든 검사점을 표시하고 전체 또는 개별적으로 활성화/비활성화, 파일에서 모두 지우기 등 여러 작업을 수행할 수 있으며, 각 검사점을 더블 클릭하면 해당 검사점이 정의된 파일과 코드 줄로 이동합니다.
모든 검사점 전환하기¶
Snowpark Checkpoints 탭의 오른쪽 상단에 위치한 이 옵션은 모든 검사점에서 활성화된 속성을 전환합니다.

활성화된 검사점:

검사점을 비활성화하면 런타임에 건너뛰게 됩니다.

모든 검사점 정리하기¶
Snowpark Checkpoints 탭의 오른쪽 상단에 위치합니다. 이렇게 하면 작업 영역의 Jupyter 노트북을 포함한 모든 Python 파일에서 검사점이 제거되지만, 계약과 패널에서는 삭제되지 않습니다. 즉, Snowflake: Restore All Checkpoints
명령을 사용하여 복원할 수 있습니다.

파일에 검사점 삽입하기¶
파일 내부를 마우스 오른쪽 버튼으로 클릭하면 Snowpark Checkpoints 옵션이 포함된 컨텍스트 메뉴가 표시되어 Collection 및 Validation 검사점을 추가할 수 있습니다.
컨텍스트 메뉴의 Snowpark Checkpoints 옵션:

수집기/검증기를 추가했습니다.

단일 검사점 실행하기¶
각 검사점 위에 표시된 코드 렌즈 옵션을 클릭하여 단일 검사점을 실행할 수 있습니다. 실행하면 진행 상황을 보여주는 출력 콘솔이 나타나고 완료되면 결과 뷰가 나타납니다. 컨트랙트 파일에서 검사점이 비활성화되어 있어도 실행 시에는 활성화됩니다.

컨트랙트 파일에 진입점이 선언되지 않은 경우 오류 메시지 Entry point not found for the checkpoint. 가 표시됩니다.

파일에서 활성화된 모든 Snowpark Checkpoints 실행하기¶
각 파일의 오른쪽 상단에 Run all checkpoints from the current file 버튼이 표시됩니다.

클릭하면 실행 진행 상황을 표시하는 출력 채널이 나타납니다.

타임라인 뷰¶
검사점 실행 결과의 타임라인을 표시합니다.

명령¶
다음 명령은 Snowpark Checkpoints에 사용할 수 있습니다. 이를 사용하려면 명령 팔레트에 Snowflake: [command name]
을 입력합니다.
명령 |
설명 |
---|---|
Snowflake: 검사점 전환 |
모든 검사점의 활성화 속성을 전환합니다. |
Snowflake: Snowpark Checkpoints 프로젝트 초기화 |
프로젝트 초기화를 트리거하여 컨트랙트 파일이 없는 경우 파일을 생성합니다. 검사점이 있는 경우, 컨트랙트 파일에 검사점을 로딩할지 묻는 팝업이 표시됩니다. |
Snowflake: 모든 검사점 지우기 |
작업 공간의 모든 파일에서 모든 검사점을 삭제합니다. |
Snowflake: 모든 검사점 복원 |
컨트랙트 파일에 여전히 존재하는 파일에서 이전에 삭제한 검사점을 복원합니다. |
Snowflake: 검증/수집 검사점 추가 |
커서 위치에 필수 매개 변수가 있는 검증기 또는 수집기를 추가합니다. |
Snowflake: Snowpark Checkpoints 뷰에 집중하기 |
패널 SNOWPARK CHECKPOINTS 로 초점을 이동합니다. |
Snowflake: 검사점 타임라인 열기 |
검사점 실행 타임라인을 표시합니다. |
Snowflake: 현재 파일에서 모든 검사점 실행 |
현재 파일에서 활성화된 모든 검사점을 실행합니다. |
Snowflake: 작업 공간의 모든 검사점 실행 |
작업 영역에서 활성화된 모든 검사점을 실행합니다. |
Snowflake: Snowpark Checkpoints 결과 모두 보기 |
모든 검사점 결과가 포함된 탭을 표시합니다. |
경고¶
중복: 수집 프로젝트에서 두 개의 검사점이 동일한 이름으로 할당된 경우, “Another checkpoint with an identical name has been detected and will be overwritten.” 라는 경고가 표시됩니다. 검증 프로젝트에는 동일한 이름을 공유하는 여러 개의 검사점이 있을 수 있으며, 이 경우 경고가 표시되지 않습니다.
잘못된 유형: 프로젝트 유형과 다른 유형의 검사점을 추가하면 다음과 같은 오류 메시지와 함께 밑줄이 그어집니다. “Please make sure you are using the correct Snowpark-Checkpoints statement. This particular checkpoint statement is different from the others used in this project, statements that don’t match the project type will be ignored when executed.”
유효하지 않은 검사점 이름: 검사점 이름 매개 변수를 추가하는 방법이 잘못되었습니다. 이 경우 다음 경고 메시지가 표시됩니다. “Invalid checkpoint name. Checkpoint names must start with a letter and can only contain letters, numbers, hyphens, and underscores”.