Checkpoints in Databricks

Snowpark Checkpoints schreibt Dateien über gesammelte Ergebnisse und liest dieselben Dateien, um DataFrames zu validieren. Einige dieser Dateien werden mit PySpark erstellt, andere mit Python-Paketen wie os oder glob. Diese Art der Dateibehandlung kann in einer Databricks-Umgebung, in der sich das Dateisystem von herkömmlichen Umgebungen unterscheidet, zu Inkonsistenzen führen. Daher müssen Sie das Paket anpassen, um das korrekte Lesen und Schreiben von Dateien sicherzustellen.

Der folgende Abschnitt zeigt, wie Sie Snowpark Checkpoints so konfigurieren, dass sie nahtlos in einer Databricks-Umgebung funktionieren und so eine effiziente DataFrame-Validierung ermöglichen.

Voraussetzungen

Bevor Sie Snowpark Checkpoints in Databricks verwenden, stellen Sie sicher, dass Ihre Umgebung die folgenden Anforderungen erfüllt:

  • PySpark: Version 3.5.0 oder höher.

  • Python: Version 3.9, 3.10 und 3.11

Die Databricks Runtime-Versionen, die diese Anforderungen erfüllen, sind:

  • Databricks Runtime 14.3 LTS

  • Databricks Runtime 15.4 LTS

Eingabe/Ausgabe (E/A)-Strategien

Um sicherzustellen, dass Snowpark Checkpoints in verschiedenen Umgebungen korrekt funktioniert, können Sie die Weboberfläche EnvStrategy und ihre Implementierungsklassen für Lese- und Schreibvorgänge von Dateien verwenden. Dadurch können E/A-Operationen angepasst und individuell gestaltet werden.

  • Mit Snowpark Checkpoints können Sie Ihre eigenen benutzerdefinierten Eingabe/Ausgabemethoden implementieren, indem Sie eine Klasse erstellen, die die Weboberfläche EnvStrategy implementiert. So können Sie die Operationen an Ihre spezielle Ausführungsumgebung und Ihre Erwartungen anpassen.

  • Intern verwendet das Paket eine Standardklasse (IODefaultStrategy), die die Weboberfläche EnvStrategy implementiert und eine grundlegende Implementierung von E/A-Operationen bietet. Sie können diese Standardstrategie durch eine benutzerdefinierte Implementierung ersetzen, die den spezifischen Anforderungen Ihrer Umgebung entspricht.

Wichtig

Jedes Snowpark Checkpoints-Paket (snowpark-checkpoints-collectors, snowpark-checkpoints-validators, snowpark-checkpoints-hypothesis) enthält seine eigene Kopie der Klassen für die Dateiverarbeitung. Daher müssen alle Änderungen an den Dateikonfigurationen für jedes Paket einzeln vorgenommen werden. Stellen Sie sicher, dass Sie die Konfiguration aus dem von Ihnen verwendeten Paket importieren.

E/A-Funktionen

Diese Methoden zum Lesen und Schreiben von Dateien können angepasst werden:

  • mkdir: Erstellt einen Ordner.

  • folder_exists: Prüft, ob ein Ordner existiert.

  • file_exists: Prüft, ob eine Datei existiert.

  • write: Schreibt den Inhalt in eine Datei.

  • read: Liest den Inhalt aus einer Datei.

  • read_bytes: Liest binäre Inhalte aus einer Datei.

  • ls: Listet den Inhalt eines Verzeichnisses auf.

  • getcwd: Ruft das aktuelle Arbeitsverzeichnis ab.

  • remove_dir: Entfernt ein Verzeichnis und dessen Inhalt. Diese Funktion wird ausschließlich im Modul snowpark-checkpoints-collectors verwendet.

  • telemetry_path_files: Ruft den Pfad zu den Telemetriedateien ab.

Databricks-Strategie

Die Databricks-Strategie ist eine Konfiguration, die mit DBFS-Dateipfaden arbeiten kann. Sie verwendet die Funktion normalize_dbfs_path, um sicherzustellen, dass alle Pfade mit /dbfs/ beginnen.

So wird sie benutzt

Um die Databricks-Strategie zu verwenden, müssen Sie sie explizit in Ihrem Code konfigurieren. Das geht so:

  1. Importieren Sie die erforderlichen Klassen:

    from typing import Optional, BinaryIO
    from pathlib import Path
    from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy
    from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
    
    Copy
  2. Definieren Sie die Databricks-Strategie:

    class IODatabricksStrategy(EnvStrategy):
    
      def __init__(self):
          self.default_strategy = IODefaultStrategy()
    
      def mkdir(self, path: str, exist_ok: bool = False) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.mkdir(path, exist_ok=exist_ok)
    
      def folder_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.folder_exists(path)
    
      def file_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.file_exists(path)
    
      def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None:
          file_path = normalize_dbfs_path(file_path)
          self.default_strategy.write(file_path, file_content, overwrite=overwrite)
    
      def read(
          self, file_path: str, mode: str = "r", encoding: Optional[str] = None
      ) -> str:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read(file_path, mode=mode, encoding=encoding)
    
      def read_bytes(self, file_path: str) -> bytes:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read_bytes(file_path)
    
      def ls(self, path: str, recursive: bool = False) -> list[str]:
          file_path = normalize_dbfs_path(path)
          list_of_files = self.default_strategy.ls(file_path, recursive=recursive)
          return [content.replace("/dbfs","") for content in list_of_files]
    
      def getcwd(self) -> str:
          try:
              parent_folder = "/snowpark_checkpoints"
              self.mkdir(parent_folder, exist_ok=True)
              return parent_folder
          except Exception:
              return ""
    
      def remove_dir(self, path:str) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.remove_dir(path)
    
      def telemetry_path_files(self, path:str) -> Path:
          path = normalize_dbfs_path(path)
          return self.default_strategy.telemetry_path_files(path)
    
    def normalize_dbfs_path(path: str) -> str:
        if isinstance(path, Path):
            path = str(path)
        if not path.startswith("/"):
            path = "/" + path
        if not path.startswith("/dbfs/"):
            path = f'/dbfs{path}'
        return path
    
    Copy
  3. Konfigurieren Sie die Databricks-Strategie:

    get_io_file_manager().set_strategy(IODatabricksStrategy())
    
    Copy

Wenn Sie diesen Code zu Beginn Ihres Databricks-Skripts oder -Notebooks ausführen, wird Snowpark Checkpoints so konfiguriert, dass es die definierte E/A-Strategie für die korrekte Dateibehandlung in DBFS verwendet.

Optionale Anpassung

Für speziellere Eingabe/Ausgabe-Operationen kann eine eigene Strategie entworfen und implementiert werden. Dieser Ansatz bietet vollständige Kontrolle und Flexibilität über die E/A-Verhaltensweise. Sie ermöglicht es Entwicklern, die Strategie genau auf ihre spezifischen Anforderungen und Einschränkungen zuzuschneiden und möglicherweise die Leistung, die Ressourcennutzung oder andere relevante Faktoren zu optimieren.

Wichtig

Wenn Sie benutzerdefinierte Strategien verwenden, sind Sie dafür verantwortlich, dass die E/A-Operationen korrekt funktionieren.