Points de contrôle dans Databricks¶
Les points de contrôle Snowpark écrivent des fichiers sur les résultats collectés et lisent ces mêmes fichiers pour valider DataFrames. Certains de ces fichiers sont générés à l’aide de PySpark ; d’autres à l’aide de paquets Python tels que os
ou glob
. Ce type de comportement dans la gestion des fichiers peut entraîner des incohérences dans un environnement Databricks, où le système de fichiers diffère des environnements traditionnels. Par conséquent, vous devez adapter le paquet pour garantir une lecture et une écriture correctes des fichiers.
La section suivante montre comment configurer les points de contrôle Snowpark pour qu’ils fonctionnent de manière transparente dans un environnement Databricks, permettant ainsi une validation efficace de DataFrame.
Conditions préalables¶
Avant d’utiliser les points de contrôle Snowpark dans Databricks, assurez-vous que votre environnement répond aux exigences suivantes :
PySpark :
Version 3.5.0 ou supérieure.Python :
Version 3.9, 3.10 et 3.11
Voici les versions de Databricks Runtime qui répondent à ces exigences :
Databricks Runtime 14.3 LTS
Databricks Runtime 15.4 LTS
Stratégies d’entrée/de sortie (E/S)¶
Pour garantir le bon fonctionnement des points de contrôle Snowpark dans divers environnements, vous pouvez utiliser l’interface EnvStrategy
et ses classes d’implémentation pour les opérations de lecture et d’écriture de fichiers. Cela permet d’adapter et de personnaliser les opérations d’E/S.
Avec les points de contrôle Snowpark, vous pouvez implémenter vos propres méthodes d’entrée/de sortie personnalisées en créant une classe qui implémente l’interface
EnvStrategy
. Vous pouvez alors adapter les opérations à votre environnement d’exécution et à vos attentes spécifiques.En interne, le paquet utilise une classe par défaut (
IODefaultStrategy
) qui met en œuvre l’interfaceEnvStrategy
et fournit une implémentation de base des opérations d’entrée/de sortie. Vous pouvez remplacer cette stratégie par défaut par une implémentation personnalisée adaptée aux besoins spécifiques de votre environnement.
Important
Chaque paquet de points de contrôle Snowpark (snowpark-checkpoints-collectors
, snowpark-checkpoints-validators
, snowpark-checkpoints-hypothesis
) comprend sa propre copie des classes de gestion de fichiers. Par conséquent, toute modification de la configuration des fichiers doit être appliquée à chaque paquet séparément. Veillez à importer la configuration du paquet que vous utilisez.
Fonctions d’E/S¶
Ces méthodes de lecture et d’écriture des fichiers peuvent être personnalisées :
mkdir
: crée un dossier.folder_exists
: vérifie si un dossier existe.file_exists
: vérifie si un fichier existe.write
: écrit le contenu d’un fichier.read
: lit le contenu d’un fichier.read_bytes
: lit le contenu binaire d’un fichier.ls
: répertorie le contenu d’un répertoire.getcwd
: accède au répertoire de travail actuel.remove_dir
: supprime un répertoire et son contenu. Cette fonction est exclusivement utilisée dans le modulesnowpark-checkpoints-collectors
.telemetry_path_files
: accède au chemin d’accès des fichiers de télémétrie.
Stratégie Databricks¶
La stratégie Databricks est une configuration qui sait utiliser les chemins de fichiers DBFS. Elle utilise la fonction normalize_dbfs_path
pour s’assurer que tous les chemins commencent par /dbfs/
.
Comment l’utiliser¶
Pour utiliser la stratégie Databricks, vous devez la configurer explicitement dans le code. Voici comment :
Importez les classes nécessaires :
from typing import Optional, BinaryIO from pathlib import Path from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
Définissez la stratégie Databricks :
class IODatabricksStrategy(EnvStrategy): def __init__(self): self.default_strategy = IODefaultStrategy() def mkdir(self, path: str, exist_ok: bool = False) -> None: path = normalize_dbfs_path(path) self.default_strategy.mkdir(path, exist_ok=exist_ok) def folder_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.folder_exists(path) def file_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.file_exists(path) def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None: file_path = normalize_dbfs_path(file_path) self.default_strategy.write(file_path, file_content, overwrite=overwrite) def read( self, file_path: str, mode: str = "r", encoding: Optional[str] = None ) -> str: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read(file_path, mode=mode, encoding=encoding) def read_bytes(self, file_path: str) -> bytes: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read_bytes(file_path) def ls(self, path: str, recursive: bool = False) -> list[str]: file_path = normalize_dbfs_path(path) list_of_files = self.default_strategy.ls(file_path, recursive=recursive) return [content.replace("/dbfs","") for content in list_of_files] def getcwd(self) -> str: try: parent_folder = "/snowpark_checkpoints" self.mkdir(parent_folder, exist_ok=True) return parent_folder except Exception: return "" def remove_dir(self, path:str) -> None: path = normalize_dbfs_path(path) self.default_strategy.remove_dir(path) def telemetry_path_files(self, path:str) -> Path: path = normalize_dbfs_path(path) return self.default_strategy.telemetry_path_files(path) def normalize_dbfs_path(path: str) -> str: if isinstance(path, Path): path = str(path) if not path.startswith("/"): path = "/" + path if not path.startswith("/dbfs/"): path = f'/dbfs{path}' return path
Configurez la stratégie Databricks :
get_io_file_manager().set_strategy(IODatabricksStrategy())
L’exécution de ce code au début de votre script ou notebook Databricks configure les points de contrôle Snowpark pour qu’ils utilisent la stratégie d’E/S définie pour une gestion correcte des fichiers dans DBFS.
Personnalisation facultative¶
Pour des opérations d’entrée/de sortie plus spécialisées, une stratégie personnalisée peut être conçue et mise en œuvre. Cette approche offre un contrôle total et une grande flexibilité sur le comportement des entrées/sorties. Elle permet aux développeurs d’adapter précisément la stratégie à leurs exigences et contraintes spécifiques, en optimisant éventuellement les performances, l’utilisation des ressources ou d’autres facteurs pertinents.
Important
Lorsque vous utilisez des stratégies personnalisées, il vous incombe de veiller à ce que les opérations d’entrée/de sortie fonctionnent correctement.