Checkpoints no Databricks¶
O Snowpark Checkpoints grava arquivos sobre os resultados coletados e lê esses mesmos arquivos para validar DataFrames. Alguns desses arquivos são gerados usando PySpark; outros usam pacotes Python, como os
ou glob
. Esse tipo de comportamento de tratamento de arquivos pode levar a inconsistências em um ambiente Databricks, onde o sistema de arquivos é diferente dos ambientes tradicionais. Portanto, você deve adaptar o pacote para garantir a leitura e a gravação corretas dos arquivos.
A seção a seguir demonstra como configurar o Snowpark Checkpoints para que funcione perfeitamente em um ambiente Databricks, permitindo assim uma validação eficiente do DataFrame.
Pré-requisitos¶
Antes de usar o Snowpark Checkpoints no Databricks, certifique-se de que seu ambiente atenda aos seguintes requisitos:
PySpark:
versão 3.5.0 ou superior.Python:
versão 3.9, 3.10 e 3.11
As versões do Databricks Runtime que atendem a esses requisitos são as seguintes:
Databricks Runtime 14.3 LTS
Databricks Runtime 15.4 LTS
Estratégias de entrada/saída (I/O)¶
Para garantir que o Snowpark Checkpoints funcione corretamente em vários ambientes, você pode usar a interface EnvStrategy
e suas classes de implementação para operações de leitura e gravação de arquivos. Isso permite que as operações de I/O sejam adaptáveis e personalizáveis.
Com o Snowpark Checkpoints, você pode implementar seus próprios métodos personalizados de entrada/saída criando uma classe que implemente a interface
EnvStrategy
. Assim, você pode adaptar as operações ao seu ambiente de execução e expectativas específicas.Internamente, o pacote usa uma classe padrão (
IODefaultStrategy
) que implementa a interfaceEnvStrategy
e fornece uma implementação básica das operações de I/O. Você pode substituir essa estratégia padrão por uma implementação personalizada, adequada às necessidades específicas do seu ambiente.
Importante
Cada pacote do Snowpark Checkpoints (snowpark-checkpoints-collectors
, snowpark-checkpoints-validators
, snowpark-checkpoints-hypothesis
) inclui sua própria cópia das classes de tratamento de arquivos. Portanto, qualquer alteração nas configurações de arquivo deve ser aplicada a cada pacote separadamente. Certifique-se de importar a configuração do pacote que você está usando.
Funções de I/O¶
Esses métodos de leitura e gravação de arquivos podem ser personalizados:
mkdir
: cria uma pasta.folder_exists
: verifica se uma pasta existe.file_exists
: verifica se um arquivo existe.write
: grava o conteúdo em um arquivo.read
: lê o conteúdo de um arquivo.read_bytes
: lê o conteúdo binário de um arquivo.ls
: lista o conteúdo de um diretório.getcwd
: obtém o diretório de trabalho atual.remove_dir
: remove um diretório e seu conteúdo. Essa função é usada exclusivamente no módulosnowpark-checkpoints-collectors
.telemetry_path_files
: obtém o caminho para os arquivos de telemetria.
Estratégia do Databricks¶
A estratégia do Databricks é uma configuração que sabe como trabalhar com os caminhos de arquivo DBFS. Ela usa a função normalize_dbfs_path
para garantir que todos os caminhos comecem com /dbfs/
.
Como usar¶
Para usar a estratégia do Databricks, você deve configurá-la explicitamente no código. Veja como:
Importar as classes necessárias:
from typing import Optional, BinaryIO from pathlib import Path from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
Definir a estratégia do Databricks:
class IODatabricksStrategy(EnvStrategy): def __init__(self): self.default_strategy = IODefaultStrategy() def mkdir(self, path: str, exist_ok: bool = False) -> None: path = normalize_dbfs_path(path) self.default_strategy.mkdir(path, exist_ok=exist_ok) def folder_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.folder_exists(path) def file_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.file_exists(path) def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None: file_path = normalize_dbfs_path(file_path) self.default_strategy.write(file_path, file_content, overwrite=overwrite) def read( self, file_path: str, mode: str = "r", encoding: Optional[str] = None ) -> str: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read(file_path, mode=mode, encoding=encoding) def read_bytes(self, file_path: str) -> bytes: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read_bytes(file_path) def ls(self, path: str, recursive: bool = False) -> list[str]: file_path = normalize_dbfs_path(path) list_of_files = self.default_strategy.ls(file_path, recursive=recursive) return [content.replace("/dbfs","") for content in list_of_files] def getcwd(self) -> str: try: parent_folder = "/snowpark_checkpoints" self.mkdir(parent_folder, exist_ok=True) return parent_folder except Exception: return "" def remove_dir(self, path:str) -> None: path = normalize_dbfs_path(path) self.default_strategy.remove_dir(path) def telemetry_path_files(self, path:str) -> Path: path = normalize_dbfs_path(path) return self.default_strategy.telemetry_path_files(path) def normalize_dbfs_path(path: str) -> str: if isinstance(path, Path): path = str(path) if not path.startswith("/"): path = "/" + path if not path.startswith("/dbfs/"): path = f'/dbfs{path}' return path
Configurar a estratégia do Databricks:
get_io_file_manager().set_strategy(IODatabricksStrategy())
A execução desse código no início do script Databricks ou notebook configura o Snowpark Checkpoints para usar a estratégia de I/O definida para o tratamento correto dos arquivos em DBFS.
Personalização opcional¶
Para operações de entrada/saída mais especializadas, uma estratégia personalizada pode ser desenvolvida e implementada. Essa abordagem oferece total controle e flexibilidade sobre o comportamento de I/O. Ela permite que os desenvolvedores adaptem a estratégia precisamente aos seus requisitos e restrições específicos, otimizando potencialmente o desempenho, a utilização de recursos ou outros fatores relevantes.
Importante
Ao usar estratégias personalizadas, você é responsável por garantir que as operações de I/O funcionem corretamente.