Snowpark Checkpoints 라이브러리: 유효성 검사기¶
Snowpark Checkpoints 패키지는 PySpark 코드에 대한 동작 동등성을 보장하기 위해 Snowpark 코드에 적용할 수 있는 검증 세트를 제공합니다.
프레임워크에서 제공하는 함수¶
check_with_spark: 모든 Snowpark DataFrame 인자를 함수 또는 샘플로 변환한 후 PySpark DataFrames로 변환하는 데코레이터입니다. 이 검사는 제공된 spark 함수를 실행하여 새 Snowpark 함수의 기능을 미러링하고 두 구현의 출력을 비교합니다. spark 함수와 Snowpark 함수가 의미적으로 동일하다고 가정하면, 이 데코레이터는 실제 샘플링된 실제 데이터에서 해당 함수를 확인합니다.
- 매개 변수:
:code:`job_context`(SnowparkJobContext): 유효성을 검사하기 위한 구성과 세부 정보가 포함된 작업 컨텍스트입니다.
:code:`spark_function`(fn): Snowpark 구현과 비교하기 위한 동등한 PySpark 함수입니다.
:code:`checkpoint_name`(str): 검사점의 이름이며, 기본값은 None입니다.
:code:`sample_number`(Optional[int], 선택 사항): 유효성을 검사할 행의 수이며, 기본값은 100입니다.
:code:`sampling_strategy`(Optional[SamplingStrategy], 선택 사항): 데이터 샘플링에 사용되는 전략입니다. 기본값은 :code:`SamplingStrategy.RANDOM_SAMPLE`로 설정됩니다.
:code:`output_path`(Optional[str], 선택 사항): 유효성 검사 결과가 저장되는 파일의 경로이며, 기본값은 None입니다.
예:
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint: 이 함수는 인자 모드에 따라 특정 검사점 스키마 파일 또는 가져온 Dataframe에 대해 Snowpark Dataframe의 유효성을 검사합니다. 여기에서는 해당 DataFrame에서 수집된 정보와 함수에 전달된 DataFrame이 동일한지 확인합니다.
- 매개 변수:
:code:`df`(SnowparkDataFrame): 유효성을 검사할 DataFrame입니다.
:code:`checkpoint_name`(str): 유효성을 검사할 검사점의 이름입니다.
:code:`job_context`(SnowparkJobContext, 선택 사항)(str): 유효성을 검사하기 위한 작업 컨텍스트이며, PARQUET 모드에 필요합니다.
:code:`mode`(CheckpointMode): 유효성 검사 모드(예: SCHEMA, PARQUET)이며, 기본값은 SCHEMA로 설정됩니다
:code:`custom_checks`(Optional[dict[Any, Any]], 선택 사항): 유효성 검사 중에 적용할 사용자 지정 검사 항목입니다.
:code:`skip_checks`(Optional[dict[Any, Any]], 선택 사항): 유효성 검사 중에 건너뛸 검사 항목입니다.
:code:`sample_frac`(Optional[float], 선택 사항): 유효성을 검사하기 위해 샘플링할 DataFrame의 일부입니다. 기본값은 0.1로 설정됩니다.
:code:`sample_number`(Optional[int], 선택 사항): 유효성을 검사하기 위해 샘플링할 행의 수입니다.
:code:`sampling_strategy`(Optional[SamplingStrategy], 선택 사항): 샘플링에 사용할 전략입니다.
:code:`output_path`(Optional[str], 선택 사항): 유효성 검사 결과가 저장되는 파일의 경로입니다.
예:
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
선택한 모드에 따라 유효성 검사는 수집된 스키마 파일 또는 Parquet에 로딩된 데이터프레임(Snowflake)을 사용하여 PySpark 버전과의 동등성을 확인합니다.
check-output_schema: 이 데코레이터는 Snowpark 함수 출력의 스키마 유효성을 검사하고, 출력 DataFrame이 지정된 Pandera 스키마를 따르는지 확인합니다. 이는 특히 Snowpark 파이프라인에서 데이터 무결성과 일관성을 적용하는 데 유용합니다. 이 데코레이터는 유효성을 검사할 Pandera 스키마, 검사점 이름, 샘플링 매개 변수, 선택적 작업 컨텍스트 등 여러 매개 변수를 사용합니다. Snowpark 함수를 래핑하고 출력 DataFrame에서 스키마의 유효성을 검사한 후에 결과를 반환합니다.
예:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema: 이 데코레이터는 Snowpark 함수 입력 인자 스키마의 유효성을 검사합니다. 이 데코레이터는 함수를 실행하기 전에 입력 DataFrame이 지정된 Pandera 스키마를 따르는지 확인합니다. 이는 특히 Snowpark 파이프라인에서 데이터 무결성과 일관성을 적용하는 데 유용합니다. 이 데코레이터는 유효성을 검사할 Pandera 스키마, 검사점 이름, 샘플링 매개 변수, 선택적 작업 컨텍스트 등 여러 매개 변수를 사용합니다. Snowpark 함수를 래핑하고 입력 DataFrame에서 스키마의 유효성을 검사한 후에 함수를 실행합니다.
예:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
통계 검사¶
Schema
모드에서 검증을 실행하면 기본적으로 특정 열 유형에 통계 검증이 적용되며, skip_checks
를 사용하면 이러한 검사를 건너뛸 수 있습니다.
열 유형 |
기본 검사 |
---|---|
Numeric: |
between: 값이 최솟값과 최댓값 사이(최솟값, 최댓값 포함)에 있는지 확인합니다. decimal_precision: 값이 소수인 경우 소수점 이하 전체 자릿수를 확인합니다. mean: 열의 평균이 특정 범위 내에 있는지 확인합니다. |
부울 |
isin: 값이 True인지, False인지 확인합니다. True_proportion: True 값의 비율이 특정 범위 내에 있는지 확인합니다. False_proportion: False 값의 비율이 특정 범위 내에 있는지 확인합니다. |
Date: |
between: 값이 최솟값과 최댓값 사이(최솟값, 최댓값 포함)에 있는지 확인합니다. |
Nullable: 지원되는 모든 유형 |
Null_proportion: 이에 따라 null 비율을 확인합니다. |
검사 건너뛰기¶
이 세분화된 검사 제어 기능을 사용하면 열 유효성 검사 또는 열에 대한 특정 검사를 건너뛸 수 있습니다. skip_checks
매개 변수 사용 시 특정 열과 건너뛸 유효성 검사 타입을 지정할 수 있습니다. 건너뛰는 데 사용되는 검사의 이름은 검사와 연결된 이름입니다.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
예:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
사용자 지정 검사 항목¶
custom_checks
속성이 있는 JSON 파일에서 생성된 스키마에 다른 검사를 추가할 수 있습니다.
예:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
샘플링 전략¶
제공된 코드의 샘플링 프로세스는 데이터의 대표 샘플을 가져와서 대규모 DataFrames 를 효율적으로 검증하도록 설계되었습니다. 이 접근 방식은 계산 비용과 시간이 많이 소요될 수 있는 전체 데이터 세트를 처리할 필요 없이 스키마 검증을 수행하는 데 도움이 됩니다.
- 매개 변수:
sample_frac
: 이 매개 변수는 샘플링할 DataFrame 의 비율을 지정합니다. 예를 들어sample_frac
을 0.1로 설정하면 DataFrame 행의 10%가 샘플링됩니다. 이는 컴퓨팅 리소스를 절약하기 위해 데이터의 하위 세트를 검증하려는 경우에 유용합니다.sample_number
: 이 매개 변수는 DataFrame 에서 샘플링할 정확한 행 수를 지정합니다. 예를 들어sample_number
가 100으로 설정된 경우 DataFrame 에서 100개의 행이 샘플링됩니다. DataFrame 크기에 관계없이 수정된 행 수를 검증하려는 경우에 유용합니다.
검증 결과¶
모든 타입의 유효성 검사가 실행되면 성공 또는 실패 여부에 관계없이 결과가 :file:`checkpoint_validation_results.json`에 저장됩니다. 이 파일은 주로 VSCode 확장 프로그램의 기능에 사용됩니다. 여기에는 유효성 검사 상태, 타임스탬프, 검사점 이름, 함수 실행이 발생하는 줄 번호, 파일에 관한 정보가 포함됩니다.
또한 검증 결과에 대한 정보가 포함된 *SNOWPARK_CHECKPOINTS_REPORT*라는 테이블의 기본 Snowflake 계정에 결과를 기록합니다.
DATE
: 유효성 검사의 타임스탬프를 실행합니다.JOB
: SnowparkJobContext의 이름입니다.STATUS
: 유효성 검사의 상태입니다.CHECKPOINT
: 유효성이 검사된 검사점의 이름입니다.MESSAGE
: 오류 메시지입니다.DATA
: 유효성 검사 실행의 데이터입니다.EXECUTION_MODE
: 유효성 검사 모드가 실행되었습니다.