Snowpark Checkpoints-Bibliothek: Prüfer

Das Snowpark-Checkpoints-Paket bietet eine Reihe von Validierungen, die auf den Snowpark-Code angewendet werden können, um sicherzustellen, dass die Verhaltensweise dem PySpark-Code entspricht.

Vom Framework bereitgestellte Funktionen

  • check_with_spark: Ein Decorator, der alle Snowpark-DataFrame-Argumente in eine Funktion oder ein Beispiel und dann in PySpark-DataFrames umwandelt. Die Prüfung führt dann eine bereitgestellte Spark-Funktion aus, die die Funktionalität der neuen Snowpark-Funktion widerspiegelt, und vergleicht die Ausgaben der beiden Implementierungen. Unter der Annahme, dass die Spark-Funktion und die Snowpark-Funktionen semantisch identisch sind, kann der Decorator diese Funktionen anhand realer Stichproben überprüft werden.

    Parameter:
    • job_context (SnowparkJobContext): Der Jobkontext mit der Konfiguration und den Details für die Validierung

    • spark_function (fn): Die entsprechende PySpark-Funktion zum Vergleich mit der Snowpark-Implementierung

    • checkpoint_name (str): Ein Name für den Checkpoint; ist standardmäßig „None“

    • sample_number (Optional[int], optional): Anzahl der Zeilen für die Validierung; der Standardwert ist 100

    • sampling_strategy (Optional[SamplingStrategy], optional): Die für das Sampling verwendete Strategie; der Standardwert ist SamplingStrategy.RANDOM_SAMPLE

    • output_path (Optional[str], optional): Der Pfad zu der Datei, in der die Validierungsergebnisse gespeichert werden; standardmäßig „None“

    Beispiel:

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint: Diese Funktion validiert einen Snowpark-Dataframe anhand einer bestimmten Checkpoint-Schemadatei oder eines importierten Dataframes gemäß dem Argumentmodus. Sie stellt sicher, dass die für diesen DataFrame gesammelten Informationen und der DataFrame, der an die Funktion übergeben wird, übereinstimmen.

    Parameter:
    • df (SnowparkDataFrame): Der zu validierende DataFrame

    • checkpoint_name (str): Der Name des Checkpoints, anhand dessen validiert werden soll.

    • job_context (SnowparkJobContext, optional) (str): Der Jobkontext für die Validierung; erforderlich für PARQUET -Modus

    • mode (CheckpointMode): Der Modus der Validierung (z. B. SCHEMA, PARQUET). Standardwert: SCHEMA

    • custom_checks (Optional[dict[Any, Any]], optional): Benutzerdefinierte Prüfungen, die während der Validierung angewendet werden

    • skip_checks (Optional[dict[Any, Any]], optional): Prüfungen, die bei der Validierung übersprungen werden sollen

    • sample_frac (Optional[float], optional): Bruchteil von DataFrame als Stichprobe für die Validierung; der Standardwert ist 0,1

    • sample_number (Optional[int], optional): Anzahl der Zeilen als Stichprobe für die Validierung

    • sampling_strategy (Optional[SamplingStrategy], optional): Strategie für das Sampling

    • output_path (Optional[str], optional): Der Pfad zu der Datei, in der die Validierungsergebnisse gespeichert werden

    Beispiel:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
        df1,
        "demo_add_a_column_dataframe",
        job_context=job_context,
        mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    Je nach gewähltem Modus verwendet die Validierung entweder die gesammelte Schemadatei oder einen mit Parquet geladenen Datenframe in Snowflake, um die Äquivalenz mit der PySpark-Version zu überprüfen.

  • check-output_schema: Dieser Decorator validiert das Schema der Ausgabe einer Snowpark-Funktion und stellt sicher, dass die DataFrame-Ausgabe mit einem bestimmten Pandera-Schema übereinstimmt. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie umschließt die Snowpark-Funktion und führt eine Schemavalidierung des Ausgabe-DataFrame durch, bevor sie das Ergebnis zurückgibt.

    Beispiel:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema: Dieser Decorator prüft das Schema der Eingabeargumente einer Snowpark-Funktion. Dieser Decorator stellt sicher, dass der Eingabe-DataFrame mit einem bestimmten Pandera-Schema übereinstimmt, bevor die Funktion ausgeführt wird. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie schließt die Snowpark-Funktion ein und führt eine Schemavalidierung des Eingabe-DataFrame durch, bevor sie die Funktion ausführt.

    Beispiel:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

Statistische Prüfungen

Statistische Prüfungen werden standardmäßig auf den spezifischen Spaltentyp angewendet, wenn die Prüfung im Modus Schema ausgeführt wird; diese Prüfungen können mit skip_checks übersprungen werden.

Spaltentyp

Standardprüfung

Numerisch: byte, short, integer, long, float, und double

between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert.

decimal_precision: Wenn der Wert eine Dezimalzahl ist, wird die Genauigkeit der Dezimalstellen überprüft.

mean: Überprüft, ob der Mittelwert der Spalten innerhalb eines bestimmten Bereichs liegt.

Boolesch

isin: Überprüfen Sie, ob der Wert „True“ oder „False“ ist.

True_proportion: Überprüfen Sie, ob der Anteil der „True“-Werte innerhalb eines bestimmten Bereichs liegt.

False_proportion: Überprüfen Sie, ob der Anteil der „False“-Werte innerhalb eines bestimmten Bereichs liegt.

Datum: date, timestamp und timestamp_ntz

between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert.

Nullwertfähig: Alle unterstützten Typen

Null_proportion: Validieren Sie den Nullanteil entsprechend.

Prüfungen überspringen

Mit dieser detaillierten Steuerung von Prüfungen können Sie die Spaltenvalidierung oder bestimmte Prüfungen für eine Spalte überspringen. Mit dem Parameter skip_checks können Sie die spezifische Spalte und die Art der Validierung angeben, die Sie überspringen möchten. Der Name des Prüfung, der zum Überspringen verwendet wird, ist der Name, der mit der Prüfung verbunden ist.

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

Beispiel:

df = pd.DataFrame(
{
      "COLUMN1": [1, 4, 0, 10, 9],
      "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
      "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
      "COLUMN2": Column(
          float,
          [
              Check.greater_than(-20.5),
              Check.less_than(-1.0),
              Check(lambda x: x < -1.2),
          ],
      ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
  sp_df,
  schema,
  skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

Individuelle Prüfungen

Sie können dem aus der JSON-Datei generierten Schema mit der Eigenschaft custom_checks zusätzliche Prüfungen hinzufügen. Damit wird die Prüfung zum Pandera-Schema hinzugefügt.

Beispiel:

df = pd.DataFrame(
  {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
  }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
        "COLUMN1": [
            Check(lambda x: x.shape[0] == 5),
            Check(lambda x: x.shape[1] == 2),
    ],
    "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
  },
)
Copy

Sampling-Strategien

Der Sampling-Prozess des bereitgestellten Codes wurde entwickelt, um große DataFrames effizient zu validieren, indem eine repräsentative Stichprobe der Daten genommen wird. Diese Vorgehensweis hilft bei der Schema-Validierung, ohne dass der gesamte Datensatz verarbeitet werden muss, was rechenintensiv und zeitaufwendig sein kann.

Parameter:
  • sample_frac: Dieser Parameter gibt den Anteil des DataFrame an, der entnommen werden soll. Wenn zum Beispiel sample_frac auf 0,1 eingestellt ist, werden 10 Prozent der Zeilen aus dem DataFrame als Stichprobe entnommen. Dies ist nützlich, wenn Sie eine Teilmenge der Daten validieren möchten, um Rechenressourcen zu sparen.

  • sample_number: Dieser Parameter gibt die genaue Anzahl der Zeilen an, die aus dem DataFrame als Stichprobe entnommen werden sollen. Wenn zum Beispiel sample_number auf 100 eingestellt ist, werden 100 Zeilen aus dem DataFrame entnommen. Dies ist nützlich, wenn Sie eine feste Anzahl von Zeilen unabhängig von der Größe des DataFrame validieren möchten.

Validierungsergebnis

Nachdem eine beliebige Art von Validierung durchgeführt wurde, wird das Ergebnis, unabhängig davon sie bestanden wurde oder nicht, in checkpoint_validation_results.json gespeichert. Diese Datei wird hauptsächlich für die Funktionalitäten der VSCode-Erweiterung verwendet. Sie enthält Informationen über den Status der Überprüfung, den Zeitstempel, den Checkpoint-Namen, die Nummer der Zeile, in der die Funktion ausgeführt wird, und die Datei.

Sie protokolliert das Ergebnis auch im Standard-Snowflake Konto in der Tabelle SNOWPARK_CHECKPOINTS_REPORT, die Informationen über das Validierungsergebnis enthält.

  • DATE: Zeitstempel der Ausführung der Validierung

  • JOB: Name der SnowparkJobContext

  • STATUS: Status der Validierung

  • CHECKPOINT: Name des überprüften Checkpoints

  • MESSAGE: Fehlermeldung

  • DATA: Daten aus der Ausführung der Validierung

  • EXECUTION_MODE: Validierungsmodus ausgeführt