Bibliothèque de points de contrôle Snowpark : validateurs¶
Le paquet Snowpark Checkpoints offre un ensemble de paramètres de validation qui peuvent être appliqués au code Snowpark afin de garantir l’équivalence comportementale avec le code PySpark.
Fonctions fournies par le cadre¶
check_with_spark : un décorateur qui convertira tout argument du DataFrame Snowpark en une fonction ou un échantillon, puis en PySpark DataFrames. Le contrôle exécutera ensuite une fonction spark fournie qui reflète la fonctionnalité de la nouvelle fonction Snowpark et comparera les sorties entre les deux implémentations. En supposant que la fonction spark et les fonctions Snowpark sont sémantiquement identiques, ce décorateur vérifie ces fonctions sur des données réelles et échantillonnées.
- Paramètres :
job_context
(SnowparkJobContext) : le contexte de la tâche contenant la configuration et les détails de la validationspark_function
(fn) : la fonction PySpark équivalente pour comparer avec l’implémentation Snowparkcheckpoint_name
(str): nom du point de contrôle ; la valeur par défaut est Aucun.sample_number
(Facultatif[int], facultatif) : le nombre de lignes pour la validation ; la valeur par défaut est 100sampling_strategy
(Facultatif[SamplingStrategy], facultatif) : la stratégie utilisée pour l’échantillonnage des données ; par défautSamplingStrategy.RANDOM_SAMPLE
output_path
(Facultatifl[str], facultatif) : le chemin d’accès au fichier dans lequel les résultats de la validation sont stockés ; la valeur par défaut est Aucun.
Exemple :
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint : cette fonction valide un Dataframe Snowpark par rapport à un fichier de schéma de point de contrôle spécifique ou un Dataframe importé selon le mode de l’argument. Il garantit que les informations collectées pour cet DataFrame et le DataFrame transmis à la fonction sont équivalents.
- Paramètres :
df
(SnowparkDataFrame) : le DataFrame à validercheckpoint_name
(str) : le nom du point de contrôle avec lequel la validation doit être effectuéejob_context
(SnowparkJobContext, facultatif) (str) : le contexte de la tâche pour la validation ; requis pour le mode PARQUETmode
(CheckpointMode) : le mode de validation (par exemple, SCHEMA, PARQUET) ; par défaut SCHEMAcustom_checks
(Facultatif[dict[Any, Any]], facultatif) : contrôles personnalisés à appliquer lors de la validationskip_checks
(Facultatifl[dict[Any, Any]], facultatif) : contrôles à ignorer lors de la validationsample_frac
(Facultatif[float], facultatif) : fraction du DataFrame à échantillonner pour la validation ; la valeur par défaut est 0,1sample_number
(Facultatif[int], facultatif) : nombre de lignes à échantillonner pour la validationsampling_strategy
(Facultatif[SamplingStrategy], Facultatif) : stratégie à utiliser pour l’échantillonoutput_path
(Facultatifl[str], facultatif) : chemin d’accès au fichier dans lequel les résultats de la validation sont stockés
Exemple :
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
En fonction du mode choisi, la validation utilisera soit le fichier de schéma collecté, soit un Dataframe chargé par Parquet dans Snowflake pour vérifier l’équivalence par rapport à la version PySpark.
check-output_schema : ce décorateur valide le schéma de la sortie d’une fonction Snowpark et s’assure que le DataFrame de sortie est conforme à un schéma Pandera spécifié. Il est particulièrement utile pour assurer l’intégration et la cohérence des données dans les pipelines Snowpark. Ce décorateur prend plusieurs paramètres, dont le schéma Pandera à valider, le nom du point de contrôle, les paramètres d’échantillonnage et un contexte de tâche optionnelle. Il enveloppe la fonction Snowpark et effectue une validation de schéma sur le DataFrame de sortie avant de renvoyer le résultat.
Exemple :
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema : ce décorateur valide le schéma des arguments d’entrée d’une fonction Snowpark. Ce décorateur garantit que le DataFrame d’entrée est conforme à un schéma Pandera spécifié avant l’exécution de la fonction. Il est particulièrement utile pour assurer l’intégration et la cohérence des données dans les pipelines Snowpark. Ce décorateur prend plusieurs paramètres, dont le schéma Pandera à valider, le nom du point de contrôle, les paramètres d’échantillonnage et un contexte de tâche optionnelle. Il enveloppe la fonction Snowpark et effectue une validation de schéma sur le DataFrame d’entrée avant d’exécuter la fonction.
Exemple :
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
Contrôles de statistiques¶
Les validations statistiques sont appliquées par défaut au type de colonne spécifique lorsque la validation est exécutée en mode Schema
; ces contrôles peuvent être ignorés avec skip_checks
.
Type de colonne |
Contrôle par défaut |
---|---|
Numérique : |
entre : valide si la valeur est comprise entre le min ou le max, y compris le min et le max. decimal_precision : si la valeur est décimale, ceci vérifiera la précision décimale. moyenne : valide si la moyenne des colonnes est comprise dans une plage spécifique. |
Booléen |
isin : valide si la valeur est vraie ou fausse. True_proportion : valide si la proportion des valeurs vraies se situe dans une plage spécifique. False_proportion : valide si la proportion des valeurs fausses est comprise dans une plage spécifique. |
Date : |
entre : valide si la valeur est comprise entre le min ou le max, y compris le min et le max. |
Null possible : tous les types pris en charge |
Null_proportion : valide la proportion nulle en conséquence. |
Ignorer les contrôles¶
Grâce à ce contrôle granulaire des contrôles, vous pouvez sauter la validation d’une colonne ou des contrôles spécifiques pour une colonne. Avec le paramètre skip_checks
, vous pouvez spécifier la colonne particulière et le type de validation que vous souhaitez ignorer. Le nom du contrôle utilisé à ignorer est celui qui est associé au contrôle.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
Exemple :
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Contrôles personnalisés¶
Vous pouvez ajouter des contrôles supplémentaires au schéma généré à partir du fichier JSON à l’aide de la propriété custom_checks
. Cette opération ajoute le contrôle au schéma pandera.
Exemple :
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
Stratégies d’échantillonnage¶
Le processus d’échantillonnage du code fourni est conçu pour valider efficacement de larges DataFrames en prenant un échantillon représentatif des données. Cette approche permet d’effectuer la validation des schémas sans avoir à traiter l’ensemble des données, ce qui peut s’avérer coûteux en temps et en argent.
- Paramètres :
sample_frac
: ce paramètre indique la fraction du DataFrame à échantillonner. Par exemple, sisample_frac
est défini sur 0.1, 10 % des lignes du DataFrame seront échantillonnées. Ceci est utile lorsque vous souhaitez valider un sous-ensemble de données afin d’économiser les ressources calcul.sample_number
: ce paramètre indique le nombre exact de lignes à échantillonner dans le DataFrame. Par exemple, sisample_number
est défini sur 100, 100 lignes seront échantillonnées dans le DataFrame. Cette fonction est utile lorsque vous souhaitez valider un nombre fixe de lignes, quelle que soit la taille du DataFrame.
Résultat de validation¶
Après l’exécution de tout type de validation, le résultat, qu’il soit positif ou négatif, est enregistré sur checkpoint_validation_results.json
. Ce fichier est principalement utilisé pour les fonctionnalités de l’extension VSCode. Il contient des informations sur l’état de la validation, l’horodatage, le nom du point de contrôle, le numéro de la ligne où se produit l’exécution de la fonction et le fichier.
Il consigne également le résultat dans le compte Snowflake par défaut dans une table appelée SNOWPARK_CHECKPOINTS_REPORT, qui contient des informations sur le résultat de la validation.
DATE
: horodatage d’exécution de la validationJOB
: nom du SnowparkJobContextSTATUS
: état de la validationCHECKPOINT
: nom du point de contrôle validéMESSAGE
: message d’erreurDATA
: données de l’exécution de la validationEXECUTION_MODE
: le mode de validation est exécuté