Snowpark Checkpoints 라이브러리: 유효성 검사기

Snowpark 변환 코드 검증하기

Snowpark Checkpoints 패키지는 PySpark 코드에 대한 동작 동등성을 보장하기 위해 Snowpark 코드에 적용할 수 있는 검증 세트를 제공합니다.

프레임워크에서 제공하는 함수

  • check_with_spark: 모든 Snowpark DataFrame 인자를 함수나 샘플로 변환한 다음 PySpark DataFrames 로 변환하는 데코레이터입니다. 그런 다음 검사에서는 새로운 Snowpark 함수의 기능을 미러링하는 제공된 Spark 함수를 실행하고 두 구현 간의 출력을 비교합니다. Spark 함수와 Snowpark 함수가 의미적으로 동일하다고 가정하면 실제 샘플 데이터에서 해당 함수를 검증합니다.

    매개 변수:
    • job_context (SnowparkJobContext): 검증을 위한 구성 및 세부 정보가 포함된 작업 컨텍스트입니다.

    • spark_function (fn): Snowpark 구현과 비교할 수 있는 PySpark 함수입니다.

    • checkpoint_name (str): 검사점의 이름입니다. 기본값은 None입니다.

    • sample_number (Optional[int], 선택 사항): 검증을 위한 행 수입니다. 기본값은 100입니다.

    • sampling_strategy (Optional[SamplingStrategy], 선택 사항): 데이터 샘플링에 사용되는 전략입니다. 기본값은 SamplingStrategy.RANDOM_SAMPLE 입니다.

    • output_path (Optional[str], 선택 사항): 검증 결과를 저장할 경로입니다. 기본값은 None입니다.

    다음은 그 예입니다.

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint: 이 함수는 인자 모드에 따라 특정 검사점 스키마 파일 또는 가져온 데이터프레임에 대해 Snowpark Dataframe을 검증합니다. 이는 해당 DataFrame 에 대해 수집된 정보와 함수에 전달되는 DataFrame 이 동일한지 확인합니다.

    매개 변수:
    • df (SnowparkDataFrame): 검증할 DataFrame 입니다.

    • checkpoint_name (str): 검증할 검사점의 이름입니다.

    • job_context (SnowparkJobContext, 선택 사항) (str): 검증을 위한 작업 컨텍스트입니다. PARQUET 모드에 필요합니다.

    • mode (CheckpointMode): 검증 모드(예: SCHEMA, PARQUET)입니다. 기본값은 SCHEMA 입니다.

    • custom_checks (Optional[dict[Any, Any]], 선택 사항): 검증 중에 적용할 사용자 지정 검사 항목입니다.

    • skip_checks (Optional[dict[Any, Any]], 선택 사항): 검증 중 건너뛸 검사 항목입니다.

    • sample_frac (Optional[float], 선택 사항): 검증을 위해 샘플링할 DataFrame 의 일부입니다. 기본값은 0.1입니다.

    • sample_number (Optional[int], 선택 사항): 검증을 위해 샘플링할 행 수입니다.

    • sampling_strategy (Optional[SamplingStrategy], 선택 사항): 샘플링에 사용할 전략입니다.

    • output_path (Optional[str], 선택 사항): 검증 결과의 출력 경로입니다.

    다음은 그 예입니다.

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
       df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    선택한 모드에 따라 검증은 수집된 스키마 파일 또는 Parquet에 로딩된 데이터프레임(Snowflake)을 사용하여 PySpark 버전과 동등성을 확인합니다.

  • check-output_schema: 이 데코레이터는 Snowpark 함수 출력 스키마의 유효성을 검사하고 DataFrame 출력이 지정된 Pandera 스키마를 준수하는지 확인합니다. 특히 Snowpark 파이프라인에서 데이터 통합과 일관성을 강화하는 데 유용합니다. 이 데코레이터는 검증할 Pandera 스키마, 검사점 이름, 샘플 매개 변수, 선택적 작업 컨텍스트 등 여러 매개 변수를 사용합니다. 이 함수는 Snowpark 함수를 래핑하고 출력 DataFrame 에 대해 스키마 검증을 수행한 후 결과를 반환합니다.

    다음은 그 예입니다.

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema: 이 데코레이터는 Snowpark 함수의 입력 인자에 대한 스키마를 검증합니다. 이 데코레이터는 함수가 실행되기 전에 입력 DataFrame 이 지정된 Pandera 스키마를 준수하는지 확인합니다. 특히 Snowpark 파이프라인에서 데이터 통합과 일관성을 강화하는 데 유용합니다. 이 데코레이터는 검증할 Pandera 스키마, 검사점 이름, 샘플 매개 변수, 선택적 작업 컨텍스트 등 여러 매개 변수를 사용합니다. 함수를 실행하기 전에 Snowpark 함수를 래핑하고 입력 DataFrame 에 대해 스키마 검증을 수행합니다.

    다음은 그 예입니다.

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

통계 검사 항목

Schema 모드에서 검증을 실행하면 기본적으로 특정 열 유형에 통계 검증이 적용되며, skip_checks 를 사용하면 이러한 검사를 건너뛸 수 있습니다.

열 유형

기본 검사

Numeric: byte, short, integer, long, float, double

between: 값이 최소값과 최대값 사이에 있는 경우 최소값과 최대값을 포함합니다.

decimal_precision: 값이 소수점인 경우 소수점 이하 전체 자릿수를 확인합니다.

mean: 열의 평균이 특정 범위 내에 있는지 확인합니다.

부울

isin: 값이 True인지 False인지 확인합니다.

True_proportion: True 값의 비율이 특정 범위 내에 있는지 확인합니다.

False_proportion: False 값의 비율이 특정 범위 내에 속하는지 확인합니다.

Date: date, timestamp, timestamp_ntz

between: 값이 최소값과 최대값 사이에 있는 경우 최소값과 최대값을 포함합니다.

Nullable: 지원되는 모든 유형

Null_proportion: Null 비율을 적절히 검증합니다.

검사 건너뛰기

열 검증 또는 열에 대한 특정 검사를 건너뛸 수 있는 세분화된 검사 제어 기능이 있습니다. 매개 변수 skip_checks 를 사용하여 특정 열과 건너뛸 검증 유형을 지정할 수 있습니다. 건너뛰는 데 사용되는 검사 이름은 검사와 연결된 이름입니다.

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

df = pd.DataFrame(
{
      "COLUMN1": [1, 4, 0, 10, 9],
      "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
      "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
      "COLUMN2": Column(
          float,
          [
              Check.greater_than(-20.5),
              Check.less_than(-1.0),
              Check(lambda x: x < -1.2),
          ],
      ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
  sp_df,
  schema,
  skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

사용자 지정 검사 항목

custom_checks 속성을 사용하여 JSON 파일에서 생성된 스키마에 추가 검사를 추가할 수 있습니다. 이렇게 하면 Pandera 스키마에 검사가 추가됩니다.

df = pd.DataFrame(
  {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
  }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
    ],
    "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
  },
)
Copy

샘플링 전략

제공된 코드의 샘플링 프로세스는 데이터의 대표 샘플을 가져와서 대규모 DataFrames 를 효율적으로 검증하도록 설계되었습니다. 이 접근 방식은 계산 비용과 시간이 많이 소요될 수 있는 전체 데이터 세트를 처리할 필요 없이 스키마 검증을 수행하는 데 도움이 됩니다.

매개 변수:
  • sample_frac: 이 매개 변수는 샘플링할 DataFrame 의 비율을 지정합니다. 예를 들어 sample_frac 을 0.1로 설정하면 DataFrame 행의 10%가 샘플링됩니다. 이는 컴퓨팅 리소스를 절약하기 위해 데이터의 하위 세트를 검증하려는 경우에 유용합니다.

  • sample_number: 이 매개 변수는 DataFrame 에서 샘플링할 정확한 행 수를 지정합니다. 예를 들어 sample_number 가 100으로 설정된 경우 DataFrame 에서 100개의 행이 샘플링됩니다. DataFrame 크기에 관계없이 수정된 행 수를 검증하려는 경우에 유용합니다.

검증 결과

모든 유형의 검증이 실행되면 합격 또는 불합격 여부에 관계없이 결과가 checkpoint_validation_results.json 에 저장됩니다. 이 파일은 주로 VSCode 확장 프로그램의 기능에 사용됩니다. 여기에는 검증 상태, 타임스탬프, 검사점 이름, 함수 실행이 발생한 줄 번호, 파일에 대한 정보가 포함됩니다.

또한 검증 결과에 대한 정보가 포함된 SNOWPARK_CHECKPOINTS_REPORT 라는 테이블에 기본 Snowflake 계정에 결과를 기록합니다.

  • DATE: 검증의 타임스탬프를 실행합니다.

  • JOB: SnowparkJobContext 의 이름입니다.

  • STATUS: 검증 상태입니다.

  • CHECKPOINT: 검증된 검사점의 이름입니다.

  • MESSAGE: 오류 메시지입니다.

  • DATA: 검증 실행의 데이터입니다.

  • EXECUTION_MODE: 검증 모드가 실행되었습니다.