Snowpark Checkpoints-Bibliothek

Snowpark Checkpoints ist eine Testbibliothek zur Validierung von Code, der von Apache PySpark nach Snowpark Python migriert wurde.

Voraussetzungen

Um Snowpark Checkpoints zu verwenden, richten Sie eine Python-Entwicklungsumgebung ein. Folgende Versionen von Python werden unterstützt:

  • 3.9

  • 3.10

  • 3.11

Bemerkung

Python 3.9 hängt von der Snowpark-Client-Version 1.5.0 ab. Python 3.10 hängt von der Snowpark-Client-Version 1.5.1 ab. Python 3.11 hängt von der Snowpark-Client-Version 1.9.0 ab.

Sie können eine virtuelle Python-Umgebung für eine bestimmte Python-Version mit Tools wie Anaconda, Miniconda oder virtualenv erstellen.

Snowpark Checkpoints installieren

Installieren Sie das Snowpark Checkpoints-Paket in eine virtuelle Python-Umgebung, indem Sie conda oder pip verwenden.

  • Verwenden von conda:

    conda install snowpark-checkpoints
    
    Copy
  • Verwenden von pip:

    pip install snowpark-checkpoints
    
    Copy

Wenn Sie möchten, können Sie die Pakete auch einzeln installieren:

  • snowpark-checkpoints-collectors – Verwenden Sie dieses Paket, um Informationen über PySpark DataFrames zu sammeln.

    • Verwenden von conda:

    conda install snowpark-checkpoints-collectors
    
    Copy
    • Verwenden von pip:

    pip install snowpark-checkpoints-collectors
    
    Copy
  • snowpark-checkpoints-hypothesis – Verwenden Sie dieses Paket, um Unit-Tests für Ihren Snowpark-Code zu erstellen, die auf automatisch generierten synthetischen Daten basieren und den DataFrame-Schemas folgen, die aus dem ursprünglichen PySpark Code gesammelt wurden.

    • Verwenden von conda:

      conda install snowpark-checkpoints-hypothesis
      
      Copy
    • Verwenden von pip:

      pip install snowpark-checkpoints-hypothesis
      
      Copy
  • snowpark-checkpoints-validators – Verwenden Sie dieses Paket, um Ihre konvertierten Snowpark-DataFrames anhand der gesammelten Schemas oder der exportierten DataFrames zu validieren, die von der Sammelfunktionalität generiert wurden.

    • Verwenden von conda:

      conda install snowpark-checkpoints-validators
      
      Copy
    • Verwenden von pip:

      pip install snowpark-checkpoints-validators
      
      Copy
  • snowpark-checkpoints-configuration – Verwenden Sie dieses Paket, um snowpark-checkpoints-collectors und snowpark-checkpoints-validators zu ermöglichen, die Konfiguration der Checkpoints automatisch zu laden.

    • Verwenden von conda:

    conda install snowpark-checkpoints-configuration
    
    Copy
    • Verwenden von pip:

    pip install snowpark-checkpoints-configuration
    
    Copy

Verwenden des Framework

Sammeln Sie Informationen über Ihren PySpark-Code

Das Paket snowpark-checkpoints-collectors bietet eine Funktion zum Extrahieren von Informationen aus der PySpark DataFrames. Diese Daten können wir dann mit den konvertierten Snowpark-DataFrames abgleichen, um die Gleichwertigkeit der Verhaltensweise sicherzustellen.

Verwenden Sie die folgende Funktion, um einen neuen Checkpoint-Sammelpunkt einzufügen:

Funktionssignatur:

def collect_dataframe_checkpoint(df: SparkDataFrame,
  checkpoint_name: str,
  sample: Optional[float],
  mode: Optional[CheckpointMode],
  output_path: Optional[str]) -> None:
Copy

Funktionsparameter:

  • df: Die PySpark DataFrame.

  • checkpoint_name: Der Name des Checkpoints. Beginnt mit einem Buchstaben (A-Z, a-z) oder einem Unterstrich (_) und enthält nur Buchstaben, Unterstriche und Dezimalziffern (0–9).

  • sample: (optional) Der Stichprobenumfang. Der Standardwert ist 1.0 (ganz PySpark DataFrame) in einem Bereich von 0 bis 1.0.

  • mode: (optional) Der Ausführungsmodus. Die Optionen sind SCHEMA und DATAFRAME. Der Standardwert ist SCHEMA.

  • output_path: (optional) Der Ausgabepfad zum Speichern des Checkpoints. Der Standardwert ist das aktuelle Arbeitsverzeichnis.

Der Sammlungsprozess erzeugt eine Ausgabedatei mit dem Namen checkpoint_collection_result.json, die Informationen über das Ergebnis für jeden Sammelpunkt enthält. Es ist eine JSON-Datei, die die folgenden Informationen enthält:

  • Ein Zeitstempel, wann der Sammelpunkt gestartet wurde.

  • Der relative Pfad der Datei, in der sich der Sammelpunkt befindet.

  • Die Codezeile der Datei, in der sich der Sammelpunkt befindet.

  • Der Name des Checkpoints des Sammelpunkts.

  • Das Ergebnis der Sammelpunkts (fehlgeschlagen oder bestanden).

Modus für die Schema-Inferenz der gesammelten Daten (Schema)

Dies ist der Standardmodus, der die Schema-Inferenz von Pandera nutzt, um die Metadaten und Prüfungen zu erhalten, die für den angegebenen DataFrame ausgewertet werden. In diesem Modus werden auch benutzerdefinierte Daten aus den Spalten des DataFrame auf der Grundlage des PySpark-Typs erfasst.

Die Spaltendaten und -prüfungen werden auf der Grundlage des PySpark-Typs der Spalte erfasst (siehe die Tabellen unten). Für jede Spalte, unabhängig von ihrem Typ, werden benutzerdefinierte Daten gesammelt, darunter der Name der Spalte, der Typ der Spalte, ob sie Nullwerte zulässt, die Anzahl der Zeilen, die Anzahl der ungültigen Zeilen und die Anzahl der Nullzeilen.

Benutzerdefinierte Daten werden basierend auf dem PySpark-Typ der Spalte erfasst

Spaltentyp

Erfasste benutzerdefinierte Daten

Numerisch (byte, short, integer, long, float und double)

Der Mindestwert. Der Höchstwert. Der Mittelwert. Die Genauigkeit der Dezimalstelle (im Falle des Typs Ganzzahl ist der Wert Null). Die Standardabweichung.

Date

Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%d

DayTimeIntervalType und YearMonthIntervalType

Der Mindestwert. Der Höchstwert.

Zeitstempel

Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%dH:%M:%S

Zeitstempel ntz

Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%dT%H:%M:%S%z

String

Der Wert für die Mindestlänge. Der Wert für die maximale Länge.

Char

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist char kein gültiger Typ.

Varchar

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist Varchar kein gültiger Typ.

Dezimalzahl

Der Mindestwert. Der Höchstwert. Der Mittelwert. Die Genauigkeit der Dezimalstelle.

Array

Der Typ des Wertes. Falls erlaubt, ist Null ein zulässiges Element. Der Anteil der Nullwerte. Die maximale Array-Größe. Die minimale Array-Größe. Die durchschnittliche Größe von Arrays. Wenn alle Arrays die gleiche Größe haben.

Binär

Die maximale Größe. Die Mindestgröße. Die durchschnittliche Größe. Wenn alle Elemente die gleiche Größe haben.

Zuordnung

Der Typ des Schlüssels. Der Typ des Wertes. Falls erlaubt, ist Null ein zulässiger Wert. Der Anteil der Nullwerte. Die maximale der Zuordnung. Die Mindestgröße der Zuordnung. Die durchschnittliche Größe der Zuordnung. Wenn alle Zuordnungen die gleiche Größe haben.

NULL

NullType steht für „Keine“, da die Daten des Typs nicht ermittelt werden können. Daher ist es nicht möglich, Informationen von diesem Typ zu erhalten.

Struktur

Die Metadaten der Struktur – sie beziehen sich auf jedes structField: name, type, nullable, rows count, rows not null count und rows null count. Es ist ein Array.

Sie definiert außerdem eine Reihe von vordefinierten Validierungsprüfungen für jeden Datentyp, die in der folgenden Tabelle aufgeführt sind:

Die Prüfungen werden auf der Grundlage des Typs der Spalte gesammelt

Typ

Pandera-Prüfungen

Zusätzliche Prüfungen

Boolesch

Jeder Wert ist „True“ oder „False“.

Die Anzahl der True- und False-Werte.

Numerisch (byte, short, integer, long, float und double)

Jeder Wert liegt im Bereich von Minimalwert und Maximalwert.

Die Genauigkeit der Dezimalstelle. Der Mittelwert. Die Standardabweichung.

Date

N/A

Mindest- und Höchstwerte

Zeitstempel

Jeder Wert liegt im Bereich von Minimalwert und Maximalwert.

Das Format des Wertes.

Zeitstempel ntz

Jeder Wert liegt im Bereich von Minimalwert und Maximalwert.

Das Format des Wertes.

String

Jede Wertlänge liegt im Bereich der min. und max. Länge.

Keine

Char

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist char kein gültiger Typ.

Varchar

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist Varchar kein gültiger Typ.

Dezimalzahl

N/A

N/A

Array

N/A

Keine

Binär

N/A

Keine

Zuordnung

N/A

Keine

NULL

N/A

N/A

Struktur

N/A

Keine

Dieser Modus erlaubt es dem Benutzer, eine Stichprobe eines DataFrame zur Erfassung definieren – die ist jedoch optional. Standardmäßig erfolgt die Erfassung über den gesamten DataFrame. Der Umfang der Stichprobe muss die Population statistisch repräsentieren.

Pandera kann nur das Schema eines Pandas-DataFrame ableiten, was bedeutet, dass der PySpark DataFrame in ein Pandas DataFrame konvertiert werden muss, was sich auf die Typauflösungen der Spalten auswirken kann. Insbesondere ermittelt Pandera die folgenden PySpark-Typen als Objekttypen: string, array, map, null, struct und binary.

Die Ausgabe dieses Modus ist eine JSON-Datei für jeden erfassten DataFrame, wobei der Name der Datei derselbe ist wie der Checkpoint. Diese Datei enthält Informationen über das Schema und besteht aus zwei Abschnitten:

  1. Der Abschnitt Pandera-Schema enthält die von Pandera abgeleiteten Daten wie Name, Typ (Pandas), ob die Spalte Nullwerte zulässt oder nicht, sowie weitere Informationen für jede Spalte und Prüfungen der Spalten auf der Grundlage des PySpark-Typs. Es ist ein DataFrameSchema-Objekt von Pandera.

  2. Der Abschnitt mit den benutzerdefinierten Daten ist ein Array mit den benutzerdefinierten Daten, die von jeder Spalte auf der Grundlage des PySpark-Typs erfasst werden.

Bemerkung

Bei der Verarbeitung großer PySpark DataFrames kann es bei der Erfassung zu Speicherproblemen kommen. Um dieses Problem zu beheben, können Sie den sample-Parameter in der Sammelfunktion auf einen Wert zwischen 0,0 und 1,0 setzen, um mit einer Teilmenge der Daten statt mit dem gesamten PySpark DataFrame zu arbeiten.

DataFrame-Modus für erfasste Daten (DataFrame)

Dieser Modus erfasst die Daten des PySpark DataFrame. In diesem Fall speichert der Mechanismus alle Daten des angegebenen DataFrame im Parquet-Format. Unter Verwendung der Standard-Snowflake-Verbindung des Benutzers wird versucht, die Parquet-Dateien in den temporären Stagingbereich von Snowflake hochzuladen und eine Tabelle auf der Grundlage der Informationen im Stagingbereich zu erstellen. Der Name der Datei und der Tabelle ist derselbe wie der des Checkpoints.

Die Ausgabe dieses Modus ist eine Parquet-Datei, die als Ergebnis des DataFrame gespeichert wird, und eine Tabelle mit den DataFrame-Daten in der Standard-Snowflake-Konfigurationsverbindung.

Den in Snowpark konvertierten Code validieren

Das Snowpark-Checkpoints-Paket bietet eine Reihe von Validierungen, die auf den Snowpark-Code angewendet werden können, um sicherzustellen, dass die Verhaltensweise dem PySpark-Code entspricht.

Vom Framework bereitgestellte Funktionen

  • check_with_spark: Ein Decorator, der alle Snowpark-DataFrame-Argumente in eine Funktion oder ein Beispiel und dann in PySpark DataFrames umwandelt. Die Prüfung führt dann eine bereitgestellte Spark-Funktion aus, die die Funktionalität der neuen Snowpark-Funktion widerspiegelt, und vergleicht die Ausgaben der beiden Implementierungen. Unter der Annahme, dass die Spark-Funktion und die Snowpark-Funktionen semantisch identisch sind, können diese Funktionen anhand realer Stichproben überprüft werden.

    Parameter:
    • job_context (SnowparkJobContext): Der Jobkontext mit der Konfiguration und den Details für die Validierung.

    • spark_function (fn): Die entsprechende PySpark-Funktion zum Vergleich mit der Snowpark-Implementierung.

    • checkpoint_name (str): Ein Name für den Checkpoint. Die Standardeinstellung ist „Keine“.

    • sample_number (Optional[int], optional): Die Anzahl der Zeilen für die Validierung. Der Standardwert ist 100.

    • sampling_strategy (Optional[SamplingStrategy], optional): Die für das Sampling verwendete Strategie. Die Standardeinstellung ist SamplingStrategy.RANDOM_SAMPLE.

    • output_path (Optional[str], optional): Der Pfad zum Speichern der Validierungsergebnisse. Die Standardeinstellung ist „Keine“.

    Nachfolgenden sehen Sie ein Beispiel:

     def original_spark_code_I_dont_understand(df):
     from pyspark.sql.functions import col, when
    
     ret = df.withColumn(
         "life_stage",
         when(col("byte") < 4, "child")
         .when(col("byte").between(4, 10), "teenager")
         .otherwise("adult"),
     )
     return ret
    
    
    @check_with_spark(
     job_context=job_context, spark_function=original_spark_code_I_dont_understand
    )
    def new_snowpark_code_I_do_understand(df):
      from snowflake.snowpark.functions import col, lit, when
    
      ref = df.with_column(
          "life_stage",
          when(col("byte") < 4, lit("child"))
          .when(col("byte").between(4, 10), lit("teenager"))
          .otherwise(lit("adult")),
     )
     return ref
    
    
     df1 = new_snowpark_code_I_do_understand(df)
    
    Copy
  • validate_dataframe_checkpoint: Diese Funktion validiert einen Snowpark-Dataframe anhand einer bestimmten Checkpoint-Schemadatei oder eines importierten Dataframes gemäß dem Argumentmodus. Sie stellt sicher, dass die für diesen DataFrame gesammelten Informationen und der DataFrame, der an die Funktion übergeben wird, übereinstimmen.

    Parameter:
    • df (SnowparkDataFrame): Der zu validierende DataFrame.

    • checkpoint_name (str): Der Name des Checkpoints, gegen den validiert werden soll.

    • job_context (SnowparkJobContext, optional) (str): Der Jobkontext für die Validierung. Erforderlich für den PARQUET-Modus.

    • mode (CheckpointMode): Der Validierungsmodus (z. B. SCHEMA, PARQUET). Die Standardeinstellung ist SCHEMA.

    • custom_checks (Optional[dict[Any, Any]], optional): Benutzerdefinierte Prüfungen, die während der Validierung angewendet werden.

    • skip_checks (Optional[dict[Any, Any]], optional): Prüfungen, die bei der Validierung übersprungen werden sollen.

    • sample_frac (Optional[float], optional): Teil des DataFrame, der für die Validierung ausgewählt wird. Der Standardwert ist 0.1.

    • sample_number (Optional[int], optional): Anzahl der Zeilen, die für die Validierung ausgewählt werden.

    • sampling_strategy (Optional[SamplingStrategy], optional): Strategie, die für das Sampling verwendet werden soll.

    • output_path (Optional[str], optional): Der Ausgabepfad für die Validierungsergebnisse.

    Nachfolgenden sehen Sie ein Beispiel:

    # Check a schema/stats here!
    validate_dataframe_checkpoint(
       df1,
    "demo_add_a_column_dataframe",
    job_context=job_context,
    mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema)
    )
    
    Copy

    Je nach gewähltem Modus verwendet die Validierung entweder die gesammelte Schemadatei oder einen mit Parquet geladenen Datenframe in Snowflake, um die Äquivalenz mit der PySpark-Version zu überprüfen.

  • check-output_schema: Dieser Decorator validiert das Schema der Ausgabe einer Snowpark-Funktion und stellt sicher, dass die DataFrame-Ausgabe mit einem bestimmten Pandera-Schema übereinstimmt. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie umschließt die Snowpark-Funktion und führt eine Schemavalidierung des Ausgabe-DataFrame durch, bevor sie das Ergebnis zurückgibt.

    Nachfolgenden sehen Sie ein Beispiel:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
    from numpy import int8
    
    # Define the Pandera schema
    out_schema = DataFrameSchema(
    {
         "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
         "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
         "COLUMN3": Column(float, Check.less_than(10)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_output_schema(out_schema, "output_schema_checkpoint")
    def preprocessor(dataframe: SnowparkDataFrame):
      return dataframe.with_column(
         "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
         "COLUMN1": [1, 4, 0, 10, 9],
         "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the preprocessor function
    preprocessed_dataframe = preprocessor(sp_dataframe)
    
    Copy
  • check_input_schema: Dieser Decorator prüft das Schema der Eingabeargumente einer Snowpark-Funktion. Dieser Decorator stellt sicher, dass der Eingabe-DataFrame mit einem bestimmten Pandera-Schema übereinstimmt, bevor die Funktion ausgeführt wird. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie schließt die Snowpark-Funktion ein und führt eine Schemavalidierung des Eingabe-DataFrame durch, bevor sie die Funktion ausführt.

    Nachfolgenden sehen Sie ein Beispiel:

    from pandas import DataFrame as PandasDataFrame
    from pandera import DataFrameSchema, Column, Check
    from snowflake.snowpark import Session
    from snowflake.snowpark import DataFrame as SnowparkDataFrame
    from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
    from numpy import int8
    
    # Define the Pandera schema
    input_schema = DataFrameSchema(
    {
         "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
         "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
    }
    )
    
    # Define the Snowpark function and apply the decorator
    @check_input_schema(input_schema, "input_schema_checkpoint")
    def process_dataframe(dataframe: SnowparkDataFrame):
     return dataframe.with_column(
         "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    
    # Create a Snowpark session and DataFrame
    session = Session.builder.getOrCreate()
    df = PandasDataFrame(
    {
         "COLUMN1": [1, 4, 0, 10, 9],
         "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
    )
    sp_dataframe = session.create_dataframe(df)
    
    # Apply the process_dataframe function
    processed_dataframe = process_dataframe(sp_dataframe)
    
    Copy

Statistische Prüfungen

Statistische Prüfungen werden standardmäßig auf den spezifischen Spaltentyp angewendet, wenn die Prüfung im Modus Schema ausgeführt wird; diese Prüfungen können mit skip_checks übersprungen werden.

Spaltentyp

Standardprüfung

Numerisch: byte, short, integer, long, float, und double

between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert.

decimal_precision: Wenn der Wert eine Dezimalzahl ist, wird die Genauigkeit der Dezimalstellen überprüft.

mean: Überprüft, ob der Mittelwert der Spalten innerhalb eines bestimmten Bereichs liegt.

Boolesch

isin: Überprüft, ob der Wert „True“ oder „False“ ist.

True_proportion: Prüft, ob der Anteil der „True“-Werte innerhalb eines bestimmten Bereichs liegt.

False_proportion: Überprüfung, ob der Anteil der „False“-Werte innerhalb eines bestimmten Bereichs liegt.

Datum: date, timestamp und timestamp_ntz

between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert.

Nullwertfähig: Alle unterstützten Typen

Null_proportion: Validiert den Nullanteil entsprechend.

Prüfungen überspringen

Es gibt eine granulare Steuerung für Prüfungen, mit der Sie die Spaltenvalidierung oder bestimmte Prüfungen für eine Spalte überspringen können. Mit dem Parameter skip_checks können Sie die spezifische Spalte und die Art der Validierung angeben, die Sie überspringen möchten. Der Name des Prüfung, der zum Überspringen verwendet wird, ist der Name, der mit der Prüfung verbunden ist.

  • str_contains

  • str_endswith

  • str_length

  • str_matches

  • str_startswith

  • in_range

  • ​​equal_to

  • greater_than_or_equal_to

  • greater_than

  • less_than_or_equal_to

  • less_than

  • not_equal_to

  • notin

  • isin

df = pd.DataFrame(
{
       "COLUMN1": [1, 4, 0, 10, 9],
       "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)

schema = DataFrameSchema(
{
       "COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
       "COLUMN2": Column(
           float,
           [
               Check.greater_than(-20.5),
               Check.less_than(-1.0),
               Check(lambda x: x < -1.2),
           ],
       ),
}
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
   sp_df,
   schema,
   skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Copy

Individuelle Prüfungen

Sie können dem aus der JSON-Datei generierten Schema mit der Eigenschaft custom_checks zusätzliche Prüfungen hinzufügen. Dadurch wird die Prüfung zum Pandera-Schema hinzugefügt:

df = pd.DataFrame(
 {
       "COLUMN1": [1, 4, 0, 10, 9],
       "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
 }
)

session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)

# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
  sp_df,
  "checkpoint-name",
  custom_checks={
       "COLUMN1": [
           Check(lambda x: x.shape[0] == 5),
           Check(lambda x: x.shape[1] == 2),
  ],
  "COLUMN2": [Check(lambda x: x.shape[0] == 5)],
 },
)
Copy

Sampling-Strategien

Der Sampling-Prozess des bereitgestellten Codes wurde entwickelt, um große DataFrames effizient zu validieren, indem eine repräsentative Stichprobe der Daten genommen wird. Diese Vorgehensweis hilft bei der Schema-Validierung, ohne dass der gesamte Datensatz verarbeitet werden muss, was rechenintensiv und zeitaufwendig sein kann.

Parameter:
  • sample_frac: Dieser Parameter gibt den Anteil des DataFrame an, der entnommen werden soll. Wenn zum Beispiel sample_frac auf 0,1 eingestellt ist, werden 10 Prozent der Zeilen aus dem DataFrame als Stichprobe entnommen. Dies ist nützlich, wenn Sie eine Teilmenge der Daten validieren möchten, um Rechenressourcen zu sparen.

  • sample_number: Dieser Parameter gibt die genaue Anzahl der Zeilen an, die aus dem DataFrame als Stichprobe entnommen werden sollen. Wenn zum Beispiel sample_number auf 100 eingestellt ist, werden 100 Zeilen aus dem DataFrame entnommen. Dies ist nützlich, wenn Sie eine feste Anzahl von Zeilen unabhängig von der Größe des DataFrame validieren möchten.

Validierungsergebnis

Nachdem eine beliebige Art von Validierung durchgeführt wurde, wird das Ergebnis, unabhängig davon sie bestanden wurde oder nicht, in checkpoint_validation_results.json gespeichert. Diese Datei wird hauptsächlich für die Funktionalitäten der VSCode-Erweiterung verwendet. Sie enthält Informationen über den Status der Überprüfung, den Zeitstempel, den Checkpoint-Namen, die Nummer der Zeile, in der die Funktion ausgeführt wird, und die Datei.

Sie protokolliert das Ergebnis auch im Standard-Snowflake Konto in einer Tabelle namens SNOWPARK_CHECKPOINTS_REPORT, die Informationen über das Validierungsergebnis enthält.

  • DATE: Zeitstempel der Ausführung der Validierung.

  • JOB: Name der SnowparkJobContext.

  • STATUS: Status der Validierung.

  • CHECKPOINT: Name des überprüften Checkpoints.

  • MESSAGE: Fehlermeldung.

  • DATA: Daten aus der Ausführung der Validierung.

  • EXECUTION_MODE: Validierungsmodus ausgeführt.

Checkpoint-Umgebungsvariable

Das Standardverhalten des Frameworks, um die Datei checkpoints.json zu finden, ist die Suche nach einer Umgebungsvariablen namens SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR. Diese Variable enthält den relativen Pfad des checkpoint.json. Sie wird von der VSCode-Erweiterung zugewiesen, wenn Sie den Checkpoint mit den Code Lenses im Code ausführen. Wenn die Umgebungsvariable nicht zugewiesen ist, versucht das Framework, die Datei im aktuellen Verzeichnis zu suchen.

Hypothesis Unit-Testing

Hypothesis ist eine leistungsstarke Testbibliothek für Python, die traditionelle Unit-Tests durch die automatische Generierung einer Vielzahl von Eingabedaten verbessert. Es verwendet eigenschaftsbasiertes Testen. Anstatt einzelne Testfälle zu spezifizieren, können Sie die erwartete Verhaltensweise Ihres Codes mit Eigenschaften oder Bedingungen beschreiben und Hypothesis erzeugt Beispiele, um diese Eigenschaften gründlich zu testen. Dieser Ansatz hilft, Grenzfälle und unerwartete Verhaltensweisen aufzudecken, und ist daher besonders effektiv bei komplexen Funktionen. Weitere Informationen finden Sie unter Hypothese.

Das snowpark-checkpoints-hypothesis-Paket erweitert die Hypothese-Bibliothek, um synthetische Snowpark-DataFrames für Testzwecke zu erzeugen. Indem Sie die Möglichkeit von Hypothesis nutzen, vielfältige und zufällige Testdaten zu generieren, können Sie Snowpark-DataFrames mit unterschiedlichen Schemas und Werten erstellen, um reale Szenarios zu simulieren und Grenzfälle aufzudecken, um robusten Code zu gewährleisten und die Korrektheit komplexer Transformationen zu überprüfen.

Die Hypothesis-Strategie für Snowpark basiert auf Pandera zur Erzeugung synthetischer Daten. Die Funktion dataframe_strategy verwendet das angegebene Schema, um einen Pandas-DataFrame zu generieren, der dem Schema entspricht, und konvertiert ihn dann in einen Snowpark-DataFrame.

Funktionssignatur:

def dataframe_strategy(
  schema: Union[str, DataFrameSchema],
  session: Session,
  size: Optional[int] = None
) -> SearchStrategy[DataFrame]
Copy

Funktionsparameter:

  • schema: Das Schema, das die Spalten, Datentypen und Prüfungen definiert, denen der generierte Snowpark-Datenframe entsprechen soll. Das Schema kann Folgendes sein:

  • session: Eine Instanz von snowflake.snowpark.Session, die für die Erstellung des Snowpark-DataFrames verwendet wird.

  • size: Die Anzahl der zu erzeugenden Zeilen für jeden Snowpark DataFrame. Wenn dieser Parameter nicht angegeben wird, generiert die Strategie DataFrames in unterschiedlichen Größen.

Funktionsausgabe:

Gibt eine Hypothesis-SearchStrategy zurück, die Snowpark DataFrames erzeugt.

Unterstützte und nicht unterstützte Datentypen

Die Funktion dataframe_strategy unterstützt die Erstellung von Snowpark-DataFrames mit verschiedenen Datentypen. Je nach dem Typ des an die Funktion übergebenen Schema-Arguments variieren die von der Strategie unterstützten Datentypen. Beachten Sie, dass die Strategie eine Ausnahme auslöst, wenn sie einen nicht unterstützten Datentyp findet.

Die folgende Tabelle zeigt die von der Funktion dataframe_strategy unterstützten und nicht unterstützten PySpark Datentypen, wenn eine JSON-Datei als schema-Argument übergeben wird.

Datentyp PySpark

Unterstützt

-Array

Ja

Boolesch

Ja

Char

Nein

Datum

Ja

DayTimeIntervalType

Nein

Dezimalzahl

Nein

Zuordnung

Nein

Null

Nein

Byte, Kurz, Ganzzahl, Lang, Gleitkommazahl, Doppelt

Ja

Zeichenfolge

Ja

Struktur

Nein

Zeitstempel

Ja

TimestampNTZ

Ja

Varchar

Nein

YearMonthIntervalType

Nein

Die folgende Tabelle zeigt die Pandera-Datentypen, die von der Funktion dataframe_strategy unterstützt werden, wenn ein DataFrameSchema Objekt als schema-Argument übergeben wird, sowie die Snowpark-Datentypen, denen sie zugeordnet sind.

Pandera-Datentyp

Snowpark-Datentyp

int8

ByteType

int16

ShortType

int32

IntegerType

int64

LongType

float32

FloatType

float64

DoubleType

string

StringType

bool

BooleanType

datetime64[ns, tz]

TimestampType(TZ)

datetime64[ns]

TimestampType(NTZ)

date

DateType

Beispiele

Der typische Workflow für die Verwendung der Hypothesis-Bibliothek zur Erstellung eines Snowpark-DataFrames ist wie folgt:

  1. Erstellen Sie eine Standard-Python-Testfunktion mit den verschiedenen Assertionen oder Bedingungen, die Ihr Code für alle Eingaben erfüllen sollte.

  2. Fügen Sie den Hypothesis @given-Decorator zu Ihrer Testfunktion hinzu und übergeben Sie die Funktion dataframe_strategy als Argument. Weitere Informationen über den Decorator @given finden Sie unter hypothesis.given.

  3. Führen Sie die Testfunktion aus. Wenn der Test ausgeführt wird, stellt Hypothesis automatisch die generierten Eingaben als Argumente für den Test bereit.

Beispiel 1: Generieren von Snowpark-DataFrames aus einer JSON-Datei

Im Folgenden finden Sie ein Beispiel für die Generierung von Snowpark DataFrames aus einer JSON Schemadatei, die mit der Funktion collect_dataframe_checkpoint des snowpark-checkpoints-collectors-Pakets erstellt wurde.

from hypothesis import given

from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session


@given(
    df=dataframe_strategy(
        schema="path/to/file.json",
        session=Session.builder.getOrCreate(),
        size=10,
    )
)
def test_my_function_from_json_file(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

Beispiel 2: Generieren eines Snowparks-DataFrame aus einem Pandera-DataFrameSchema-Objekt

Nachfolgend finden Sie ein Beispiel für die Generierung von Snowpark-DataFrames aus einer Instanz eines Pandera-DataFrameSchema. Weitere Informationen finden Sie unter Pandera-DataFrameSchema.

import pandera as pa

from hypothesis import given

from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session


@given(
    df=dataframe_strategy(
        schema=pa.DataFrameSchema(
            {
                "boolean_column": pa.Column(bool),
                "integer_column": pa.Column("int64", pa.Check.in_range(0, 9)),
                "float_column": pa.Column(pa.Float32, pa.Check.in_range(10.5, 20.5)),
            }
        ),
        session=Session.builder.getOrCreate(),
        size=10,
    )
)
def test_my_function_from_dataframeschema_object(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

Beispiel 3: Anpassen der Hypothesis-Verhaltensweise

Sie können die Verhaltensweise Ihres Tests auch mit dem Hypothesis-Decorator @settings anpassen. Mit diesem Decorator können Sie verschiedene Konfigurationsparameter anpassen, um das Testverhalten auf Ihre Bedürfnisse zuzuschneiden. Mit dem @settings-Decorator können Sie Aspekte wie die maximale Anzahl von Testfällen, die Frist für jede Testausführung, die Ausführlichkeitsstufen und viele andere steuern. Weitere Informationen finden Sie unter Hypothesis-Einstellungen.

from datetime import timedelta

from hypothesis import given, settings
from snowflake.snowpark import DataFrame, Session

from snowflake.hypothesis_snowpark import dataframe_strategy


@given(
    df=dataframe_strategy(
        schema="path/to/file.json",
        session=Session.builder.getOrCreate(),
    )
)
@settings(
    deadline=timedelta(milliseconds=800),
    max_examples=25,
)
def test_my_function(df: DataFrame):
    # Test a particular function using the generated Snowpark DataFrame
    ...
Copy

Einrichten einer IDE für Snowpark Checkpoints

Der Snowflake Extension for Visual Studio-Code bietet Unterstützung für die Snowpark Checkpoints Bibliothek, um die Nutzung des Frameworks zu verbessern. Es gibt Ihnen eine abgestufte Kontrolle über die collect- und validate-Anweisungen, die in Ihren Code eingefügt werden, und überprüft den Status der Verhaltensäquivalenz-Assertionen Ihres konvertierten Codes.

Snowpark Checkpoints aktivieren

Um Snowpark Checkpoints zu aktivieren, gehen Sie zu den Erweiterungseinstellungen von Snowflake und aktivieren Sie Snowpark Checkpoints: Enabled.

Aktivierte Checkpoints

Ansicht

Wenn Sie die Eigenschaft Snowpark Checkpoints auf Enabled setzen, öffnet sich in der Erweiterung eine neue Registerkarte namens SNOWPARK CHECKPOINTS. Es zeigt alle Checkpoints im Arbeitsbereich an und ermöglicht mehrere Aktionen, wie z. B. das Aktivieren/Deaktivieren aller oder einzelner Checkpoints, das Löschen aller Checkpoints aus Dateien und das Navigieren zu der Datei und der Codezeile, in der der Prüfpunkt definiert ist, indem Sie auf den jeweiligen Checkpoint doppelklicken.

Alle Checkpoints umschalten

Diese Option befindet sich in der rechten oberen Ecke der Registerkarte Snowpark Checkpoints und schaltet die aktivierte Eigenschaft in allen Checkpoints um.

Checkpoints umschalten

Aktivierte Checkpoints:

Checkpoints umschalten

Die Deaktivierung eines Checkpoints führt dazu, dass er zur Laufzeit übersprungen wird.

Checkpoints deaktivieren

Alle Checkpoints bereinigen

Befindet sich in der oberen rechten Ecke der Registerkarte Snowpark Checkpoints. Dadurch werden Checkpoints aus allen Python-Dateien, einschließlich Jupyter-Notebooks, in Ihrem Arbeitsbereich entfernt, aber nicht aus dem Vertrag und dem Panel gelöscht. Das heißt, sie können mit dem Befehl Snowflake: Restore All Checkpoints wiederhergestellt werden.

Checkpoints entfernen

Einfügen von Checkpoints in eine Datei

Wenn Sie mit der rechten Maustaste in eine Datei klicken, wird ein Kontextmenü mit der Option Snowpark Checkpoints angezeigt, mit der Sie Collection- und Validation-Checkpoints hinzufügen können.

Option Snowpark-Checkpoints im Kontextmenü:

Checkpoints hinzufügen

Collector/Validator hinzugefügt:

Checkpoints für Collector und Validator

Einen einzelnen Checkpoint ausführen

Sie können einen einzelnen Checkpoint ausführen, indem Sie auf die Code-Lense-Option klicken, die über jedem Checkpoint angezeigt wird. Wenn Sie es ausführen, wird eine Ausgabekonsole angezeigt, die den Fortschritt anzeigt. Sobald es beendet ist, wird die Ergebnisansicht aufgerufen. Sogar wenn der Checkpoint in der Vertragsdatei deaktiviert ist, wird er nur für seine Ausführung aktiviert.

Einen einzelnen Checkpoint ausführen

Wenn ein Einstiegspunkt nicht in der Vertragsdatei deklariert ist, wird die Fehlermeldung: Einstiegspunkt für den Checkpoint nicht gefunden. angezeigt.

Einstiegspunkt nicht gefunden

Ausführen aller aktivierten Snowpark Checkpoints in einer Datei

In der oberen rechten Ecke jeder Datei finden Sie die Schaltfläche Run all checkpoints from the current file.

Alle Checkpoints ausführen

Wenn Sie darauf klicken, erscheint ein Ausgabekanal, der den Fortschritt der Ausführung anzeigt.

Fortschritt der Checkpoints

Zeitleistenansicht

Zeigt eine Zeitleiste mit den Ergebnissen der Checkpoint-Ausführung an.

Zeitleistenansicht

Befehle

Die folgenden Befehle sind für Snowpark Checkpoints verfügbar. Um sie zu verwenden, geben Sie Snowflake: [command name] in die Befehlspalette ein.

Snowpark Checkpoints-Befehle

Befehl

Beschreibung

Snowflake: Checkpoint umschalten

Schaltet die Eigenschaft „aktiviert“ für alle Checkpoints um.

Snowflake: Snowpark Checkpoints – Projektinitialisierung

Triggert die Projektinitialisierung und legt eine Vertragsdatei an, wenn sie nicht vorhanden ist. Falls er existiert, wird ein Popup-Fenster angezeigt, in dem Sie gefragt werden, ob Sie den Checkpoint in die Vertragsdatei laden möchten.

Snowflake: Alle Checkpoints löschen

Löscht alle Checkpoints aus allen Dateien im Arbeitsbereich.

Snowflake: Alle Checkpoints wiederherstellen

Stellen Sie Checkpoints wieder her, die zuvor aus Dateien gelöscht wurden und noch in der Vertragsdatei vorhanden sind.

Snowflake: Validierungs-/Erfassungs-Checkpoint hinzufügen

Fügt einen Validator oder Collector mit seinen obligatorischen Parametern an der Cursorposition ein.

Snowflake: Fokus auf Snowpark Checkpoints-Ansicht

Verschiebt den Fokus auf den Bereich SNOWPARK CHECKPOINTS.

Snowflake: Zeitleiste der offenen Checkpoints

Zeigt eine Zeitleiste der Checkpoint-Ausführungen an.

Snowflake: Alle Checkpoints aus der aktuellen Datei ausführen

Führt alle aktivierten Checkpoints in der aktuellen Datei aus.

Snowflake: Alle Checkpoints im Arbeitsbereich ausführen

Führt alle aktivierten Checkpoints aus dem Arbeitsbereich aus.

Snowflake: Alle Snowpark Checkpoint-Ergebnisse anzeigen

Zeigt eine Registerkarte mit den Ergebnissen aller Checkpoints an.

Warnungen

  • Duplikat: Wenn in einem Sammlungsprojekt zwei Checkpoints mit demselben Namen zugewiesen werden, erscheint eine Warnung: „Ein weiterer Checkpoint mit identischem Namen wurde entdeckt und wird überschrieben.“ Validierungsprojekte können mehrere Checkpoints mit demselben Namen haben, es wird keine Warnung angezeigt.

  • Falscher Typ: Wenn Sie einen Checkpoint mit einem anderen Typ als dem Projekttyp hinzufügen, wird er mit der folgenden Fehlermeldung unterstrichen: „Stellen Sie sicher, dass Sie die richtige Snowpark-Checkpoints-Anweisung verwenden. Diese spezielle Checkpoint-Anweisung unterscheidet sich von den anderen in diesem Projekt verwendeten Anweisungen. Anweisungen, die nicht dem Projekttyp entsprechen, werden bei der Ausführung ignoriert.

  • Ungültiger Checkpoint-Name: Es gibt ungültige Möglichkeiten, einen Parameter für den Checkpoint-Namen hinzuzufügen. Wenn dies geschieht, wird eine Warnmeldung angezeigt: „Ungültiger Checkpoint-Name.“ Checkpoint-Namen müssen mit einem Buchstaben beginnen und dürfen nur Buchstaben, Zahlen, Bindestriche und Unterstriche enthalten.