Bibliothèque de points de contrôle Snowpark : validateurs¶
Valider le code converti de Snowpark¶
Le paquet Snowpark Checkpoints offre un ensemble de paramètres de validation qui peuvent être appliqués au code Snowpark afin de garantir l’équivalence comportementale avec le code PySpark.
Les fonctions offertes par le cadre¶
check_with_spark : un décorateur qui convertira tout argument du DataFrame Snowpark en une fonction ou un échantillon, puis en DataFrames PySpark. Le contrôle exécute ensuite une fonction Spark fournie qui reproduit la fonctionnalité de la nouvelle fonction Snowpark et compare les sorties entre les deux implémentations. En supposant que la fonction Spark et les fonctions Snowpark sont sémantiquement identiques, ceci permet de vérifier ces fonctions sur des données réelles et échantillonnées.
- Paramètres :
job_context
(SnowparkJobContext) : le contexte de la tâche contenant la configuration et les détails de la validation.spark_function
(fn) : la fonction PySpark équivalente pour comparer avec l’implémentation Snowpark.checkpoint_name
(str) : nom du point de contrôle. La valeur par défaut est Aucun.sample_number
(Optional[int], optional) : le nombre de lignes pour la validation. La valeur par défaut est 100.sampling_strategy
(Optional[SamplingStrategy], optional) : la stratégie utilisée pour l’échantillonnage des données. La valeur par défaut estSamplingStrategy.RANDOM_SAMPLE
.output_path
(Optional[str], optional) : le chemin pour stocker les résultats de la validation. La valeur par défaut est Aucun.
Voici un exemple :
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint : cette fonction valide un Dataframe Snowpark par rapport à un fichier de schéma de point de contrôle spécifique ou un Dataframe importé selon le mode de l’argument. Elle garantit que les informations collectées pour ce DataFrame et le DataFrame transmis à la fonction sont équivalents.
- Paramètres :
df
(SnowparkDataFrame) : le DataFrame à valider.checkpoint_name
(str) : le nom du point de contrôle auquel la validation doit être effectuée.job_context
(SnowparkJobContext, optional) (str) : le contexte de la tâche pour la validation. Obligatoire pour le mode PARQUET.mode
(CheckpointMode) : le mode de validation (par exemple, SCHEMA, PARQUET). La valeur par défaut est SCHEMA.custom_checks
(Optional[dict[Any, Any]], optional) : Vérifications personnalisées à appliquer lors de la validation.skip_checks
(Optional[dict[Any, Any]], optional) : contrôles à ignorer lors de la validation.sample_frac
(Optional[float], optional) : fraction du DataFrame à échantillonner pour la validation. La valeur par défaut est 0.1.sample_number
(Optional[int], optional) : nombre de lignes à échantillonner pour la validation.sampling_strategy
(Optional[SamplingStrategy], Optional) : stratégie à utiliser pour l’échantillon.output_path
(Optional[str], optional) : le chemin de sortie des résultats de la validation.
Voici un exemple :
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
En fonction du mode choisi, la validation utilisera soit le fichier de schéma collecté, soit un Dataframe chargé par Parquet dans Snowflake pour vérifier l’équivalence par rapport à la version PySpark.
check-output_schema : ce décorateur valide le schéma de la sortie d’une fonction Snowpark et s’assure que la sortie DataFrame est conforme à un schéma Pandera spécifié. Il est particulièrement utile pour assurer l’intégration et la cohérence des données dans les pipelines Snowpark. Ce décorateur prend plusieurs paramètres, dont le schéma Pandera à valider, le nom du point de contrôle, les paramètres d’échantillonnage et un contexte de tâche optionnelle. Il enveloppe la fonction Snowpark et effectue une validation de schéma sur le DataFrame de sortie avant de renvoyer le résultat.
Voici un exemple :
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema : ce décorateur valide le schéma des arguments d’entrée d’une fonction Snowpark. Ce décorateur garantit que le DataFrame d’entrée est conforme à un schéma Pandera spécifié avant l’exécution de la fonction. Il est particulièrement utile pour assurer l’intégration et la cohérence des données dans les pipelines Snowpark. Ce décorateur prend plusieurs paramètres, dont le schéma Pandera à valider, le nom du point de contrôle, les paramètres d’échantillonnage et un contexte de tâche optionnelle. Il enveloppe la fonction Snowpark et effectue une validation de schéma sur le DataFrame d’entrée avant d’exécuter la fonction.
Voici un exemple :
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
Contrôles statistiques¶
Les validations statistiques sont appliquées par défaut au type de colonne spécifique lorsque la validation est exécutée en mode Schema
; ces contrôles peuvent être ignorés avec skip_checks
.
Type de colonne |
Contrôle par défaut |
---|---|
Numérique : |
entre : si la valeur est comprise entre le min ou le max, y compris le min et le max. decimal_precision : si la valeur est décimale, ceci vérifiera la précision décimale. moyenne : validez si la moyenne des colonnes est comprise dans une plage spécifique. |
Booléen |
isin : validez si la valeur est Vrai ou Faux. True_proportion : validez si la proportion des valeurs Vrai se situe dans une plage spécifique. False_proportion : validation si la proportion des valeurs Faux est comprise dans une plage spécifique. |
Date : |
entre : si la valeur est comprise entre le min ou le max, y compris le min et le max. |
Null possible : tous les types pris en charge |
Null_proportion : validez la proportion nulle en conséquence. |
Ignorer les contrôles¶
Il existe un contrôle granulaire des contrôles, qui vous permet d’ignorer la validation d’une colonne ou des contrôles spécifiques pour une colonne. Avec le paramètre skip_checks
, vous pouvez spécifier la colonne particulière et le type de validation que vous souhaitez ignorer. Le nom du contrôle utilisé à ignorer est celui qui est associé au contrôle.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Contrôles personnalisés¶
Vous pouvez ajouter des contrôles supplémentaires au schéma généré à partir du fichier JSON à l’aide de la propriété custom_checks
. Cette opération ajoutera le contrôle au schéma pandera :
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
Stratégies d’échantillonnage¶
Le processus d’échantillonnage du code fourni est conçu pour valider efficacement de larges DataFrames en prenant un échantillon représentatif des données. Cette approche permet d’effectuer la validation des schémas sans avoir à traiter l’ensemble des données, ce qui peut s’avérer coûteux en temps et en argent.
- Paramètres :
sample_frac
: ce paramètre indique la fraction du DataFrame à échantillonner. Par exemple, sisample_frac
est défini sur 0.1, 10 % des lignes du DataFrame seront échantillonnées. Ceci est utile lorsque vous souhaitez valider un sous-ensemble de données afin d’économiser les ressources calcul.sample_number
: ce paramètre indique le nombre exact de lignes à échantillonner dans le DataFrame. Par exemple, sisample_number
est défini sur 100, 100 lignes seront échantillonnées dans le DataFrame. Cette fonction est utile lorsque vous souhaitez valider un nombre fixe de lignes, quelle que soit la taille du DataFrame.
Résultat de validation¶
Après l’exécution de tout type de validation, le résultat, qu’il soit positif ou négatif, est enregistré sur checkpoint_validation_results.json
. Ce fichier est principalement utilisé pour les fonctionnalités de l’extension VSCode. Il contient des informations sur le statut de la validation, l’horodatage, le nom du point de contrôle, le numéro de la ligne où se produit l’exécution de la fonction et le fichier.
Il consignera également le résultat dans le compte Snowflake par défaut dans une table appelée SNOWPARK_CHECKPOINTS_REPORT, qui contiendra des informations sur le résultat de la validation.
DATE
: horodatage d’exécution de la validation.JOB
: nom du SnowparkJobContext.STATUS
: statut de la validation.CHECKPOINT
: nom du point de contrôle validé.MESSAGE
: message d’erreurDATA
: données de l’exécution de la validation.EXECUTION_MODE
: le mode de validation est exécuté.