Points de contrôle dans Databricks

Les points de contrôle Snowpark écrivent des fichiers sur les résultats collectés et lisent ces mêmes fichiers pour valider DataFrames. Certains de ces fichiers sont générés à l’aide de PySpark ; d’autres à l’aide de paquets Python tels que os ou glob. Ce type de comportement dans la gestion des fichiers peut entraîner des incohérences dans un environnement Databricks, où le système de fichiers diffère des environnements traditionnels. Par conséquent, vous devez adapter le paquet pour garantir une lecture et une écriture correctes des fichiers.

La section suivante montre comment configurer les points de contrôle Snowpark pour qu’ils fonctionnent de manière transparente dans un environnement Databricks, permettant ainsi une validation efficace de DataFrame.

Conditions préalables

Avant d’utiliser les points de contrôle Snowpark dans Databricks, assurez-vous que votre environnement répond aux exigences suivantes :

  • PySpark : Version 3.5.0 ou supérieure.

  • Python : Version 3.9, 3.10 et 3.11

Voici les versions de Databricks Runtime qui répondent à ces exigences :

  • Databricks Runtime 14.3 LTS

  • Databricks Runtime 15.4 LTS

Stratégies d’entrée/de sortie (E/S)

Pour garantir le bon fonctionnement des points de contrôle Snowpark dans divers environnements, vous pouvez utiliser l’interface EnvStrategy et ses classes d’implémentation pour les opérations de lecture et d’écriture de fichiers. Cela permet d’adapter et de personnaliser les opérations d’E/S.

  • Avec les points de contrôle Snowpark, vous pouvez implémenter vos propres méthodes d’entrée/de sortie personnalisées en créant une classe qui implémente l’interface EnvStrategy. Vous pouvez alors adapter les opérations à votre environnement d’exécution et à vos attentes spécifiques.

  • En interne, le paquet utilise une classe par défaut (IODefaultStrategy) qui met en œuvre l’interface EnvStrategy et fournit une implémentation de base des opérations d’entrée/de sortie. Vous pouvez remplacer cette stratégie par défaut par une implémentation personnalisée adaptée aux besoins spécifiques de votre environnement.

Important

Chaque paquet de points de contrôle Snowpark (snowpark-checkpoints-collectors, snowpark-checkpoints-validators, snowpark-checkpoints-hypothesis) comprend sa propre copie des classes de gestion de fichiers. Par conséquent, toute modification de la configuration des fichiers doit être appliquée à chaque paquet séparément. Veillez à importer la configuration du paquet que vous utilisez.

Fonctions d’E/S

Ces méthodes de lecture et d’écriture des fichiers peuvent être personnalisées :

  • mkdir : crée un dossier.

  • folder_exists : vérifie si un dossier existe.

  • file_exists : vérifie si un fichier existe.

  • write : écrit le contenu d’un fichier.

  • read : lit le contenu d’un fichier.

  • read_bytes : lit le contenu binaire d’un fichier.

  • ls : répertorie le contenu d’un répertoire.

  • getcwd : accède au répertoire de travail actuel.

  • remove_dir : supprime un répertoire et son contenu. Cette fonction est exclusivement utilisée dans le module snowpark-checkpoints-collectors.

  • telemetry_path_files : accède au chemin d’accès des fichiers de télémétrie.

Stratégie Databricks

La stratégie Databricks est une configuration qui sait utiliser les chemins de fichiers DBFS. Elle utilise la fonction normalize_dbfs_path pour s’assurer que tous les chemins commencent par /dbfs/.

Comment l’utiliser

Pour utiliser la stratégie Databricks, vous devez la configurer explicitement dans le code. Voici comment :

  1. Importez les classes nécessaires :

    from typing import Optional, BinaryIO
    from pathlib import Path
    from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy
    from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
    
    Copy
  2. Définissez la stratégie Databricks :

    class IODatabricksStrategy(EnvStrategy):
    
      def __init__(self):
          self.default_strategy = IODefaultStrategy()
    
      def mkdir(self, path: str, exist_ok: bool = False) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.mkdir(path, exist_ok=exist_ok)
    
      def folder_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.folder_exists(path)
    
      def file_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.file_exists(path)
    
      def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None:
          file_path = normalize_dbfs_path(file_path)
          self.default_strategy.write(file_path, file_content, overwrite=overwrite)
    
      def read(
          self, file_path: str, mode: str = "r", encoding: Optional[str] = None
      ) -> str:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read(file_path, mode=mode, encoding=encoding)
    
      def read_bytes(self, file_path: str) -> bytes:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read_bytes(file_path)
    
      def ls(self, path: str, recursive: bool = False) -> list[str]:
          file_path = normalize_dbfs_path(path)
          list_of_files = self.default_strategy.ls(file_path, recursive=recursive)
          return [content.replace("/dbfs","") for content in list_of_files]
    
      def getcwd(self) -> str:
          try:
              parent_folder = "/snowpark_checkpoints"
              self.mkdir(parent_folder, exist_ok=True)
              return parent_folder
          except Exception:
              return ""
    
      def remove_dir(self, path:str) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.remove_dir(path)
    
      def telemetry_path_files(self, path:str) -> Path:
          path = normalize_dbfs_path(path)
          return self.default_strategy.telemetry_path_files(path)
    
    def normalize_dbfs_path(path: str) -> str:
        if isinstance(path, Path):
            path = str(path)
        if not path.startswith("/"):
            path = "/" + path
        if not path.startswith("/dbfs/"):
            path = f'/dbfs{path}'
        return path
    
    Copy
  3. Configurez la stratégie Databricks :

    get_io_file_manager().set_strategy(IODatabricksStrategy())
    
    Copy

L’exécution de ce code au début de votre script ou notebook Databricks configure les points de contrôle Snowpark pour qu’ils utilisent la stratégie d’E/S définie pour une gestion correcte des fichiers dans DBFS.

Personnalisation facultative

Pour des opérations d’entrée/de sortie plus spécialisées, une stratégie personnalisée peut être conçue et mise en œuvre. Cette approche offre un contrôle total et une grande flexibilité sur le comportement des entrées/sorties. Elle permet aux développeurs d’adapter précisément la stratégie à leurs exigences et contraintes spécifiques, en optimisant éventuellement les performances, l’utilisation des ressources ou d’autres facteurs pertinents.

Important

Lorsque vous utilisez des stratégies personnalisées, il vous incombe de veiller à ce que les opérations d’entrée/de sortie fonctionnent correctement.