Bibliothèque de points de contrôle Snowpark : validateurs

Le paquet Snowpark Checkpoints offre un ensemble de paramètres de validation qui peuvent être appliqués au code Snowpark afin de garantir l’équivalence comportementale avec le code PySpark.

Fonctions fournies par le cadre

  • check_with_spark : un décorateur qui convertira tout argument du DataFrame Snowpark en une fonction ou un échantillon, puis en PySpark DataFrames. Le contrôle exécutera ensuite une fonction spark fournie qui reflète la fonctionnalité de la nouvelle fonction Snowpark et comparera les sorties entre les deux implémentations. En supposant que la fonction spark et les fonctions Snowpark sont sémantiquement identiques, ce décorateur vérifie ces fonctions sur des données réelles et échantillonnées.

    Paramètres :
    • job_context (SnowparkJobContext) : le contexte de la tâche contenant la configuration et les détails de la validation

    • spark_function (fn) : la fonction PySpark équivalente pour comparer avec l’implémentation Snowpark

    • checkpoint_name (str): nom du point de contrôle ; la valeur par défaut est Aucun.

    • sample_number (Facultatif[int], facultatif) : le nombre de lignes pour la validation ; la valeur par défaut est 100

    • sampling_strategy (Facultatif[SamplingStrategy], facultatif) : la stratégie utilisée pour l’échantillonnage des données ; par défaut SamplingStrategy.RANDOM_SAMPLE

    • output_path (Facultatifl[str], facultatif) : le chemin d’accès au fichier dans lequel les résultats de la validation sont stockés ; la valeur par défaut est Aucun.

    Exemple :

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint : cette fonction valide un Dataframe Snowpark par rapport à un fichier de schéma de point de contrôle spécifique ou un Dataframe importé selon le mode de l’argument. Il garantit que les informations collectées pour cet DataFrame et le DataFrame transmis à la fonction sont équivalents.

    Paramètres :
    • df (SnowparkDataFrame) : le DataFrame à valider

    • checkpoint_name (str) : le nom du point de contrôle avec lequel la validation doit être effectuée

    • job_context (SnowparkJobContext, facultatif) (str) : le contexte de la tâche pour la validation ; requis pour le mode PARQUET

    • mode (CheckpointMode) : le mode de validation (par exemple, SCHEMA, PARQUET) ; par défaut SCHEMA

    • custom_checks (Facultatif[dict[Any, Any]], facultatif) : contrôles personnalisés à appliquer lors de la validation

    • skip_checks (Facultatifl[dict[Any, Any]], facultatif) : contrôles à ignorer lors de la validation

    • sample_frac (Facultatif[float], facultatif) : fraction du DataFrame à échantillonner pour la validation ; la valeur par défaut est 0,1

    • sample_number (Facultatif[int], facultatif) : nombre de lignes à échantillonner pour la validation

    • sampling_strategy (Facultatif[SamplingStrategy], Facultatif) : stratégie à utiliser pour l’échantillon

    • output_path (Facultatifl[str], facultatif) : chemin d’accès au fichier dans lequel les résultats de la validation sont stockés

    Exemple :

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
        df1,
        "demo_add_a_column_dataframe",
        job_context=job_context,
        mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    En fonction du mode choisi, la validation utilisera soit le fichier de schéma collecté, soit un Dataframe chargé par Parquet dans Snowflake pour vérifier l’équivalence par rapport à la version PySpark.

  • check-output_schema : ce décorateur valide le schéma de la sortie d’une fonction Snowpark et s’assure que le DataFrame de sortie est conforme à un schéma Pandera spécifié. Il est particulièrement utile pour assurer l’intégration et la cohérence des données dans les pipelines Snowpark. Ce décorateur prend plusieurs paramètres, dont le schéma Pandera à valider, le nom du point de contrôle, les paramètres d’échantillonnage et un contexte de tâche optionnelle. Il enveloppe la fonction Snowpark et effectue une validation de schéma sur le DataFrame de sortie avant de renvoyer le résultat.

    Exemple :

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema : ce décorateur valide le schéma des arguments d’entrée d’une fonction Snowpark. Ce décorateur garantit que le DataFrame d’entrée est conforme à un schéma Pandera spécifié avant l’exécution de la fonction. Il est particulièrement utile pour assurer l’intégration et la cohérence des données dans les pipelines Snowpark. Ce décorateur prend plusieurs paramètres, dont le schéma Pandera à valider, le nom du point de contrôle, les paramètres d’échantillonnage et un contexte de tâche optionnelle. Il enveloppe la fonction Snowpark et effectue une validation de schéma sur le DataFrame d’entrée avant d’exécuter la fonction.

    Exemple :

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

Contrôles de statistiques

Les validations statistiques sont appliquées par défaut au type de colonne spécifique lorsque la validation est exécutée en mode Schema ; ces contrôles peuvent être ignorés avec skip_checks.

Type de colonne

Contrôle par défaut

Numérique : byte, short, integer, long, float et double

entre : valide si la valeur est comprise entre le min ou le max, y compris le min et le max.

decimal_precision : si la valeur est décimale, ceci vérifiera la précision décimale.

moyenne : valide si la moyenne des colonnes est comprise dans une plage spécifique.

Booléen

isin : valide si la valeur est vraie ou fausse.

True_proportion : valide si la proportion des valeurs vraies se situe dans une plage spécifique.

False_proportion : valide si la proportion des valeurs fausses est comprise dans une plage spécifique.

Date : date, timestamp et timestamp_ntz

entre : valide si la valeur est comprise entre le min ou le max, y compris le min et le max.

Null possible : tous les types pris en charge

Null_proportion : valide la proportion nulle en conséquence.

Ignorer les contrôles

Grâce à ce contrôle granulaire des contrôles, vous pouvez sauter la validation d’une colonne ou des contrôles spécifiques pour une colonne. Avec le paramètre skip_checks, vous pouvez spécifier la colonne particulière et le type de validation que vous souhaitez ignorer. Le nom du contrôle utilisé à ignorer est celui qui est associé au contrôle.

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

Exemple :

df = pd.DataFrame(
{
      "COLUMN1": [1, 4, 0, 10, 9],
      "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
      "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
      "COLUMN2": Column(
          float,
          [
              Check.greater_than(-20.5),
              Check.less_than(-1.0),
              Check(lambda x: x < -1.2),
          ],
      ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
  sp_df,
  schema,
  skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

Contrôles personnalisés

Vous pouvez ajouter des contrôles supplémentaires au schéma généré à partir du fichier JSON à l’aide de la propriété custom_checks. Cette opération ajoute le contrôle au schéma pandera.

Exemple :

df = pd.DataFrame(
  {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
  }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
    ],
    "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
  },
)
Copy

Stratégies d’échantillonnage

Le processus d’échantillonnage du code fourni est conçu pour valider efficacement de larges DataFrames en prenant un échantillon représentatif des données. Cette approche permet d’effectuer la validation des schémas sans avoir à traiter l’ensemble des données, ce qui peut s’avérer coûteux en temps et en argent.

Paramètres :
  • sample_frac : ce paramètre indique la fraction du DataFrame à échantillonner. Par exemple, si sample_frac est défini sur 0.1, 10 % des lignes du DataFrame seront échantillonnées. Ceci est utile lorsque vous souhaitez valider un sous-ensemble de données afin d’économiser les ressources calcul.

  • sample_number : ce paramètre indique le nombre exact de lignes à échantillonner dans le DataFrame. Par exemple, si sample_number est défini sur 100, 100 lignes seront échantillonnées dans le DataFrame. Cette fonction est utile lorsque vous souhaitez valider un nombre fixe de lignes, quelle que soit la taille du DataFrame.

Résultat de validation

Après l’exécution de tout type de validation, le résultat, qu’il soit positif ou négatif, est enregistré sur checkpoint_validation_results.json. Ce fichier est principalement utilisé pour les fonctionnalités de l’extension VSCode. Il contient des informations sur l’état de la validation, l’horodatage, le nom du point de contrôle, le numéro de la ligne où se produit l’exécution de la fonction et le fichier.

Il consigne également le résultat dans le compte Snowflake par défaut dans une table appelée SNOWPARK_CHECKPOINTS_REPORT, qui contient des informations sur le résultat de la validation.

  • DATE : horodatage d’exécution de la validation

  • JOB : nom du SnowparkJobContext

  • STATUS : état de la validation

  • CHECKPOINT : nom du point de contrôle validé

  • MESSAGE : message d’erreur

  • DATA : données de l’exécution de la validation

  • EXECUTION_MODE : le mode de validation est exécuté