Snowpark Checkpoints-Bibliothek: Prüfer¶
Das Snowpark-Checkpoints-Paket bietet eine Reihe von Validierungen, die auf den Snowpark-Code angewendet werden können, um sicherzustellen, dass die Verhaltensweise dem PySpark-Code entspricht.
Vom Framework bereitgestellte Funktionen¶
check_with_spark: Ein Decorator, der alle Snowpark-DataFrame-Argumente in eine Funktion oder ein Beispiel und dann in PySpark-DataFrames umwandelt. Die Prüfung führt dann eine bereitgestellte Spark-Funktion aus, die die Funktionalität der neuen Snowpark-Funktion widerspiegelt, und vergleicht die Ausgaben der beiden Implementierungen. Unter der Annahme, dass die Spark-Funktion und die Snowpark-Funktionen semantisch identisch sind, kann der Decorator diese Funktionen anhand realer Stichproben überprüft werden.
- Parameter:
job_context
(SnowparkJobContext): Der Jobkontext mit der Konfiguration und den Details für die Validierungspark_function
(fn): Die entsprechende PySpark-Funktion zum Vergleich mit der Snowpark-Implementierungcheckpoint_name
(str): Ein Name für den Checkpoint; ist standardmäßig „None“sample_number
(Optional[int], optional): Anzahl der Zeilen für die Validierung; der Standardwert ist 100sampling_strategy
(Optional[SamplingStrategy], optional): Die für das Sampling verwendete Strategie; der Standardwert istSamplingStrategy.RANDOM_SAMPLE
output_path
(Optional[str], optional): Der Pfad zu der Datei, in der die Validierungsergebnisse gespeichert werden; standardmäßig „None“
Beispiel:
def original_spark_code_I_dont_understand(df): from pyspark.sql.functions import col, when ret = df.withColumn( "life_stage", when(col("byte") < 4, "child") .when(col("byte").between(4, 10), "teenager") .otherwise("adult"), ) return ret @check_with_spark( job_context=job_context, spark_function=original_spark_code_I_dont_understand ) def new_snowpark_code_I_do_understand(df): from snowflake.snowpark.functions import col, lit, when ref = df.with_column( "life_stage", when(col("byte") < 4, lit("child")) .when(col("byte").between(4, 10), lit("teenager")) .otherwise(lit("adult")), ) return ref df1 = new_snowpark_code_I_do_understand(df)
validate_dataframe_checkpoint: Diese Funktion validiert einen Snowpark-Dataframe anhand einer bestimmten Checkpoint-Schemadatei oder eines importierten Dataframes gemäß dem Argumentmodus. Sie stellt sicher, dass die für diesen DataFrame gesammelten Informationen und der DataFrame, der an die Funktion übergeben wird, übereinstimmen.
- Parameter:
df
(SnowparkDataFrame): Der zu validierende DataFramecheckpoint_name
(str): Der Name des Checkpoints, anhand dessen validiert werden soll.job_context
(SnowparkJobContext, optional) (str): Der Jobkontext für die Validierung; erforderlich für PARQUET -Modusmode
(CheckpointMode): Der Modus der Validierung (z. B. SCHEMA, PARQUET). Standardwert: SCHEMAcustom_checks
(Optional[dict[Any, Any]], optional): Benutzerdefinierte Prüfungen, die während der Validierung angewendet werdenskip_checks
(Optional[dict[Any, Any]], optional): Prüfungen, die bei der Validierung übersprungen werden sollensample_frac
(Optional[float], optional): Bruchteil von DataFrame als Stichprobe für die Validierung; der Standardwert ist 0,1sample_number
(Optional[int], optional): Anzahl der Zeilen als Stichprobe für die Validierungsampling_strategy
(Optional[SamplingStrategy], optional): Strategie für das Samplingoutput_path
(Optional[str], optional): Der Pfad zu der Datei, in der die Validierungsergebnisse gespeichert werden
Beispiel:
# Check a schema/stats here! validate_dataframe_checkpoint( df1, "demo_add_a_column_dataframe", job_context=job_context, mode=CheckpointMode.DATAFRAME, # CheckpointMode.Schema) )
Je nach gewähltem Modus verwendet die Validierung entweder die gesammelte Schemadatei oder einen mit Parquet geladenen Datenframe in Snowflake, um die Äquivalenz mit der PySpark-Version zu überprüfen.
check-output_schema: Dieser Decorator validiert das Schema der Ausgabe einer Snowpark-Funktion und stellt sicher, dass die DataFrame-Ausgabe mit einem bestimmten Pandera-Schema übereinstimmt. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie umschließt die Snowpark-Funktion und führt eine Schemavalidierung des Ausgabe-DataFrame durch, bevor sie das Ergebnis zurückgibt.
Beispiel:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_output_schema from numpy import int8 # Define the Pandera schema out_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), "COLUMN3": Column(float, Check.less_than(10)), } ) # Define the Snowpark function and apply the decorator @check_output_schema(out_schema, "output_schema_checkpoint") def preprocessor(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the preprocessor function preprocessed_dataframe = preprocessor(sp_dataframe)
check_input_schema: Dieser Decorator prüft das Schema der Eingabeargumente einer Snowpark-Funktion. Dieser Decorator stellt sicher, dass der Eingabe-DataFrame mit einem bestimmten Pandera-Schema übereinstimmt, bevor die Funktion ausgeführt wird. Es ist besonders nützlich, um die Datenintegrität und -konsistenz in Snowpark-Pipelines durchzusetzen. Dieser Decorator nimmt mehrere Parameter entgegen, darunter das Pandera-Schema, gegen das validiert werden soll, den Checkpoint-Namen, den Sampling-Parameter und einen optionalen Jobkontext. Sie schließt die Snowpark-Funktion ein und führt eine Schemavalidierung des Eingabe-DataFrame durch, bevor sie die Funktion ausführt.
Beispiel:
from pandas import DataFrame as PandasDataFrame from pandera import DataFrameSchema, Column, Check from snowflake.snowpark import Session from snowflake.snowpark import DataFrame as SnowparkDataFrame from snowflake.snowpark_checkpoints.checkpoint import check_input_schema from numpy import int8 # Define the Pandera schema input_schema = DataFrameSchema( { "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)), "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)), } ) # Define the Snowpark function and apply the decorator @check_input_schema(input_schema, "input_schema_checkpoint") def process_dataframe(dataframe: SnowparkDataFrame): return dataframe.with_column( "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"] ) # Create a Snowpark session and DataFrame session = Session.builder.getOrCreate() df = PandasDataFrame( { "COLUMN1": [1, 4, 0, 10, 9], "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4], } ) sp_dataframe = session.create_dataframe(df) # Apply the process_dataframe function processed_dataframe = process_dataframe(sp_dataframe)
Statistische Prüfungen¶
Statistische Prüfungen werden standardmäßig auf den spezifischen Spaltentyp angewendet, wenn die Prüfung im Modus Schema
ausgeführt wird; diese Prüfungen können mit skip_checks
übersprungen werden.
Spaltentyp |
Standardprüfung |
---|---|
Numerisch: |
between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert. decimal_precision: Wenn der Wert eine Dezimalzahl ist, wird die Genauigkeit der Dezimalstellen überprüft. mean: Überprüft, ob der Mittelwert der Spalten innerhalb eines bestimmten Bereichs liegt. |
Boolesch |
isin: Überprüfen Sie, ob der Wert „True“ oder „False“ ist. True_proportion: Überprüfen Sie, ob der Anteil der „True“-Werte innerhalb eines bestimmten Bereichs liegt. False_proportion: Überprüfen Sie, ob der Anteil der „False“-Werte innerhalb eines bestimmten Bereichs liegt. |
Datum: |
between: Wenn der Wert zwischen dem Mindestwert oder dem Höchstwert liegt, einschließlich Mindestwert und Höchstwert. |
Nullwertfähig: Alle unterstützten Typen |
Null_proportion: Validieren Sie den Nullanteil entsprechend. |
Prüfungen überspringen¶
Mit dieser detaillierten Steuerung von Prüfungen können Sie die Spaltenvalidierung oder bestimmte Prüfungen für eine Spalte überspringen. Mit dem Parameter skip_checks
können Sie die spezifische Spalte und die Art der Validierung angeben, die Sie überspringen möchten. Der Name des Prüfung, der zum Überspringen verwendet wird, ist der Name, der mit der Prüfung verbunden ist.
str_contains
str_endswith
str_length
str_matches
str_startswith
in_range
equal_to
greater_than_or_equal_to
greater_than
less_than_or_equal_to
less_than
not_equal_to
notin
isin
Beispiel:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, element_wise=True)),
"COLUMN2": Column(
float,
[
Check.greater_than(-20.5),
Check.less_than(-1.0),
Check(lambda x: x < -1.2),
],
),
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
check_dataframe_schema(
sp_df,
schema,
skip_checks={"COLUMN1": [SKIP_ALL], "COLUMN2": ["greater_than", "less_than"]},
)
Individuelle Prüfungen¶
Sie können dem aus der JSON-Datei generierten Schema mit der Eigenschaft custom_checks
zusätzliche Prüfungen hinzufügen. Damit wird die Prüfung zum Pandera-Schema hinzugefügt.
Beispiel:
df = pd.DataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
session = Session.builder.getOrCreate()
sp_df = session.create_dataframe(df)
# Those check will be added to the schema generate from the JSON file
result = validate_dataframe_checkpoint(
sp_df,
"checkpoint-name",
custom_checks={
"COLUMN1": [
Check(lambda x: x.shape[0] == 5),
Check(lambda x: x.shape[1] == 2),
],
"COLUMN2": [Check(lambda x: x.shape[0] == 5)],
},
)
Sampling-Strategien¶
Der Sampling-Prozess des bereitgestellten Codes wurde entwickelt, um große DataFrames effizient zu validieren, indem eine repräsentative Stichprobe der Daten genommen wird. Diese Vorgehensweis hilft bei der Schema-Validierung, ohne dass der gesamte Datensatz verarbeitet werden muss, was rechenintensiv und zeitaufwendig sein kann.
- Parameter:
sample_frac
: Dieser Parameter gibt den Anteil des DataFrame an, der entnommen werden soll. Wenn zum Beispielsample_frac
auf 0,1 eingestellt ist, werden 10 Prozent der Zeilen aus dem DataFrame als Stichprobe entnommen. Dies ist nützlich, wenn Sie eine Teilmenge der Daten validieren möchten, um Rechenressourcen zu sparen.sample_number
: Dieser Parameter gibt die genaue Anzahl der Zeilen an, die aus dem DataFrame als Stichprobe entnommen werden sollen. Wenn zum Beispielsample_number
auf 100 eingestellt ist, werden 100 Zeilen aus dem DataFrame entnommen. Dies ist nützlich, wenn Sie eine feste Anzahl von Zeilen unabhängig von der Größe des DataFrame validieren möchten.
Validierungsergebnis¶
Nachdem eine beliebige Art von Validierung durchgeführt wurde, wird das Ergebnis, unabhängig davon sie bestanden wurde oder nicht, in checkpoint_validation_results.json
gespeichert. Diese Datei wird hauptsächlich für die Funktionalitäten der VSCode-Erweiterung verwendet. Sie enthält Informationen über den Status der Überprüfung, den Zeitstempel, den Checkpoint-Namen, die Nummer der Zeile, in der die Funktion ausgeführt wird, und die Datei.
Sie protokolliert das Ergebnis auch im Standard-Snowflake Konto in der Tabelle SNOWPARK_CHECKPOINTS_REPORT, die Informationen über das Validierungsergebnis enthält.
DATE
: Zeitstempel der Ausführung der ValidierungJOB
: Name der SnowparkJobContextSTATUS
: Status der ValidierungCHECKPOINT
: Name des überprüften CheckpointsMESSAGE
: FehlermeldungDATA
: Daten aus der Ausführung der ValidierungEXECUTION_MODE
: Validierungsmodus ausgeführt