Snowpark Checkpoints-Bibliothek: Sammler

Das Snowpark Checkpoints Python-Paket bietet eine Reihe von Funktionalitäten zur Unterstützung der Validierung von migrierten Workloads. In diesem Abschnitt finden Sie eine Übersicht über die wichtigsten Features und Funktionen des Pakets sowie eine Anleitung, wie Sie diese effektiv nutzen können.

Sammeln Sie Informationen über Ihren PySpark-Code

Das Paket snowpark-checkpoints-collectors bietet eine Funktion zum Extrahieren von Informationen aus der PySpark DataFrames. Diese Daten können wir dann mit den konvertierten Snowpark-DataFrames abgleichen, um die Gleichwertigkeit der Verhaltensweise sicherzustellen.

Verwenden Sie die folgende Funktion, um einen neuen Checkpoint-Sammelpunkt einzufügen:

Funktionssignatur:

def collect_dataframe_checkpoint(df: SparkDataFrame,
  checkpoint_name: str,
  sample: Optional[float],
  mode: Optional[CheckpointMode],
  output_path: Optional[str]) -> None:
Copy

Funktionsparameter:

  • df: Die PySpark DataFrame.

  • checkpoint_name: Der Name des Checkpoints. Beginnt mit einem Buchstaben (A-Z, a-z) oder einem Unterstrich (_) und enthält nur Buchstaben, Unterstriche und Dezimalziffern (0–9).

  • sample: (optional) Der Stichprobenumfang. Der Standardwert ist 1.0 (ganz PySpark DataFrame) in einem Bereich von 0 bis 1.0.

  • mode: (optional) Der Ausführungsmodus. Die Optionen sind SCHEMA und DATAFRAME. Der Standardwert ist SCHEMA.

  • output_path: (optional) Der Ausgabepfad zum Speichern des Checkpoints. Der Standardwert ist das aktuelle Arbeitsverzeichnis.

Der Sammlungsprozess erzeugt eine Ausgabedatei mit dem Namen checkpoint_collection_result.json, die Informationen über das Ergebnis für jeden Sammelpunkt enthält. Es ist eine JSON-Datei, die die folgenden Informationen enthält:

  • Ein Zeitstempel, wann der Sammelpunkt gestartet wurde.

  • Der relative Pfad der Datei, in der sich der Sammelpunkt befindet.

  • Die Codezeile der Datei, in der sich der Sammelpunkt befindet.

  • Der Name des Checkpoints des Sammelpunkts.

  • Das Ergebnis der Sammelpunkts (fehlgeschlagen oder bestanden).

Modus für die Schema-Inferenz der gesammelten Daten (Schema)

Dies ist der Standardmodus, der die Schema-Inferenz von Pandera nutzt, um die Metadaten und Prüfungen zu erhalten, die für den angegebenen DataFrame ausgewertet werden. In diesem Modus werden auch benutzerdefinierte Daten aus den Spalten des DataFrame auf der Grundlage des PySpark-Typs erfasst.

Die Spaltendaten und -prüfungen werden auf der Grundlage des PySpark-Typs der Spalte erfasst (siehe die Tabellen unten). Für jede Spalte, unabhängig von ihrem Typ, werden benutzerdefinierte Daten gesammelt, darunter der Name der Spalte, der Typ der Spalte, ob sie Nullwerte zulässt, die Anzahl der Zeilen, die Anzahl der ungültigen Zeilen und die Anzahl der Nullzeilen.

Benutzerdefinierte Daten werden basierend auf dem PySpark-Typ der Spalte erfasst

Spaltentyp

Erfasste benutzerdefinierte Daten

Numerisch (byte, short, integer, long, float und double)

Der Mindestwert. Der Höchstwert. Der Mittelwert. Die Genauigkeit der Dezimalstelle (im Falle des Typs Ganzzahl ist der Wert Null). Die Standardabweichung.

Date

Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%d

DayTimeIntervalType und YearMonthIntervalType

Der Mindestwert. Der Höchstwert.

Zeitstempel

Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%dH:%M:%S

Zeitstempel ntz

Der Mindestwert. Der Höchstwert. Das Format des Datums: %Y-%m-%dT%H:%M:%S%z

String

Der Wert für die Mindestlänge. Der Wert für die maximale Länge.

Char

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist char kein gültiger Typ.

Varchar

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist Varchar kein gültiger Typ.

Dezimalzahl

Der Mindestwert. Der Höchstwert. Der Mittelwert. Die Genauigkeit der Dezimalstelle.

Array

Der Typ des Wertes. Falls erlaubt, ist Null ein zulässiges Element. Der Anteil der Nullwerte. Die maximale Array-Größe. Die minimale Array-Größe. Die durchschnittliche Größe von Arrays. Wenn alle Arrays die gleiche Größe haben.

Binär

Die maximale Größe. Die Mindestgröße. Die durchschnittliche Größe. Wenn alle Elemente die gleiche Größe haben.

Zuordnung

Der Typ des Schlüssels. Der Typ des Wertes. Falls erlaubt, ist Null ein zulässiger Wert. Der Anteil der Nullwerte. Die maximale der Zuordnung. Die Mindestgröße der Zuordnung. Die durchschnittliche Größe der Zuordnung. Wenn alle Zuordnungen die gleiche Größe haben.

NULL

NullType steht für „Keine“, da die Daten des Typs nicht ermittelt werden können. Daher ist es nicht möglich, Informationen von diesem Typ zu erhalten.

Struktur

Die Metadaten der Struktur – sie beziehen sich auf jedes structField: name, type, nullable, rows count, rows not null count und rows null count. Es ist ein Array.

Sie definiert außerdem eine Reihe von vordefinierten Validierungsprüfungen für jeden Datentyp, die in der folgenden Tabelle aufgeführt sind:

Die Prüfungen werden auf der Grundlage des Typs der Spalte gesammelt

Typ

Pandera-Prüfungen

Zusätzliche Prüfungen

Boolesch

Jeder Wert ist „True“ oder „False“.

Die Anzahl der True- und False-Werte.

Numerisch (byte, short, integer, long, float und double)

Jeder Wert liegt im Bereich von Minimalwert und Maximalwert.

Die Genauigkeit der Dezimalstelle. Der Mittelwert. Die Standardabweichung.

Date

N/A

Mindest- und Höchstwerte

Zeitstempel

Jeder Wert liegt im Bereich von Minimalwert und Maximalwert.

Das Format des Wertes.

Zeitstempel ntz

Jeder Wert liegt im Bereich von Minimalwert und Maximalwert.

Das Format des Wertes.

String

Jede Wertlänge liegt im Bereich der min. und max. Länge.

Keine

Char

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist char kein gültiger Typ.

Varchar

PySpark behandelt jedes Literal als Zeichenfolgentyp, daher ist Varchar kein gültiger Typ.

Dezimalzahl

N/A

N/A

Array

N/A

Keine

Binär

N/A

Keine

Zuordnung

N/A

Keine

NULL

N/A

N/A

Struktur

N/A

Keine

Dieser Modus erlaubt es dem Benutzer, eine Stichprobe eines DataFrame zur Erfassung definieren – die ist jedoch optional. Standardmäßig erfolgt die Erfassung über den gesamten DataFrame. Der Umfang der Stichprobe muss die Population statistisch repräsentieren.

Pandera kann nur das Schema eines Pandas-DataFrame ableiten, was bedeutet, dass der PySpark DataFrame in ein Pandas DataFrame konvertiert werden muss, was sich auf die Typauflösungen der Spalten auswirken kann. Insbesondere ermittelt Pandera die folgenden PySpark-Typen als Objekttypen: string, array, map, null, struct und binary.

Die Ausgabe dieses Modus ist eine JSON-Datei für jeden erfassten DataFrame, wobei der Name der Datei derselbe ist wie der Checkpoint. Diese Datei enthält Informationen über das Schema und besteht aus zwei Abschnitten:

  1. Der Abschnitt Pandera-Schema enthält die von Pandera abgeleiteten Daten wie Name, Typ (Pandas), ob die Spalte Nullwerte zulässt oder nicht, sowie weitere Informationen für jede Spalte und überprüft, ob die Spalte auf dem Typ PySpark basiert. Es ist ein DataFrameSchema-Objekt von Pandera.

  2. Der Abschnitt mit den benutzerdefinierten Daten ist ein Array mit den benutzerdefinierten Daten, die von jeder Spalte auf der Grundlage des PySpark-Typs erfasst werden.

Bemerkung

Bei der Verarbeitung großer PySpark DataFrames kann es bei der Erfassung zu Speicherproblemen kommen. Um dieses Problem zu beheben, können Sie den sample-Parameter in der Sammelfunktion auf einen Wert zwischen 0,0 und 1,0 setzen, um mit einer Teilmenge der Daten statt mit dem gesamten PySpark DataFrame zu arbeiten.

DataFrame-Modus für erfasste Daten (DataFrame)

Dieser Modus erfasst die Daten des PySpark DataFrame. In diesem Fall speichert der Mechanismus alle Daten des angegebenen DataFrame im Parquet-Format. Unter Verwendung der Standard-Snowflake-Verbindung des Benutzers wird versucht, die Parquet-Dateien in den temporären Stagingbereich von Snowflake hochzuladen und eine Tabelle auf der Grundlage der Informationen im Stagingbereich zu erstellen. Der Name der Datei und der Tabelle ist derselbe wie der des Checkpoints.

Die Ausgabe dieses Modus ist eine Parquet-Datei, die als Ergebnis des DataFrame gespeichert wird, und eine Tabelle mit den DataFrame-Daten in der Standard-Snowflake-Konfigurationsverbindung.