Snowpark Checkpoints-Bibliothek¶
Snowpark Checkpoints ist eine Testbibliothek zur Validierung von Code, der von Apache PySpark nach Snowpark Python migriert wurde.
Voraussetzungen¶
Um Snowpark Checkpoints zu verwenden, richten Sie eine Python-Entwicklungsumgebung ein. Folgende Versionen von Python werden unterstützt:
3.9
3.10
3.11
Bemerkung
Python 3.9 hängt von der Snowpark-Client-Version 1.5.0 ab. Python 3.10 hängt von der Snowpark-Client-Version 1.5.1 ab. Python 3.11 hängt von der Snowpark-Client-Version 1.9.0 ab.
Sie können eine virtuelle Python-Umgebung für eine bestimmte Python-Version mit Tools wie Anaconda, Miniconda oder virtualenv erstellen.
Snowpark Checkpoints installieren¶
Installieren Sie das Snowpark Checkpoints-Paket in eine virtuelle Python-Umgebung, indem Sie conda oder pip verwenden.
Verwenden von conda:
conda install snowpark-checkpoints
Verwenden von pip:
pip install snowpark-checkpoints
Wenn Sie möchten, können Sie die Pakete auch einzeln installieren:
snowpark-checkpoints-collectors – Verwenden Sie dieses Paket, um Informationen über PySpark DataFrames zu sammeln.
Verwenden von conda:
conda install snowpark-checkpoints-collectors
Verwenden von pip:
pip install snowpark-checkpoints-collectors
snowpark-checkpoints-hypothesis – Verwenden Sie dieses Paket, um Unit-Tests für Ihren Snowpark-Code zu erstellen, die auf automatisch generierten synthetischen Daten basieren und den DataFrame-Schemas folgen, die aus dem ursprünglichen PySpark Code gesammelt wurden.
Verwenden von conda:
conda install snowpark-checkpoints-hypothesis
Verwenden von pip:
pip install snowpark-checkpoints-hypothesis
snowpark-checkpoints-validators – Verwenden Sie dieses Paket, um Ihre konvertierten Snowpark-DataFrames anhand der gesammelten Schemas oder der exportierten DataFrames zu validieren, die von der Sammelfunktionalität generiert wurden.
Verwenden von conda:
conda install snowpark-checkpoints-validators
Verwenden von pip:
pip install snowpark-checkpoints-validators
snowpark-checkpoints-configuration – Verwenden Sie dieses Paket, um
snowpark-checkpoints-collectors
undsnowpark-checkpoints-validators
zu ermöglichen, die Konfiguration der Checkpoints automatisch zu laden.Verwenden von conda:
conda install snowpark-checkpoints-configuration
Verwenden von pip:
pip install snowpark-checkpoints-configuration
Verwenden des Framework¶
Sammeln Sie Informationen über Ihren PySpark-Code¶
Das Paket snowpark-checkpoints-collectors
bietet eine Funktion zum Extrahieren von Informationen aus der PySpark DataFrames. Diese Daten können wir dann mit den konvertierten Snowpark-DataFrames abgleichen, um die Gleichwertigkeit der Verhaltensweise sicherzustellen.
Verwenden Sie die folgende Funktion, um einen neuen Checkpoint-Sammelpunkt einzufügen:
Funktionssignatur:
def collect_dataframe_checkpoint(df: SparkDataFrame,
checkpoint_name: str,
sample: Optional[float],
mode: Optional[CheckpointMode],
output_path: Optional[str]) -> None:
Funktionsparameter:
df: Die PySpark DataFrame.
checkpoint_name: Der Name des Checkpoints. Beginnt mit einem Buchstaben (A-Z, a-z) oder einem Unterstrich (_) und enthält nur Buchstaben, Unterstriche und Dezimalziffern (0–9).
sample: (optional) Der Stichprobenumfang. Der Standardwert ist 1.0 (ganz PySpark DataFrame) in einem Bereich von 0 bis 1.0.
mode: (optional) Der Ausführungsmodus. Die Optionen sind
SCHEMA
undDATAFRAME
. Der Standardwert istSCHEMA
.output_path: (optional) Der Ausgabepfad zum Speichern des Checkpoints. Der Standardwert ist das aktuelle Arbeitsverzeichnis.
Der Sammlungsprozess erzeugt eine Ausgabedatei mit dem Namen checkpoint_collection_result.json
, die Informationen über das Ergebnis für jeden Sammelpunkt enthält. Es ist eine JSON-Datei, die die folgenden Informationen enthält:
Ein Zeitstempel, wann der Sammelpunkt gestartet wurde.
Der relative Pfad der Datei, in der sich der Sammelpunkt befindet.
Die Codezeile der Datei, in der sich der Sammelpunkt befindet.
Der Name des Checkpoints des Sammelpunkts.
Das Ergebnis der Sammelpunkts (fehlgeschlagen oder bestanden).
Modus für die Schema-Inferenz der gesammelten Daten (Schema)¶
Dies ist der Standardmodus, der die Schema-Inferenz von Pandera nutzt, um die Metadaten und Prüfungen zu erhalten, die für den angegebenen DataFrame ausgewertet werden. In diesem Modus werden auch benutzerdefinierte Daten aus den Spalten des DataFrame auf der Grundlage des PySpark-Typs erfasst.
Die Spaltendaten und -prüfungen werden auf der Grundlage des PySpark-Typs der Spalte erfasst (siehe die Tabellen unten). Für jede Spalte, unabhängig von ihrem Typ, werden benutzerdefinierte Daten gesammelt, darunter der Name der Spalte, der Typ der Spalte, ob sie Nullwerte zulässt, die Anzahl der Zeilen, die Anzahl der ungültigen Zeilen und die Anzahl der Nullzeilen.
Spaltentyp |
Erfasste benutzerdefinierte Daten |
---|---|
Numerisch ( |
Der Mindestwert. Der Höchstwert. Der Mittelwert. Die Genauigkeit der Dezimalstelle (im Falle des Typs Ganzzahl ist der Wert Null). Die Standardabweichung. |
Date |
Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%d |
DayTimeIntervalType und YearMonthIntervalType |
Der Mindestwert. Der Höchstwert. |
Zeitstempel |
Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%dH:%M:%S |
Zeitstempel ntz |
Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%dT%H:%M:%S%z |
String |
Der Wert für die Mindestlänge. Der Wert für die maximale Länge. |
Char |
PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist char kein gültiger Typ. |
Varchar |
PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist Varchar kein gültiger Typ. |
Dezimalzahl |
Der Mindestwert. Der Höchstwert. Der Mittelwert. Die Genauigkeit der Dezimalstelle. |
Array |
Der Typ des Wertes. Falls erlaubt, ist Null ein zulässiges Element. Der Anteil der Nullwerte. Die maximale Array-Größe. Die minimale Array-Größe. Die durchschnittliche Größe von Arrays. Wenn alle Arrays die gleiche Größe haben. |
Binär |
Die maximale Größe. Die Mindestgröße. Die durchschnittliche Größe. Wenn alle Elemente die gleiche Größe haben. |
Zuordnung |
Der Typ des Schlüssels. Der Typ des Wertes. Falls erlaubt, ist Null ein zulässiger Wert. Der Anteil der Nullwerte. Die maximale der Zuordnung. Die Mindestgröße der Zuordnung. Die durchschnittliche Größe der Zuordnung. Wenn alle Zuordnungen die gleiche Größe haben. |
NULL |
NullType steht für „Keine“, da die Daten des Typs nicht ermittelt werden können. Daher ist es nicht möglich, Informationen von diesem Typ zu erhalten. |
Struktur |
Die Metadaten der Struktur – sie beziehen sich auf jedes structField: |
Sie definiert außerdem eine Reihe von vordefinierten Validierungsprüfungen für jeden Datentyp, die in der folgenden Tabelle aufgeführt sind:
Typ |
Pandera-Prüfungen |
Zusätzliche Prüfungen |
---|---|---|
Boolesch |
Jeder Wert ist „True“ oder „False“. |
Die Anzahl der True- und False-Werte. |
Numerisch ( |
Jeder Wert liegt im Bereich von Minimalwert und Maximalwert. |
Die Genauigkeit der Dezimalstelle. Der Mittelwert. Die Standardabweichung. |
Date |
N/A |
Mindest- und Höchstwerte |
Zeitstempel |
Jeder Wert liegt im Bereich von Minimalwert und Maximalwert. |
Das Format des Wertes. |
Zeitstempel ntz |
Jeder Wert liegt im Bereich von Minimalwert und Maximalwert. |
Das Format des Wertes. |
String |
Jede Wertlänge liegt im Bereich der min. und max. Länge. |
Keine |
Char |
PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist |
|
Varchar |
PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist |
|
Dezimalzahl |
N/A |
N/A |
Array |
N/A |
Keine |
Binär |
N/A |
Keine |
Zuordnung |
N/A |
Keine |
NULL |
N/A |
N/A |
Struktur |
N/A |
Keine |
Dieser Modus erlaubt es dem Benutzer, eine Stichprobe eines DataFrame zur Erfassung definieren – die ist jedoch optional. Standardmäßig erfolgt die Erfassung über den gesamten DataFrame. Der Umfang der Stichprobe muss die Population statistisch repräsentieren.
Pandera kann nur das Schema eines Pandas-DataFrame ableiten, was bedeutet, dass der PySpark DataFrame in ein Pandas DataFrame konvertiert werden muss, was sich auf die Typauflösungen der Spalten auswirken kann. Insbesondere ermittelt Pandera die folgenden PySpark-Typen als Objekttypen: string
, array
, map
, null
, struct
und binary
.
Die Ausgabe dieses Modus ist eine JSON-Datei für jeden erfassten DataFrame, wobei der Name der Datei derselbe ist wie der Checkpoint. Diese Datei enthält Informationen über das Schema und besteht aus zwei Abschnitten:
Der Abschnitt Pandera-Schema enthält die von Pandera abgeleiteten Daten wie Name, Typ (Pandas), ob die Spalte Nullwerte zulässt oder nicht, sowie weitere Informationen für jede Spalte und Prüfungen der Spalten auf der Grundlage des PySpark-Typs. Es ist ein
DataFrameSchema
-Objekt von Pandera.Der Abschnitt mit den benutzerdefinierten Daten ist ein Array mit den benutzerdefinierten Daten, die von jeder Spalte auf der Grundlage des PySpark-Typs erfasst werden.
Bemerkung
Bei der Verarbeitung großer PySpark DataFrames kann es bei der Erfassung zu Speicherproblemen kommen. Um dieses Problem zu beheben, können Sie den sample-Parameter in der Sammelfunktion auf einen Wert zwischen 0,0 und 1,0 setzen, um mit einer Teilmenge der Daten statt mit dem gesamten PySpark DataFrame zu arbeiten.
DataFrame-Modus für erfasste Daten (DataFrame)¶
Dieser Modus erfasst die Daten des PySpark DataFrame. In diesem Fall speichert der Mechanismus alle Daten des angegebenen DataFrame im Parquet-Format. Unter Verwendung der Standard-Snowflake-Verbindung des Benutzers wird versucht, die Parquet-Dateien in den temporären Stagingbereich von Snowflake hochzuladen und eine Tabelle auf der Grundlage der Informationen im Stagingbereich zu erstellen. Der Name der Datei und der Tabelle ist derselbe wie der des Checkpoints.
Die Ausgabe dieses Modus ist eine Parquet-Datei, die als Ergebnis des DataFrame gespeichert wird, und eine Tabelle mit den DataFrame-Daten in der Standard-Snowflake-Konfigurationsverbindung.
Den in Snowpark konvertierten Code validieren¶
Das Snowpark-Checkpoints-Paket bietet eine Reihe von Validierungen, die auf den Snowpark-Code angewendet werden können, um sicherzustellen, dass die Verhaltensweise dem PySpark-Code entspricht.
Vom Framework bereitgestellte Funktionen¶
check_with_spark: Ein Decorator, der alle Snowpark-DataFrame-Argumente in eine Funktion oder ein Beispiel und dann in PySpark DataFrames umwandelt. Die Prüfung führt dann eine bereitgestellte Spark-Funktion aus, die die Funktionalität der neuen Snowpark-Funktion widerspiegelt, und vergleicht die Ausgaben der beiden Implementierungen. Unter der Annahme, dass die Spark-Funktion und die Snowpark-Funktionen semantisch identisch sind, können diese Funktionen anhand realer Stichproben überprüft werden.
- Parameter:
job_context
(SnowparkJobContext): Der Jobkontext mit der Konfiguration und den Details für die Validierung.spark_function
(fn): Die entsprechende PySpark-Funktion zum Vergleich mit der Snowpark-Implementierung.checkpoint_name
(str): Ein Name für den Checkpoint. Die Standardeinstellung ist „Keine“.sample_number
(Optional[int], optional): Die Anzahl der Zeilen für die Validierung. Der Standardwert ist 100.sampling_strategy
(Optional[SamplingStrategy], optional): Die für das Sampling verwendete Strategie. Die Standardeinstellung istSamplingStrategy.RANDOM_SAMPLE
.output_path
(Optional[str], optional): Der Pfad zum Speichern der Validierungsergebnisse. Die Standardeinstellung ist „Keine“.
Nachfolgenden sehen Sie ein Beispiel:
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint: Diese Funktion validiert einen Snowpark-Dataframe anhand einer bestimmten Checkpoint-Schemadatei oder eines importierten Dataframes gemäß dem Argumentmodus. Sie stellt sicher, dass die für diesen DataFrame gesammelten Informationen und der DataFrame, der an die Funktion übergeben wird, übereinstimmen.
- Parameter:
df
(SnowparkDataFrame): Der zu validierende DataFrame.checkpoint_name
(str): Der Name des Checkpoints, gegen den validiert werden soll.job_context
(SnowparkJobContext, optional) (str): Der Jobkontext für die Validierung. Erforderlich für den PARQUET-Modus.mode
(CheckpointMode): Der Validierungsmodus (z. B. SCHEMA, PARQUET). Die Standardeinstellung ist SCHEMA.custom_checks
(Optional[dict[Any, Any]], optional): Benutzerdefinierte Prüfungen, die während der Validierung angewendet werden.skip_checks
(Optional[dict[Any, Any]], optional): Prüfungen, die bei der Validierung übersprungen werden sollen.sample_frac
(Optional[float], optional): Teil des DataFrame, der für die Validierung ausgewählt wird. Der Standardwert ist 0.1.sample_number
(Optional[int], optional): Anzahl der Zeilen, die für die Validierung ausgewählt werden.sampling_strategy
(Optional[SamplingStrategy], optional): Strategie, die für das Sampling verwendet werden soll.output_path
(Optional[str], optional): Der Ausgabepfad für die Validierungsergebnisse.
Nachfolgenden sehen Sie ein Beispiel:
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
Je nach gewähltem Modus verwendet die Validierung entweder die gesammelte Schemadatei oder einen mit Parquet geladenen Datenframe in Snowflake, um die Äquivalenz mit der PySpark-Version zu überprüfen.
check-output_schema: Dieser Decorator validiert das Schema der Ausgabe einer Snowpark-Funktion und stellt sicher, dass die DataFrame-Ausgabe mit einem bestimmten Pandera-Schema übereinstimmt. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie umschließt die Snowpark-Funktion und führt eine Schemavalidierung des Ausgabe-DataFrame durch, bevor sie das Ergebnis zurückgibt.
Nachfolgenden sehen Sie ein Beispiel:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema: Dieser Decorator prüft das Schema der Eingabeargumente einer Snowpark-Funktion. Dieser Decorator stellt sicher, dass der Eingabe-DataFrame mit einem bestimmten Pandera-Schema übereinstimmt, bevor die Funktion ausgeführt wird. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie schließt die Snowpark-Funktion ein und führt eine Schemavalidierung des Eingabe-DataFrame durch, bevor sie die Funktion ausführt.
Nachfolgenden sehen Sie ein Beispiel:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
Statistische Prüfungen¶
Statistische Prüfungen werden standardmäßig auf den spezifischen Spaltentyp angewendet, wenn die Prüfung im Modus Schema
ausgeführt wird; diese Prüfungen können mit skip_checks
übersprungen werden.
Spaltentyp |
Standardprüfung |
---|---|
Numerisch: |
between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert. decimal_precision: Wenn der Wert eine Dezimalzahl ist, wird die Genauigkeit der Dezimalstellen überprüft. mean: Überprüft, ob der Mittelwert der Spalten innerhalb eines bestimmten Bereichs liegt. |
Boolesch |
isin: Überprüft, ob der Wert „True“ oder „False“ ist. True_proportion: Prüft, ob der Anteil der „True“-Werte innerhalb eines bestimmten Bereichs liegt. False_proportion: Überprüfung, ob der Anteil der „False“-Werte innerhalb eines bestimmten Bereichs liegt. |
Datum: |
between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert. |
Nullwertfähig: Alle unterstützten Typen |
Null_proportion: Validiert den Nullanteil entsprechend. |
Prüfungen überspringen¶
Es gibt eine granulare Steuerung für Prüfungen, mit der Sie die Spaltenvalidierung oder bestimmte Prüfungen für eine Spalte überspringen können. Mit dem Parameter skip_checks
können Sie die spezifische Spalte und die Art der Validierung angeben, die Sie überspringen möchten. Der Name des Prüfung, der zum Überspringen verwendet wird, ist der Name, der mit der Prüfung verbunden ist.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Individuelle Prüfungen¶
Sie können dem aus der JSON-Datei generierten Schema mit der Eigenschaft custom_checks
zusätzliche Prüfungen hinzufügen. Dadurch wird die Prüfung zum Pandera-Schema hinzugefügt:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
Sampling-Strategien¶
Der Sampling-Prozess des bereitgestellten Codes wurde entwickelt, um große DataFrames effizient zu validieren, indem eine repräsentative Stichprobe der Daten genommen wird. Diese Vorgehensweis hilft bei der Schema-Validierung, ohne dass der gesamte Datensatz verarbeitet werden muss, was rechenintensiv und zeitaufwendig sein kann.
- Parameter:
sample_frac
: Dieser Parameter gibt den Anteil des DataFrame an, der entnommen werden soll. Wenn zum Beispielsample_frac
auf 0,1 eingestellt ist, werden 10 Prozent der Zeilen aus dem DataFrame als Stichprobe entnommen. Dies ist nützlich, wenn Sie eine Teilmenge der Daten validieren möchten, um Rechenressourcen zu sparen.sample_number
: Dieser Parameter gibt die genaue Anzahl der Zeilen an, die aus dem DataFrame als Stichprobe entnommen werden sollen. Wenn zum Beispielsample_number
auf 100 eingestellt ist, werden 100 Zeilen aus dem DataFrame entnommen. Dies ist nützlich, wenn Sie eine feste Anzahl von Zeilen unabhängig von der Größe des DataFrame validieren möchten.
Validierungsergebnis¶
Nachdem eine beliebige Art von Validierung durchgeführt wurde, wird das Ergebnis, unabhängig davon sie bestanden wurde oder nicht, in checkpoint_validation_results.json
gespeichert. Diese Datei wird hauptsächlich für die Funktionalitäten der VSCode-Erweiterung verwendet. Sie enthält Informationen über den Status der Überprüfung, den Zeitstempel, den Checkpoint-Namen, die Nummer der Zeile, in der die Funktion ausgeführt wird, und die Datei.
Sie protokolliert das Ergebnis auch im Standard-Snowflake Konto in einer Tabelle namens SNOWPARK_CHECKPOINTS_REPORT, die Informationen über das Validierungsergebnis enthält.
DATE
: Zeitstempel der Ausführung der Validierung.JOB
: Name der SnowparkJobContext.STATUS
: Status der Validierung.CHECKPOINT
: Name des überprüften Checkpoints.MESSAGE
: Fehlermeldung.DATA
: Daten aus der Ausführung der Validierung.EXECUTION_MODE
: Validierungsmodus ausgeführt.
Checkpoint-Umgebungsvariable¶
Das Standardverhalten des Frameworks, um die Datei checkpoints.json
zu finden, ist die Suche nach einer Umgebungsvariablen namens SNOWFLAKE_CHECKPOINT_CONTRACT_FILE_PATH_ENV_VAR
. Diese Variable enthält den relativen Pfad des checkpoint.json
. Sie wird von der VSCode-Erweiterung zugewiesen, wenn Sie den Checkpoint mit den Code Lenses im Code ausführen. Wenn die Umgebungsvariable nicht zugewiesen ist, versucht das Framework, die Datei im aktuellen Verzeichnis zu suchen.
Hypothesis Unit-Testing¶
Hypothesis ist eine leistungsstarke Testbibliothek für Python, die traditionelle Unit-Tests durch die automatische Generierung einer Vielzahl von Eingabedaten verbessert. Es verwendet eigenschaftsbasiertes Testen. Anstatt einzelne Testfälle zu spezifizieren, können Sie die erwartete Verhaltensweise Ihres Codes mit Eigenschaften oder Bedingungen beschreiben und Hypothesis erzeugt Beispiele, um diese Eigenschaften gründlich zu testen. Dieser Ansatz hilft, Grenzfälle und unerwartete Verhaltensweisen aufzudecken, und ist daher besonders effektiv bei komplexen Funktionen. Weitere Informationen finden Sie unter Hypothese.
Das snowpark-checkpoints-hypothesis
-Paket erweitert die Hypothese-Bibliothek, um synthetische Snowpark-DataFrames für Testzwecke zu erzeugen. Indem Sie die Möglichkeit von Hypothesis nutzen, vielfältige und zufällige Testdaten zu generieren, können Sie Snowpark-DataFrames mit unterschiedlichen Schemas und Werten erstellen, um reale Szenarios zu simulieren und Grenzfälle aufzudecken, um robusten Code zu gewährleisten und die Korrektheit komplexer Transformationen zu überprüfen.
Die Hypothesis-Strategie für Snowpark basiert auf Pandera zur Erzeugung synthetischer Daten. Die Funktion dataframe_strategy
verwendet das angegebene Schema, um einen Pandas-DataFrame zu generieren, der dem Schema entspricht, und konvertiert ihn dann in einen Snowpark-DataFrame.
Funktionssignatur:
def dataframe_strategy(
schema: Union[str, DataFrameSchema],
session: Session,
size: Optional[int] = None
) -> SearchStrategy[DataFrame]
Funktionsparameter:
schema
: Das Schema, das die Spalten, Datentypen und Prüfungen definiert, denen der generierte Snowpark-Datenframe entsprechen soll. Das Schema kann Folgendes sein:Ein Pfad zu einer JSON-Schemadatei, die mit der Funktion
collect_dataframe_checkpoint
des Paketssnowpark-checkpoints-collectors
erstellt wurde.Eine Instanz von pandera.api.pandas.container.DataFrameSchema.
session
: Eine Instanz von snowflake.snowpark.Session, die für die Erstellung des Snowpark-DataFrames verwendet wird.size
: Die Anzahl der zu erzeugenden Zeilen für jeden Snowpark DataFrame. Wenn dieser Parameter nicht angegeben wird, generiert die Strategie DataFrames in unterschiedlichen Größen.
Funktionsausgabe:
Gibt eine Hypothesis-SearchStrategy zurück, die Snowpark DataFrames erzeugt.
Unterstützte und nicht unterstützte Datentypen¶
Die Funktion dataframe_strategy
unterstützt die Erstellung von Snowpark-DataFrames mit verschiedenen Datentypen. Je nach dem Typ des an die Funktion übergebenen Schema-Arguments variieren die von der Strategie unterstützten Datentypen. Beachten Sie, dass die Strategie eine Ausnahme auslöst, wenn sie einen nicht unterstützten Datentyp findet.
Die folgende Tabelle zeigt die von der Funktion dataframe_strategy
unterstützten und nicht unterstützten PySpark Datentypen, wenn eine JSON-Datei als schema
-Argument übergeben wird.
Datentyp PySpark |
Unterstützt |
---|---|
Ja |
|
Ja |
|
Nein |
|
Ja |
|
Nein |
|
Nein |
|
Nein |
|
Nein |
|
Ja |
|
Ja |
|
Nein |
|
Ja |
|
Ja |
|
Nein |
|
Nein |
Die folgende Tabelle zeigt die Pandera-Datentypen, die von der Funktion dataframe_strategy
unterstützt werden, wenn ein DataFrameSchema Objekt als schema
-Argument übergeben wird, sowie die Snowpark-Datentypen, denen sie zugeordnet sind.
Pandera-Datentyp |
Snowpark-Datentyp |
---|---|
int8 |
|
int16 |
|
int32 |
|
int64 |
|
float32 |
|
float64 |
|
string |
|
bool |
|
datetime64[ns, tz] |
|
datetime64[ns] |
|
date |
Beispiele¶
Der typische Workflow für die Verwendung der Hypothesis-Bibliothek zur Erstellung eines Snowpark-DataFrames ist wie folgt:
Erstellen Sie eine Standard-Python-Testfunktion mit den verschiedenen Assertionen oder Bedingungen, die Ihr Code für alle Eingaben erfüllen sollte.
Fügen Sie den Hypothesis
@given
-Decorator zu Ihrer Testfunktion hinzu und übergeben Sie die Funktiondataframe_strategy
als Argument. Weitere Informationen über den Decorator@given
finden Sie unter hypothesis.given.Führen Sie die Testfunktion aus. Wenn der Test ausgeführt wird, stellt Hypothesis automatisch die generierten Eingaben als Argumente für den Test bereit.
Beispiel 1: Generieren von Snowpark-DataFrames aus einer JSON-Datei
Im Folgenden finden Sie ein Beispiel für die Generierung von Snowpark DataFrames aus einer JSON Schemadatei, die mit der Funktion collect_dataframe_checkpoint
des snowpark-checkpoints-collectors
-Pakets erstellt wurde.
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_json_file(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Beispiel 2: Generieren eines Snowparks-DataFrame aus einem Pandera-DataFrameSchema-Objekt
Nachfolgend finden Sie ein Beispiel für die Generierung von Snowpark-DataFrames aus einer Instanz eines Pandera-DataFrameSchema. Weitere Informationen finden Sie unter Pandera-DataFrameSchema.
import pandera as pa
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema=pa.DataFrameSchema(
{
"boolean_column": pa.Column(bool),
"integer_column": pa.Column("int64", pa.Check.in_range(0, 9)),
"float_column": pa.Column(pa.Float32, pa.Check.in_range(10.5, 20.5)),
}
),
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function_from_dataframeschema_object(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Beispiel 3: Anpassen der Hypothesis-Verhaltensweise
Sie können die Verhaltensweise Ihres Tests auch mit dem Hypothesis-Decorator @settings
anpassen. Mit diesem Decorator können Sie verschiedene Konfigurationsparameter anpassen, um das Testverhalten auf Ihre Bedürfnisse zuzuschneiden. Mit dem @settings
-Decorator können Sie Aspekte wie die maximale Anzahl von Testfällen, die Frist für jede Testausführung, die Ausführlichkeitsstufen und viele andere steuern. Weitere Informationen finden Sie unter Hypothesis-Einstellungen.
from datetime import timedelta
from hypothesis import given, settings
from snowflake.snowpark import DataFrame, Session
from snowflake.hypothesis_snowpark import dataframe_strategy
@given(
df=dataframe_strategy(
schema="path/to/file.json",
session=Session.builder.getOrCreate(),
)
)
@settings(
deadline=timedelta(milliseconds=800),
max_examples=25,
)
def test_my_function(df: DataFrame):
# Test a particular function using the generated Snowpark DataFrame
...
Einrichten einer IDE für Snowpark Checkpoints¶
Der Snowflake Extension for Visual Studio-Code bietet Unterstützung für die Snowpark Checkpoints Bibliothek, um die Nutzung des Frameworks zu verbessern. Es gibt Ihnen eine abgestufte Kontrolle über die collect
- und validate
-Anweisungen, die in Ihren Code eingefügt werden, und überprüft den Status der Verhaltensäquivalenz-Assertionen Ihres konvertierten Codes.
Snowpark Checkpoints aktivieren¶
Um Snowpark Checkpoints zu aktivieren, gehen Sie zu den Erweiterungseinstellungen von Snowflake und aktivieren Sie Snowpark Checkpoints: Enabled.

Ansicht¶
Wenn Sie die Eigenschaft Snowpark Checkpoints auf Enabled setzen, öffnet sich in der Erweiterung eine neue Registerkarte namens SNOWPARK CHECKPOINTS. Es zeigt alle Checkpoints im Arbeitsbereich an und ermöglicht mehrere Aktionen, wie z. B. das Aktivieren/Deaktivieren aller oder einzelner Checkpoints, das Löschen aller Checkpoints aus Dateien und das Navigieren zu der Datei und der Codezeile, in der der Prüfpunkt definiert ist, indem Sie auf den jeweiligen Checkpoint doppelklicken.
Alle Checkpoints umschalten¶
Diese Option befindet sich in der rechten oberen Ecke der Registerkarte Snowpark Checkpoints und schaltet die aktivierte Eigenschaft in allen Checkpoints um.

Aktivierte Checkpoints:

Die Deaktivierung eines Checkpoints führt dazu, dass er zur Laufzeit übersprungen wird.

Alle Checkpoints bereinigen¶
Befindet sich in der oberen rechten Ecke der Registerkarte Snowpark Checkpoints. Dadurch werden Checkpoints aus allen Python-Dateien, einschließlich Jupyter-Notebooks, in Ihrem Arbeitsbereich entfernt, aber nicht aus dem Vertrag und dem Panel gelöscht. Das heißt, sie können mit dem Befehl Snowflake: Restore All Checkpoints
wiederhergestellt werden.

Einfügen von Checkpoints in eine Datei¶
Wenn Sie mit der rechten Maustaste in eine Datei klicken, wird ein Kontextmenü mit der Option Snowpark Checkpoints angezeigt, mit der Sie Collection- und Validation-Checkpoints hinzufügen können.
Option Snowpark-Checkpoints im Kontextmenü:

Collector/Validator hinzugefügt:

Einen einzelnen Checkpoint ausführen¶
Sie können einen einzelnen Checkpoint ausführen, indem Sie auf die Code-Lense-Option klicken, die über jedem Checkpoint angezeigt wird. Wenn Sie es ausführen, wird eine Ausgabekonsole angezeigt, die den Fortschritt anzeigt. Sobald es beendet ist, wird die Ergebnisansicht aufgerufen. Sogar wenn der Checkpoint in der Vertragsdatei deaktiviert ist, wird er nur für seine Ausführung aktiviert.

Wenn ein Einstiegspunkt nicht in der Vertragsdatei deklariert ist, wird die Fehlermeldung: Einstiegspunkt für den Checkpoint nicht gefunden. angezeigt.

Ausführen aller aktivierten Snowpark Checkpoints in einer Datei¶
In der oberen rechten Ecke jeder Datei finden Sie die Schaltfläche Run all checkpoints from the current file.

Wenn Sie darauf klicken, erscheint ein Ausgabekanal, der den Fortschritt der Ausführung anzeigt.

Zeitleistenansicht¶
Zeigt eine Zeitleiste mit den Ergebnissen der Checkpoint-Ausführung an.

Befehle¶
Die folgenden Befehle sind für Snowpark Checkpoints verfügbar. Um sie zu verwenden, geben Sie Snowflake: [command name]
in die Befehlspalette ein.
Befehl |
Beschreibung |
---|---|
Snowflake: Checkpoint umschalten |
Schaltet die Eigenschaft „aktiviert“ für alle Checkpoints um. |
Snowflake: Snowpark Checkpoints – Projektinitialisierung |
Triggert die Projektinitialisierung und legt eine Vertragsdatei an, wenn sie nicht vorhanden ist. Falls er existiert, wird ein Popup-Fenster angezeigt, in dem Sie gefragt werden, ob Sie den Checkpoint in die Vertragsdatei laden möchten. |
Snowflake: Alle Checkpoints löschen |
Löscht alle Checkpoints aus allen Dateien im Arbeitsbereich. |
Snowflake: Alle Checkpoints wiederherstellen |
Stellen Sie Checkpoints wieder her, die zuvor aus Dateien gelöscht wurden und noch in der Vertragsdatei vorhanden sind. |
Snowflake: Validierungs-/Erfassungs-Checkpoint hinzufügen |
Fügt einen Validator oder Collector mit seinen obligatorischen Parametern an der Cursorposition ein. |
Snowflake: Fokus auf Snowpark Checkpoints-Ansicht |
Verschiebt den Fokus auf den Bereich SNOWPARK CHECKPOINTS. |
Snowflake: Zeitleiste der offenen Checkpoints |
Zeigt eine Zeitleiste der Checkpoint-Ausführungen an. |
Snowflake: Alle Checkpoints aus der aktuellen Datei ausführen |
Führt alle aktivierten Checkpoints in der aktuellen Datei aus. |
Snowflake: Alle Checkpoints im Arbeitsbereich ausführen |
Führt alle aktivierten Checkpoints aus dem Arbeitsbereich aus. |
Snowflake: Alle Snowpark Checkpoint-Ergebnisse anzeigen |
Zeigt eine Registerkarte mit den Ergebnissen aller Checkpoints an. |
Warnungen¶
Duplikat: Wenn in einem Sammlungsprojekt zwei Checkpoints mit demselben Namen zugewiesen werden, erscheint eine Warnung: „Ein weiterer Checkpoint mit identischem Namen wurde entdeckt und wird überschrieben.“ Validierungsprojekte können mehrere Checkpoints mit demselben Namen haben, es wird keine Warnung angezeigt.
Falscher Typ: Wenn Sie einen Checkpoint mit einem anderen Typ als dem Projekttyp hinzufügen, wird er mit der folgenden Fehlermeldung unterstrichen: „Stellen Sie sicher, dass Sie die richtige Snowpark-Checkpoints-Anweisung verwenden. Diese spezielle Checkpoint-Anweisung unterscheidet sich von den anderen in diesem Projekt verwendeten Anweisungen. Anweisungen, die nicht dem Projekttyp entsprechen, werden bei der Ausführung ignoriert.
Ungültiger Checkpoint-Name: Es gibt ungültige Möglichkeiten, einen Parameter für den Checkpoint-Namen hinzuzufügen. Wenn dies geschieht, wird eine Warnmeldung angezeigt: „Ungültiger Checkpoint-Name.“ Checkpoint-Namen müssen mit einem Buchstaben beginnen und dürfen nur Buchstaben, Zahlen, Bindestriche und Unterstriche enthalten.