Snowpark Checkpoints library: Validators¶

Validate Snowpark Converted Code¶

The Snowpark Checkpoints package offers a set of validations that can be applied to Snowpark code to ensure behavioral equivalence with the original PySpark code.

Functions provided by the Framework¶

  • check_with_spark: A decorator that samples any Snowpark DataFrame arguments passed to a function and converts them to PySpark DataFrames. The check then executes a provided Spark function that mirrors the functionality of the new Snowpark function and compares the outputs of the two implementations. Assuming the Spark function and the Snowpark function are semantically identical, this verifies them on real, sampled data.

    Parameters:
    • job_context (SnowparkJobContext): The job context containing configuration and details for the validation.

    • spark_function (fn): The equivalent PySpark function to compare against the Snowpark implementation.

    • checkpoint_name (str): A name for the checkpoint. Defaults to None.

    • sample_number (Optional[int], optional): The number of rows for validation. Defaults to 100.

    • sampling_strategy (Optional[SamplingStrategy], optional): The strategy used for sampling data. Defaults to SamplingStrategy.RANDOM_SAMPLE.

    • output_path (Optional[str], optional): The path to store the validation results. Defaults to None.

    Following is an example:

    def original_spark_code_I_dont_understand(df):
        from pyspark.sql.functions import col, when

        ret = df.withColumn(
            "life_stage",
            when(col("byte") < 4, "child")
            .when(col("byte").between(4, 10), "teenager")
            .otherwise("adult"),
        )
        return ret


    @check_with_spark(
        job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
        from snowflake.snowpark.functions import col, lit, when

        ref = df.with_column(
            "life_stage",
            when(col("byte") < 4, lit("child"))
            .when(col("byte").between(4, 10), lit("teenager"))
            .otherwise(lit("adult")),
        )
        return ref


    df1 = new_snowpark_code_I_do_understand(df)
    
  • validate_dataframe_checkpoint: This function validates a Snowpark DataFrame against a specific checkpoint schema file or imported DataFrame, according to the mode argument. It ensures that the information collected for that DataFrame and the DataFrame passed to the function are equivalent.

    Parameters:
    • df (SnowparkDataFrame): The DataFrame to validate.

    • checkpoint_name (str): The name of the checkpoint to validate against.

    • job_context (SnowparkJobContext, optional): The job context for the validation. Required for PARQUET mode.

    • mode (CheckpointMode): The mode of validation (e.g., SCHEMA, PARQUET). Defaults to SCHEMA.

    • custom_checks (Optional[dict[Any, Any]], optional): Custom checks to apply during validation.

    • skip_checks (Optional[dict[Any, Any]], optional): Checks to skip during validation.

    • sample_frac (Optional[float], optional): Fraction of the DataFrame to sample for validation. Defaults to 0.1.

    • sample_number (Optional[int], optional): Number of rows to sample for validation.

    • sampling_strategy (Optional[SamplingStrategy], optional): Strategy to use for sampling.

    • output_path (Optional[str], optional): The output path for the validation results.

    Following is an example:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
        df1,
        "demo_add_a_column_dataframe",
        job_context=job_context,
        mode=CheckpointMode.DATAFRAME,  # or CheckpointMode.SCHEMA
    )
    

    Depending on the mode selected, the validation uses either the collected schema file or a Parquet-loaded DataFrame in Snowflake to verify equivalence against the PySpark version.

  • check_output_schema: This decorator validates the schema of a Snowpark function’s output, ensuring that the output DataFrame conforms to a specified Pandera schema. It is particularly useful for enforcing data integrity and consistency in Snowpark pipelines. The decorator takes several parameters, including the Pandera schema to validate against, the checkpoint name, sampling parameters, and an optional job context. It wraps the Snowpark function and performs schema validation on the output DataFrame before returning the result.

    Following is an example:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8

    # Define the Pandera schema
    out_schema = DataFrameSchema(
        {
            "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
            "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
            "COLUMN3": Column(float, Check.less_than(10)),
        }
    )

    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
        return dataframe.with_column(
            "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
        )

    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
        {
            "COLUMN1": [1, 4, 0, 10, 9],
            "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
        }
    )

    sp_dataframe = session.create_dataframe(df)

    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
  • check_input_schema: This decorator validates the schema of a Snowpark function’s input arguments, ensuring that the input DataFrame conforms to a specified Pandera schema before the function is executed. It is particularly useful for enforcing data integrity and consistency in Snowpark pipelines. The decorator takes several parameters, including the Pandera schema to validate against, the checkpoint name, sampling parameters, and an optional job context. It wraps the Snowpark function and performs schema validation on the input DataFrame before executing the function.

    Following is an example:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8

    # Define the Pandera schema
    input_schema = DataFrameSchema(
        {
            "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
            "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        }
    )

    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
        return dataframe.with_column(
            "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
        )

    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
        {
            "COLUMN1": [1, 4, 0, 10, 9],
            "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
        }
    )
    sp_dataframe = session.create_dataframe(df)

    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    

Statistic checks¶

Statistics validations are applied to specific column types by default when validation runs in Schema mode; these checks can be skipped with skip_checks.

| Column Type | Default Check |
| --- | --- |
| Numeric: byte, short, integer, long, float, and double | between: Whether the value falls between the min and the max, inclusive of both. |
| | decimal_precision: If the value is decimal, checks the decimal precision. |
| | mean: Whether the mean of the column falls within a specific range. |
| Boolean | isin: Whether the value is True or False. |
| | True_proportion: Whether the proportion of True values falls within a specific range. |
| | False_proportion: Whether the proportion of False values falls within a specific range. |
| Date: date, timestamp, and timestamp_ntz | between: Whether the value falls between the min and the max, inclusive of both. |
| Nullable: all supported types | Null_proportion: Whether the null proportion falls within a specific range. |

Skip checks¶

There is granular control over checks, which allows you to skip validation of an entire column or skip specific checks for a column. With the skip_checks parameter, you specify the column and the checks you want to skip. The name used to skip a check is the name associated with that check:

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

df = pd.DataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)

schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
        "COLUMN2": Column(
            float,
            [
                Check.greater_than(-20.5),
                Check.less_than(-1.0),
                Check(lambda x: x < -1.2),
            ],
        ),
    }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
    sp_df,
    schema,
    skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)

Custom checks¶

You can add additional checks to the schema generated from the JSON file with the custom_checks property. These checks are added to the Pandera schema:

df = pd.DataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# These checks will be added to the schema generated from the JSON file
result = validate_dataframe_checkpoint(
    sp_df,
    "checkpoint-name",
    custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
        ],
        "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
    },
)

Sampling strategies¶

The sampling process is designed to efficiently validate large DataFrames by taking a representative sample of the data. This approach lets you perform schema validation without processing the entire dataset, which can be computationally expensive and time-consuming.

Parameters:
  • sample_frac: This parameter specifies the fraction of the DataFrame to sample. For example, if sample_frac is set to 0.1, then 10 percent of the DataFrame rows will be sampled. This is useful when you want to validate a subset of the data to save on computational resources.

  • sample_number: This parameter specifies the exact number of rows to sample from the DataFrame. For example, if sample_number is set to 100, then 100 rows will be sampled from the DataFrame. This is useful when you want to validate a fixed number of rows regardless of the DataFrame size.
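
For illustration, the following is a minimal sketch of how these parameters can be combined with validate_dataframe_checkpoint. It assumes the same job_context, DataFrame, and checkpoint name as the earlier example and, like that example, omits import statements:

# Sample 10 percent of the rows before validating (the default sample_frac).
validate_dataframe_checkpoint(
    df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.SCHEMA,
    sample_frac=0.1,
)

# Alternatively, sample a fixed number of rows regardless of the DataFrame size.
validate_dataframe_checkpoint(
    df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.SCHEMA,
    sample_number=100,
)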

Validation result¶

After any type of validation executes, the result, whether it passes or fails, is saved to checkpoint_validation_results.json. This file is used mostly by the VS Code extension. It contains information about the validation status, the timestamp, the checkpoint name, the line number where the function executed, and the file.
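
As a minimal sketch, you can load that file directly to inspect the results programmatically; this assumes the file was written to your working directory (use the output_path parameter to control where results are stored):

import json

# Load the validation results written by the checkpoints framework.
with open("checkpoint_validation_results.json") as results_file:
    results = json.load(results_file)

print(results)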

The result is also logged in the default Snowflake account, in a table called SNOWPARK_CHECKPOINTS_REPORT, which contains the following information about the validation result:

  • DATE: Execution timestamp of the validation.

  • JOB: Name of the SnowparkJobContext.

  • STATUS: Status of the validation.

  • CHECKPOINT: Name of the checkpoint validated.

  • MESSAGE: Error message.

  • DATA: Data from the validation execution.

  • EXECUTION_MODE: Validation mode executed.
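
As an illustration, one quick way to review the logged results is to query this table from a Snowpark session; the following sketch assumes the table is reachable in the session's current database and schema:

from snowflake.snowpark import Session

session = Session.builder.getOrCreate()

# Show the most recent validation results logged by the framework.
session.sql(
    "SELECT DATE, JOB, STATUS, CHECKPOINT, MESSAGE, EXECUTION_MODE "
    "FROM SNOWPARK_CHECKPOINTS_REPORT "
    "ORDER BY DATE DESC"
).show()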