Snowpark Checkpoints library: Collectors¶
The Snowpark Checkpoints Python package provides a range of functionalities to support the validation of migrated workloads. This section outlines the key features and capabilities included in the package, along with guidance on how to use them effectively.
Collect information on your PySpark Code¶
The snowpark-checkpoints-collectors package offers a function for extracting information from PySpark DataFrames. We can then use that data to validate against the converted Snowpark DataFrames to ensure behavioral equivalence.
Use the following function to insert a new checkpoint collection point:
Function signature:

```python
def collect_dataframe_checkpoint(df: SparkDataFrame,
                                 checkpoint_name: str,
                                 sample: Optional[float],
                                 mode: Optional[CheckpointMode],
                                 output_path: Optional[str]) -> None:
```
Function parameters:
df: The PySpark DataFrame.
checkpoint_name: The name of the checkpoint. It must start with a letter (A-Z, a-z) or an underscore (_) and contain only letters, underscores, and decimal digits (0-9).
sample: (optional) The sample size, as a fraction in the range 0 to 1.0. The default value is 1.0 (the entire PySpark DataFrame).
mode: (optional) The execution mode. Options are SCHEMA and DATAFRAME. The default value is SCHEMA.
output_path: (optional) The output path where the checkpoint is saved. The default value is the current working directory.
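For example, a minimal collection point can be added as in the following sketch. The import path is an assumption based on the package name, and the DataFrame shown is only illustrative; adapt both to your project.

```python
# A minimal sketch. The import path is assumed from the package name;
# verify it against your installed version of snowpark-checkpoints-collectors.
from pyspark.sql import SparkSession
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint

spark = SparkSession.builder.getOrCreate()

# Hypothetical input data used only for illustration.
df = spark.createDataFrame(
    [(1, "a"), (2, "b"), (3, None)],
    schema="id long, label string",
)

# Collect metadata for this DataFrame under the checkpoint name
# "demo_initial_creation" (default mode is SCHEMA, default sample is 1.0).
collect_dataframe_checkpoint(df, checkpoint_name="demo_initial_creation")
```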
The collection process generates an output file named checkpoint_collection_result.json, which records the result of each collection point. It is a JSON file that contains the following information:
A timestamp when the collection point started.
The relative path of the file where the collection point is.
The line of code of the file where the collection point is.
The name of the collection point checkpoint.
The result of the collection point (fail or pass).
Schema inference collected data mode (Schema)¶
This is the default mode, which leverages Pandera schema inference to obtain the metadata and checks that will be evaluated for the specified DataFrame. This mode also collects custom data from columns of the DataFrame based on the PySpark type.
The column data and checks are collected based on the PySpark type of the column (see the tables below). For every column, regardless of its type, the custom data collected includes the name of the column, the type of the column, whether it is nullable, the count of rows, the count of not-null rows, and the count of null rows.
| Column type | Custom data collected |
|---|---|
| Numeric | The minimum value. The maximum value. The mean value. The decimal precision (in the case of an integer type, the value is zero). The standard deviation. |
| Date | The minimum value. The maximum value. The format of the date: %Y-%m-%d |
| DayTimeIntervalType and YearMonthIntervalType | The minimum value. The maximum value. |
| Timestamp | The minimum value. The maximum value. The format of the date: %Y-%m-%dH:%M:%S |
| Timestamp ntz | The minimum value. The maximum value. The format of the date: %Y-%m-%dT%H:%M:%S%z |
| String | The minimum length value. The maximum length value. |
| Char | PySpark handles any literal as a string type, therefore char is not a valid type. |
| Varchar | PySpark handles any literal as a string type, therefore varchar is not a valid type. |
| Decimal | The minimum value. The maximum value. The mean value. The decimal precision. |
| Array | The type of the value. Whether null is allowed as an element. The proportion of null values. The maximum array size. The minimum array size. The mean size of arrays. Whether all arrays have the same size. |
| Binary | The maximum size. The minimum size. The mean size. Whether all elements have the same size. |
| Map | The type of the key. The type of the value. Whether null is allowed as a value. The proportion of null values. The maximum map size. The minimum map size. The mean map size. Whether all maps have the same size. |
| Null | NullType represents None; because the type data cannot be determined, it is not possible to get information from this type. |
| Struct | The metadata of each structField of the struct. |
It also defines a set of predefined validation checks for each data type, detailed in the following table:
| Type | Pandera Checks | Additional Checks |
|---|---|---|
| Boolean | Each value is True or False. | The count of True and False values. |
| Numeric | Each value is in the range of the min value and the max value. | The decimal precision. The mean value. The standard deviation. |
| Date | N/A | Minimum and maximum values. |
| Timestamp | Each value is in the range of the min value and the max value. | The format of the value. |
| Timestamp ntz | Each value is in the range of the min value and the max value. | The format of the value. |
| String | Each value length is in the range of the min and max length. | None |
| Char | PySpark handles any literal as a string type, therefore char is not a valid type. | |
| Varchar | PySpark handles any literal as a string type, therefore varchar is not a valid type. | |
| Decimal | N/A | N/A |
| Array | N/A | None |
| Binary | N/A | None |
| Map | N/A | None |
| Null | N/A | N/A |
| Struct | N/A | None |
This mode allows the user to define a sample of the DataFrame to collect, but sampling is optional; by default, the collection works with the entire DataFrame. The sample size must be statistically representative of the population.
Pandera can only infer the schema of a Pandas DataFrame, which means that the PySpark DataFrame must be converted into a Pandas DataFrame; this conversion can affect the columns’ type resolution. In particular, Pandera infers the following PySpark types as object types: string, array, map, null, struct, and binary.
The output of this mode is a JSON file for each collected DataFrame, where the name of the file is the same as the checkpoint. This file contains information related to the schema and has two sections:
The Pandera schema section contains the data inferred by Pandera for each column, such as the name, the type (Pandas), whether the column allows null values, and other information, together with checks for the column based on the PySpark type. It is a DataFrameSchema object of Pandera.
The custom data section is an array of the custom data collected for each column based on the PySpark type.
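To inspect the generated file, a sketch like the following can be used; the checkpoint name and the file location (the default output path, that is, the current working directory) are assumptions for illustration and may differ in your setup:

```python
# A sketch for inspecting a collected schema checkpoint. The file name and
# location are assumptions: the documentation states the file is named after
# the checkpoint and written under the output path, but your layout may differ.
import json
from pathlib import Path

checkpoint_file = Path("demo_initial_creation.json")  # hypothetical checkpoint name

with checkpoint_file.open() as f:
    checkpoint = json.load(f)

# Print the top-level sections (expected: a Pandera schema section and a
# custom data section, as described above).
print(list(checkpoint.keys()))
```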
Note
The collection package might have memory issues when processing large PySpark DataFrames. To address this, you can set the sample parameter in the collection function to a value between 0.0 and 1.0, in order to work with a subset of the data instead of the entire PySpark DataFrame.
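For example, the following sketch (with an assumed import path and an illustrative DataFrame) collects roughly 10% of the rows instead of the full DataFrame:

```python
# A sketch of sampled collection (assumed import path, hypothetical DataFrame).
from pyspark.sql import SparkSession
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint

spark = SparkSession.builder.getOrCreate()
large_df = spark.range(10_000_000)  # stand-in for a large PySpark DataFrame

# Work with roughly 10% of the rows to limit memory usage.
collect_dataframe_checkpoint(
    large_df,
    checkpoint_name="large_df_schema_checkpoint",
    sample=0.1,
)
```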
DataFrame collected data mode (DataFrame)¶
This mode collects the data of the PySpark DataFrame. In this case, the mechanism saves all data of the given DataFrame in Parquet format. Using the default user Snowflake connection, it tries to upload the Parquet files into a temporary Snowflake stage and create a table based on the information in the stage. The names of the file and the table are the same as the checkpoint name.
The output of this mode is a Parquet file containing the saved DataFrame data and a table with that data in the default Snowflake connection configuration.
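As a sketch, a DataFrame mode collection could look like the following. The import paths for collect_dataframe_checkpoint and CheckpointMode are assumptions based on the package layout, and a default Snowflake connection must already be configured:

```python
# A sketch of DataFrame-mode collection (import paths are assumed;
# a configured default Snowflake connection must be available).
from pyspark.sql import SparkSession
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint
from snowflake.snowpark_checkpoints_collector.collection_common import CheckpointMode

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], schema="id long, label string")

# Save the full DataFrame as Parquet, upload it to a Snowflake stage,
# and create a table named after the checkpoint.
collect_dataframe_checkpoint(
    df,
    checkpoint_name="demo_dataframe_checkpoint",
    mode=CheckpointMode.DATAFRAME,
)
```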