Databricksチェックポイント

Snowparkチェックポイントは、収集した結果に関するファイルを書き込み、同じファイルを読み込んで DataFrames を検証します。これらのファイルの一部は PySpark を使って生成され、その他は osglob のようなPythonパッケージを使って生成されます。このタイプのファイル操作の動作は、ファイルシステムが従来の環境と異なるDatabricks環境で不整合を引き起こす可能性があります。そのため、ファイルの読み書きが正しく行われるようにパッケージを適合させる必要があります。

次のセクションでは、SnowparkチェックポイントをDatabricks環境でシームレスに動作するように構成する方法を示します。これにより、効率的な DataFrame 検証が可能になります。

前提条件

SnowparkチェックポイントをDatabricksで使用する前に、お使いの環境が以下の要件を満たしていることを確認してください。

  • PySpark: バージョン3.5.0またはそれ以上。

  • Python: バージョン3.9、3.10、3.11

これらの要件を満たすDatabricks Runtimeのバージョンは以下の通りです。

  • Databricks Runtime 14.3 LTS

  • Databricks Runtime 15.4 LTS

入力/出力(I/O)戦略

Snowparkチェックポイントが様々な環境で正しく動作するように、ファイルの読み取りと書き込みの操作にインターフェイス EnvStrategy とその実装クラスを使用することができます。これにより、I/O操作を適応させ、カスタマイズすることができます。

  • Snowparkチェックポイントでは、 EnvStrategy インターフェイスを実装したクラスを作成することで、独自のカスタム入力/出力メソッドを実装することができます。そうすれば、特定の実行環境や期待に合わせて操作を調整することができます。

  • 内部的には、パッケージは、 EnvStrategy インターフェイスを実装し、I/O操作の基本的な実装を提供するデフォルトクラス(IODefaultStrategy)を使用します。このデフォルトの戦略を、環境固有のニーズに適したカスタム実装に置き換えることができます。

重要

Snowparkチェックポイントの各パッケージ(snowpark-checkpoints-collectorssnowpark-checkpoints-validatorssnowpark-checkpoints-hypothesis)には、ファイル処理クラスの独自のコピーが含まれています。そのため、ファイル構成の変更は各パッケージに個別に適用する必要があります。必ず使用するパッケージから構成をインポートしてください。

I/O関数

これらのファイル読み書きメソッドはカスタマイズ可能です。

  • mkdir: フォルダーを作成します。

  • folder_exists: フォルダーが存在するかどうかをチェックします。

  • file_exists: ファイルが存在するかどうかをチェックします。

  • write: ファイルにコンテンツを書き込みます。

  • read: ファイルからコンテンツを読み込みます。

  • read_bytes: ファイルからバイナリコンテンツを読み込みます。

  • ls: ディレクトリのコンテンツをリストします。

  • getcwd: 現在の作業ディレクトリを取得します。

  • remove_dir: ディレクトリとそのコンテンツを削除します。この関数は snowpark-checkpoints-collectors モジュールでのみ使用されます。

  • telemetry_path_files: 遠隔測定ファイルのパスを取得します。

Databricks戦略

Databricks戦略は、 DBFS ファイルパスの扱い方を知っている構成です。 normalize_dbfs_path 関数を使用して、すべてのパスが /dbfs/ で始まるようにします。

使用方法

Databricks戦略を使用するには、コード内で明示的に構成する必要があります。以下がその方法です。

  1. 必要なクラスをインポートします。

    from typing import Optional, BinaryIO
    from pathlib import Path
    from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy
    from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
    
    Copy
  2. Databricks戦略を定義します。

    class IODatabricksStrategy(EnvStrategy):
    
      def __init__(self):
          self.default_strategy = IODefaultStrategy()
    
      def mkdir(self, path: str, exist_ok: bool = False) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.mkdir(path, exist_ok=exist_ok)
    
      def folder_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.folder_exists(path)
    
      def file_exists(self, path: str) -> bool:
          path = normalize_dbfs_path(path)
          return self.default_strategy.file_exists(path)
    
      def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None:
          file_path = normalize_dbfs_path(file_path)
          self.default_strategy.write(file_path, file_content, overwrite=overwrite)
    
      def read(
          self, file_path: str, mode: str = "r", encoding: Optional[str] = None
      ) -> str:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read(file_path, mode=mode, encoding=encoding)
    
      def read_bytes(self, file_path: str) -> bytes:
          file_path = normalize_dbfs_path(file_path)
          return self.default_strategy.read_bytes(file_path)
    
      def ls(self, path: str, recursive: bool = False) -> list[str]:
          file_path = normalize_dbfs_path(path)
          list_of_files = self.default_strategy.ls(file_path, recursive=recursive)
          return [content.replace("/dbfs","") for content in list_of_files]
    
      def getcwd(self) -> str:
          try:
              parent_folder = "/snowpark_checkpoints"
              self.mkdir(parent_folder, exist_ok=True)
              return parent_folder
          except Exception:
              return ""
    
      def remove_dir(self, path:str) -> None:
          path = normalize_dbfs_path(path)
          self.default_strategy.remove_dir(path)
    
      def telemetry_path_files(self, path:str) -> Path:
          path = normalize_dbfs_path(path)
          return self.default_strategy.telemetry_path_files(path)
    
    def normalize_dbfs_path(path: str) -> str:
        if isinstance(path, Path):
            path = str(path)
        if not path.startswith("/"):
            path = "/" + path
        if not path.startswith("/dbfs/"):
            path = f'/dbfs{path}'
        return path
    
    Copy
  3. Databricks戦略を構成します。

    get_io_file_manager().set_strategy(IODatabricksStrategy())
    
    Copy

このコードをDatabricksスクリプトまたはノートブックの開始時に実行すると、Snowparkチェックポイントが DBFS で定義されたI/O戦略を使用してファイルを正しく処理するように構成されます。

オプションのカスタマイズ

より専門的な入力/出力操作については、カスタム戦略を設計して実装することができます。このアプローチは、I/O動作を完全に制御し、柔軟性を提供します。開発者は、特定の要件や制約に合わせて戦略を正確に調整することができ、パフォーマンスやリソース利用、その他の関連要因を最適化できる可能性があります。

重要

カスタム戦略を使用する場合、I/O操作が正しく機能するようにするのはお客様の責任です。