Checkpoints no Databricks

O Snowpark Checkpoints grava arquivos sobre os resultados coletados e lê esses mesmos arquivos para validar DataFrames. Alguns desses arquivos são gerados usando PySpark; outros usam pacotes Python, como os ou glob. Esse tipo de comportamento de tratamento de arquivos pode levar a inconsistências em um ambiente Databricks, onde o sistema de arquivos é diferente dos ambientes tradicionais. Portanto, você deve adaptar o pacote para garantir a leitura e a gravação corretas dos arquivos.

A seção a seguir demonstra como configurar o Snowpark Checkpoints para que funcione perfeitamente em um ambiente Databricks, permitindo assim uma validação eficiente do DataFrame.

Pré-requisitos

Antes de usar o Snowpark Checkpoints no Databricks, certifique-se de que seu ambiente atenda aos seguintes requisitos:

  • PySpark: versão 3.5.0 ou superior.

  • Python: versão 3.9, 3.10 e 3.11

As versões do Databricks Runtime que atendem a esses requisitos são as seguintes:

  • Databricks Runtime 14.3 LTS

  • Databricks Runtime 15.4 LTS

Estratégias de entrada/saída (I/O)

Para garantir que o Snowpark Checkpoints funcione corretamente em vários ambientes, você pode usar a interface EnvStrategy e suas classes de implementação para operações de leitura e gravação de arquivos. Isso permite que as operações de I/O sejam adaptáveis e personalizáveis.

  • Com o Snowpark Checkpoints, você pode implementar seus próprios métodos personalizados de entrada/saída criando uma classe que implemente a interface EnvStrategy. Assim, você pode adaptar as operações ao seu ambiente de execução e expectativas específicas.

  • Internamente, o pacote usa uma classe padrão (IODefaultStrategy) que implementa a interface EnvStrategy e fornece uma implementação básica das operações de I/O. Você pode substituir essa estratégia padrão por uma implementação personalizada, adequada às necessidades específicas do seu ambiente.

Importante

Cada pacote do Snowpark Checkpoints (snowpark-checkpoints-collectors, snowpark-checkpoints-validators, snowpark-checkpoints-hypothesis) inclui sua própria cópia das classes de tratamento de arquivos. Portanto, qualquer alteração nas configurações de arquivo deve ser aplicada a cada pacote separadamente. Certifique-se de importar a configuração do pacote que você está usando.

Funções de I/O

Esses métodos de leitura e gravação de arquivos podem ser personalizados:

  • mkdir: cria uma pasta.

  • folder_exists: verifica se uma pasta existe.

  • file_exists: verifica se um arquivo existe.

  • write: grava o conteúdo em um arquivo.

  • read: lê o conteúdo de um arquivo.

  • read_bytes: lê o conteúdo binário de um arquivo.

  • ls: lista o conteúdo de um diretório.

  • getcwd: obtém o diretório de trabalho atual.

  • remove_dir: remove um diretório e seu conteúdo. Essa função é usada exclusivamente no módulo snowpark-checkpoints-collectors.

  • telemetry_path_files: obtém o caminho para os arquivos de telemetria.

Estratégia do Databricks

A estratégia do Databricks é uma configuração que sabe como trabalhar com os caminhos de arquivo DBFS. Ela usa a função normalize_dbfs_path para garantir que todos os caminhos comecem com /dbfs/.

Como usar

Para usar a estratégia do Databricks, você deve configurá-la explicitamente no código. Veja como:

  1. Importar as classes necessárias:

    from typing import Optional, BinaryIO
    from pathlib import Path
    from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy
    from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
    
    Copy
  2. Definir a estratégia do Databricks:

    class IODatabricksStrategy(EnvStrategy):
    
      def __init__(self):
          self.default_strategy = IODefaultStrategy()
    
      def mkdir(self, path: str, exist_ok: bool = False) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.mkdir(path, exist_ok=exist_ok)
    
      def folder_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.folder_exists(path)
    
      def file_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.file_exists(path)
    
      def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None:
          file_path = normalize_dbfs_path(file_path)
          self.default_strategy.write(file_path, file_content, overwrite=overwrite)
    
      def read(
          self, file_path: str, mode: str = "r", encoding: Optional[str] = None
      ) -> str:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read(file_path, mode=mode, encoding=encoding)
    
      def read_bytes(self, file_path: str) -> bytes:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read_bytes(file_path)
    
      def ls(self, path: str, recursive: bool = False) -> list[str]:
          file_path = normalize_dbfs_path(path)
          list_of_files = self.default_strategy.ls(file_path, recursive=recursive)
          return [content.replace("/dbfs","") for content in list_of_files]
    
      def getcwd(self) -> str:
          try:
              parent_folder = "/snowpark_checkpoints"
              self.mkdir(parent_folder, exist_ok=True)
              return parent_folder
          except Exception:
              return ""
    
      def remove_dir(self, path:str) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.remove_dir(path)
    
      def telemetry_path_files(self, path:str) -> Path:
          path = normalize_dbfs_path(path)
          return self.default_strategy.telemetry_path_files(path)
    
    def normalize_dbfs_path(path: str) -> str:
        if isinstance(path, Path):
            path = str(path)
        if not path.startswith("/"):
            path = "/" + path
        if not path.startswith("/dbfs/"):
            path = f'/dbfs{path}'
        return path
    
    Copy
  3. Configurar a estratégia do Databricks:

    get_io_file_manager().set_strategy(IODatabricksStrategy())
    
    Copy

A execução desse código no início do script Databricks ou notebook configura o Snowpark Checkpoints para usar a estratégia de I/O definida para o tratamento correto dos arquivos em DBFS.

Personalização opcional

Para operações de entrada/saída mais especializadas, uma estratégia personalizada pode ser desenvolvida e implementada. Essa abordagem oferece total controle e flexibilidade sobre o comportamento de I/O. Ela permite que os desenvolvedores adaptem a estratégia precisamente aos seus requisitos e restrições específicos, otimizando potencialmente o desempenho, a utilização de recursos ou outros fatores relevantes.

Importante

Ao usar estratégias personalizadas, você é responsável por garantir que as operações de I/O funcionem corretamente.