Checkpoints in Databricks¶
Snowpark Checkpoints schreibt Dateien über gesammelte Ergebnisse und liest dieselben Dateien, um DataFrames zu validieren. Einige dieser Dateien werden mit PySpark erstellt, andere mit Python-Paketen wie os
oder glob
. Diese Art der Dateibehandlung kann in einer Databricks-Umgebung, in der sich das Dateisystem von herkömmlichen Umgebungen unterscheidet, zu Inkonsistenzen führen. Daher müssen Sie das Paket anpassen, um das korrekte Lesen und Schreiben von Dateien sicherzustellen.
Der folgende Abschnitt zeigt, wie Sie Snowpark Checkpoints so konfigurieren, dass sie nahtlos in einer Databricks-Umgebung funktionieren und so eine effiziente DataFrame-Validierung ermöglichen.
Voraussetzungen¶
Bevor Sie Snowpark Checkpoints in Databricks verwenden, stellen Sie sicher, dass Ihre Umgebung die folgenden Anforderungen erfüllt:
PySpark:
Version 3.5.0 oder höher.Python:
Version 3.9, 3.10 und 3.11
Die Databricks Runtime-Versionen, die diese Anforderungen erfüllen, sind:
Databricks Runtime 14.3 LTS
Databricks Runtime 15.4 LTS
Eingabe/Ausgabe (E/A)-Strategien¶
Um sicherzustellen, dass Snowpark Checkpoints in verschiedenen Umgebungen korrekt funktioniert, können Sie die Weboberfläche EnvStrategy
und ihre Implementierungsklassen für Lese- und Schreibvorgänge von Dateien verwenden. Dadurch können E/A-Operationen angepasst und individuell gestaltet werden.
Mit Snowpark Checkpoints können Sie Ihre eigenen benutzerdefinierten Eingabe/Ausgabemethoden implementieren, indem Sie eine Klasse erstellen, die die Weboberfläche
EnvStrategy
implementiert. So können Sie die Operationen an Ihre spezielle Ausführungsumgebung und Ihre Erwartungen anpassen.Intern verwendet das Paket eine Standardklasse (
IODefaultStrategy
), die die WeboberflächeEnvStrategy
implementiert und eine grundlegende Implementierung von E/A-Operationen bietet. Sie können diese Standardstrategie durch eine benutzerdefinierte Implementierung ersetzen, die den spezifischen Anforderungen Ihrer Umgebung entspricht.
Wichtig
Jedes Snowpark Checkpoints-Paket (snowpark-checkpoints-collectors
, snowpark-checkpoints-validators
, snowpark-checkpoints-hypothesis
) enthält seine eigene Kopie der Klassen für die Dateiverarbeitung. Daher müssen alle Änderungen an den Dateikonfigurationen für jedes Paket einzeln vorgenommen werden. Stellen Sie sicher, dass Sie die Konfiguration aus dem von Ihnen verwendeten Paket importieren.
E/A-Funktionen¶
Diese Methoden zum Lesen und Schreiben von Dateien können angepasst werden:
mkdir
: Erstellt einen Ordner.folder_exists
: Prüft, ob ein Ordner existiert.file_exists
: Prüft, ob eine Datei existiert.write
: Schreibt den Inhalt in eine Datei.read
: Liest den Inhalt aus einer Datei.read_bytes
: Liest binäre Inhalte aus einer Datei.ls
: Listet den Inhalt eines Verzeichnisses auf.getcwd
: Ruft das aktuelle Arbeitsverzeichnis ab.remove_dir
: Entfernt ein Verzeichnis und dessen Inhalt. Diese Funktion wird ausschließlich im Modulsnowpark-checkpoints-collectors
verwendet.telemetry_path_files
: Ruft den Pfad zu den Telemetriedateien ab.
Databricks-Strategie¶
Die Databricks-Strategie ist eine Konfiguration, die mit DBFS-Dateipfaden arbeiten kann. Sie verwendet die Funktion normalize_dbfs_path
, um sicherzustellen, dass alle Pfade mit /dbfs/
beginnen.
So wird sie benutzt¶
Um die Databricks-Strategie zu verwenden, müssen Sie sie explizit in Ihrem Code konfigurieren. Das geht so:
Importieren Sie die erforderlichen Klassen:
from typing import Optional, BinaryIO from pathlib import Path from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
Definieren Sie die Databricks-Strategie:
class IODatabricksStrategy(EnvStrategy): def __init__(self): self.default_strategy = IODefaultStrategy() def mkdir(self, path: str, exist_ok: bool = False) -> None: path = normalize_dbfs_path(path) self.default_strategy.mkdir(path, exist_ok=exist_ok) def folder_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.folder_exists(path) def file_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.file_exists(path) def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None: file_path = normalize_dbfs_path(file_path) self.default_strategy.write(file_path, file_content, overwrite=overwrite) def read( self, file_path: str, mode: str = "r", encoding: Optional[str] = None ) -> str: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read(file_path, mode=mode, encoding=encoding) def read_bytes(self, file_path: str) -> bytes: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read_bytes(file_path) def ls(self, path: str, recursive: bool = False) -> list[str]: file_path = normalize_dbfs_path(path) list_of_files = self.default_strategy.ls(file_path, recursive=recursive) return [content.replace("/dbfs","") for content in list_of_files] def getcwd(self) -> str: try: parent_folder = "/snowpark_checkpoints" self.mkdir(parent_folder, exist_ok=True) return parent_folder except Exception: return "" def remove_dir(self, path:str) -> None: path = normalize_dbfs_path(path) self.default_strategy.remove_dir(path) def telemetry_path_files(self, path:str) -> Path: path = normalize_dbfs_path(path) return self.default_strategy.telemetry_path_files(path) def normalize_dbfs_path(path: str) -> str: if isinstance(path, Path): path = str(path) if not path.startswith("/"): path = "/" + path if not path.startswith("/dbfs/"): path = f'/dbfs{path}' return path
Konfigurieren Sie die Databricks-Strategie:
get_io_file_manager().set_strategy(IODatabricksStrategy())
Wenn Sie diesen Code zu Beginn Ihres Databricks-Skripts oder -Notebooks ausführen, wird Snowpark Checkpoints so konfiguriert, dass es die definierte E/A-Strategie für die korrekte Dateibehandlung in DBFS verwendet.
Optionale Anpassung¶
Für speziellere Eingabe/Ausgabe-Operationen kann eine eigene Strategie entworfen und implementiert werden. Dieser Ansatz bietet vollständige Kontrolle und Flexibilität über die E/A-Verhaltensweise. Sie ermöglicht es Entwicklern, die Strategie genau auf ihre spezifischen Anforderungen und Einschränkungen zuzuschneiden und möglicherweise die Leistung, die Ressourcennutzung oder andere relevante Faktoren zu optimieren.
Wichtig
Wenn Sie benutzerdefinierte Strategien verwenden, sind Sie dafür verantwortlich, dass die E/A-Operationen korrekt funktionieren.