Snowpark Checkpoints-Bibliothek: Prüfer

Den in Snowpark konvertierten Code validieren

Das Snowpark-Checkpoints-Paket bietet eine Reihe von Validierungen, die auf den Snowpark-Code angewendet werden können, um sicherzustellen, dass die Verhaltensweise dem PySpark-Code entspricht.

Vom Framework bereitgestellte Funktionen

  • check_with_spark: Ein Decorator, der alle Snowpark-DataFrame-Argumente in eine Funktion oder ein Beispiel und dann in PySpark DataFrames umwandelt. Die Prüfung führt dann eine bereitgestellte Spark-Funktion aus, die die Funktionalität der neuen Snowpark-Funktion widerspiegelt, und vergleicht die Ausgaben zwischen den beiden Implementierungen. Unter der Annahme, dass die Spark-Funktion und die Snowpark-Funktionen semantisch identisch sind, werden diese Funktionen auf realen, abgetasteten Daten überprüft.

    Parameter:
    • job_context (SnowparkJobContext): Der Jobkontext mit der Konfiguration und den Details für die Validierung.

    • spark_function (fn): Die entsprechende PySpark-Funktion zum Vergleich mit der Snowpark-Implementierung.

    • checkpoint_name (str): Ein Name für den Checkpoint. Die Standardeinstellung ist „Keine“.

    • sample_number (Optional[int], optional): Die Anzahl der Zeilen für die Validierung. Der Standardwert ist 100.

    • sampling_strategy (Optional[SamplingStrategy], optional): Die für das Sampling verwendete Strategie. Die Standardeinstellung ist SamplingStrategy.RANDOM_SAMPLE.

    • output_path (Optional[str], optional): Der Pfad zum Speichern der Validierungsergebnisse. Die Standardeinstellung ist „Keine“.

    Nachfolgenden sehen Sie ein Beispiel:

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint: Diese Funktion validiert einen Snowpark-Dataframe anhand einer bestimmten Checkpoint-Schemadatei oder eines importierten Dataframes gemäß dem Argumentmodus. Sie stellt sicher, dass die für diesen DataFrame gesammelten Informationen und der DataFrame, der an die Funktion übergeben wird, übereinstimmen.

    Parameter:
    • df (SnowparkDataFrame): Der zu validierende DataFrame.

    • checkpoint_name (str): Der Name des Checkpoints, gegen den validiert werden soll.

    • job_context (SnowparkJobContext, optional) (str): Der Jobkontext für die Validierung. Erforderlich für den PARQUET-Modus.

    • mode (CheckpointMode): Der Validierungsmodus (z. B. SCHEMA, PARQUET). Die Standardeinstellung ist SCHEMA.

    • custom_checks (Optional[dict[Any, Any]], optional): Benutzerdefinierte Prüfungen, die während der Validierung angewendet werden.

    • skip_checks (Optional[dict[Any, Any]], optional): Prüfungen, die bei der Validierung übersprungen werden sollen.

    • sample_frac (Optional[float], optional): Teil des DataFrame, der für die Validierung ausgewählt wird. Der Standardwert ist 0.1.

    • sample_number (Optional[int], optional): Anzahl der Zeilen, die für die Validierung ausgewählt werden.

    • sampling_strategy (Optional[SamplingStrategy], optional): Strategie, die für das Sampling verwendet werden soll.

    • output_path (Optional[str], optional): Der Ausgabepfad für die Validierungsergebnisse.

    Nachfolgenden sehen Sie ein Beispiel:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
       df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    Je nach gewähltem Modus verwendet die Validierung entweder die gesammelte Schemadatei oder einen mit Parquet geladenen Datenframe in Snowflake, um die Äquivalenz mit der PySpark-Version zu überprüfen.

  • check-output_schema: Dieser Decorator validiert das Schema der Ausgabe einer Snowpark-Funktion und stellt sicher, dass die Ausgabe DataFrame mit einem bestimmten Pandera-Schema übereinstimmt. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie umschließt die Snowpark-Funktion und führt eine Schemavalidierung des Ausgabe-DataFrame durch, bevor sie das Ergebnis zurückgibt.

    Nachfolgenden sehen Sie ein Beispiel:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema: Dieser Decorator prüft das Schema der Eingabeargumente einer Snowpark-Funktion. Dieser Decorator stellt sicher, dass der Eingabe-DataFrame mit einem bestimmten Pandera-Schema übereinstimmt, bevor die Funktion ausgeführt wird. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie schließt die Snowpark-Funktion ein und führt eine Schemavalidierung des Eingabe-DataFrame durch, bevor sie die Funktion ausführt.

    Nachfolgenden sehen Sie ein Beispiel:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

Statistische Prüfungen

Statistische Prüfungen werden standardmäßig auf den spezifischen Spaltentyp angewendet, wenn die Prüfung im Modus Schema ausgeführt wird; diese Prüfungen können mit skip_checks übersprungen werden.

Spaltentyp

Standardprüfung

Numerisch: byte, short, integer, long, float, und double

between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert.

decimal_precision: Wenn der Wert eine Dezimalzahl ist, wird die Genauigkeit der Dezimalstellen überprüft.

mean: Überprüft, ob der Mittelwert der Spalten innerhalb eines bestimmten Bereichs liegt.

Boolesch

isin: Überprüft, ob der Wert „True“ oder „False“ ist.

True_proportion: Prüft, ob der Anteil der „True“-Werte innerhalb eines bestimmten Bereichs liegt.

False_proportion: Überprüfung, ob der Anteil der „False“-Werte innerhalb eines bestimmten Bereichs liegt.

Datum: date, timestamp und timestamp_ntz

between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert.

Nullwertfähig: Alle unterstützten Typen

Null_proportion: Validiert den Nullanteil entsprechend.

Prüfungen überspringen

Es gibt eine granulare Steuerung für Prüfungen, mit der Sie die Spaltenvalidierung oder bestimmte Prüfungen für eine Spalte überspringen können. Mit dem Parameter skip_checks können Sie die spezifische Spalte und die Art der Validierung angeben, die Sie überspringen möchten. Der Name des Prüfung, der zum Überspringen verwendet wird, ist der Name, der mit der Prüfung verbunden ist.

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

df = pd.DataFrame(
{
      "COLUMN1": [1, 4, 0, 10, 9],
      "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
      "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
      "COLUMN2": Column(
          float,
          [
              Check.greater_than(-20.5),
              Check.less_than(-1.0),
              Check(lambda x: x < -1.2),
          ],
      ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
  sp_df,
  schema,
  skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

Individuelle Prüfungen

Sie können dem aus der JSON-Datei generierten Schema mit der Eigenschaft custom_checks zusätzliche Prüfungen hinzufügen. Dadurch wird die Prüfung zum Pandera-Schema hinzugefügt:

df = pd.DataFrame(
  {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
  }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
    ],
    "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
  },
)
Copy

Sampling-Strategien

Der Sampling-Prozess des bereitgestellten Codes wurde entwickelt, um große DataFrames effizient zu validieren, indem eine repräsentative Stichprobe der Daten genommen wird. Diese Vorgehensweis hilft bei der Schema-Validierung, ohne dass der gesamte Datensatz verarbeitet werden muss, was rechenintensiv und zeitaufwendig sein kann.

Parameter:
  • sample_frac: Dieser Parameter gibt den Anteil des DataFrame an, der entnommen werden soll. Wenn zum Beispiel sample_frac auf 0,1 eingestellt ist, werden 10 Prozent der Zeilen aus dem DataFrame als Stichprobe entnommen. Dies ist nützlich, wenn Sie eine Teilmenge der Daten validieren möchten, um Rechenressourcen zu sparen.

  • sample_number: Dieser Parameter gibt die genaue Anzahl der Zeilen an, die aus dem DataFrame als Stichprobe entnommen werden sollen. Wenn zum Beispiel sample_number auf 100 eingestellt ist, werden 100 Zeilen aus dem DataFrame entnommen. Dies ist nützlich, wenn Sie eine feste Anzahl von Zeilen unabhängig von der Größe des DataFrame validieren möchten.

Validierungsergebnis

Nachdem eine beliebige Art von Validierung durchgeführt wurde, wird das Ergebnis, unabhängig davon sie bestanden wurde oder nicht, in checkpoint_validation_results.json gespeichert. Diese Datei wird hauptsächlich für die Funktionalitäten der VSCode-Erweiterung verwendet. Sie enthält Informationen über den Status der Überprüfung, den Zeitstempel, den Checkpoint-Namen, die Nummer der Zeile, in der die Funktion ausgeführt wird, und die Datei.

Sie protokolliert das Ergebnis auch im Standard-Snowflake Konto in einer Tabelle namens SNOWPARK_CHECKPOINTS_REPORT, die Informationen über das Validierungsergebnis enthält.

  • DATE: Zeitstempel der Ausführung der Validierung.

  • JOB: Name der SnowparkJobContext.

  • STATUS: Status der Validierung.

  • CHECKPOINT: Name des überprüften Checkpoints.

  • MESSAGE: Fehlermeldung.

  • DATA: Daten aus der Ausführung der Validierung.

  • EXECUTION_MODE: Validierungsmodus ausgeführt.