Databricks의 체크포인트¶
Snowpark Checkpoints는 수집된 결과에 대한 파일을 작성하고 동일한 파일을 읽어 DataFrames 를 확인합니다. 이러한 파일 중 일부는 PySpark 를 사용하여 생성되며, 다른 파일은 os
또는 glob
와 같은 Python 패키지를 사용하여 생성됩니다. 이러한 유형의 파일 처리 동작은 파일 시스템이 기존 환경과 다른 Databricks 환경에서 불일치로 이어질 수 있습니다. 따라서 올바른 파일 읽기 및 쓰기를 보장하기 위해 패키지를 조정해야 합니다.
다음 섹션에서는 Databricks 환경에서 원활하게 작동하여 효율적인 DataFrame 유효성 검사를 수행할 수 있도록 Snowpark Checkpoints를 구성하는 방법을 설명합니다.
전제 조건¶
Databricks에서 Snowpark Checkpoints를 사용하기 전에 사용 중인 환경이 다음 요구 사항을 충족하는지 확인하십시오.
PySpark:
버전 3.5.0 이상.Python:
버전 3.9, 3.10 및 3.11
이러한 요구 사항을 충족하는 Databricks Runtime 버전은 다음과 같습니다.
Databricks Runtime 14.3 LTS
Databricks Runtime 15.4 LTS
입력/출력(I/O) 전략¶
다양한 환경에서 Snowpark Checkpoints가 올바르게 작동하도록 하기 위해 파일 읽기 및 쓰기 작업을 위한 인터페이스 EnvStrategy
와 구현 클래스를 사용할 수 있습니다. 이를 통해 I/O 작업을 조정하고 사용자 지정할 수 있습니다.
Snowpark Checkpoints를 사용하면
EnvStrategy
인터페이스를 구현하는 클래스를 생성하여 사용자 지정 입력/출력 메서드를 구현할 수 있습니다. 그런 다음 특정 실행 환경과 기대치에 맞게 작업을 맞춤 설정할 수 있습니다.내부적으로 패키지는
EnvStrategy
인터페이스를 구현하고 입출력 작업의 기본 구현을 제공하는 기본 클래스(IODefaultStrategy
)를 사용합니다. 이 기본 전략을 사용자 환경의 특정 요구 사항에 적합한 사용자 지정 구현으로 대체할 수 있습니다.
중요
각 Snowpark Checkpoints 패키지(snowpark-checkpoints-collectors
, snowpark-checkpoints-validators
, snowpark-checkpoints-hypothesis
)에는 파일 처리 클래스의 자체 복사본이 포함되어 있습니다. 따라서 파일 구성에 대한 모든 변경 사항은 각 패키지에 개별적으로 적용해야 합니다. 사용 중인 패키지에서 구성을 가져와야 합니다.
I/O 함수¶
이러한 파일 읽기 및 쓰기 방법은 사용자 지정할 수 있습니다.
mkdir
: 폴더를 생성합니다.folder_exists
: 폴더가 있는지 확인합니다.file_exists
: 파일 존재 여부를 확인합니다.write
: 파일에 내용을 씁니다.read
: 파일에서 내용을 읽습니다.read_bytes
: 파일에서 이진 내용을 읽습니다.ls
: 디렉터리 내용을 나열합니다.getcwd
: 현재 작업 디렉터리를 가져옵니다.remove_dir
: 디렉터리와 그 내용을 제거합니다. 이 함수는snowpark-checkpoints-collectors
모듈에서만 독점적으로 사용됩니다.telemetry_path_files
: 원격 분석 파일의 경로를 가져옵니다.
Databricks 전략¶
Databricks 전략은 DBFS 파일 경로를 사용하는 방법을 알고 있는 구성입니다. 이 전략에서는 normalize_dbfs_path
함수를 사용하여 모든 경로가 /dbfs/
로 시작하도록 합니다.
사용 방법¶
Databricks 전략을 사용하려면 코드에서 명시적으로 구성해야 합니다. 방법은 다음과 같습니다.
필요한 클래스를 가져옵니다.
from typing import Optional, BinaryIO from pathlib import Path from snowflake.snowpark_checkpoints_collector.io_utils import EnvStrategy, IODefaultStrategy from snowflake.snowpark_checkpoints_collector.io_utils.io_file_manager import get_io_file_manager
Databricks 전략을 정의합니다.
class IODatabricksStrategy(EnvStrategy): def __init__(self): self.default_strategy = IODefaultStrategy() def mkdir(self, path: str, exist_ok: bool = False) -> None: path = normalize_dbfs_path(path) self.default_strategy.mkdir(path, exist_ok=exist_ok) def folder_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.folder_exists(path) def file_exists(self, path: str) -> bool: path = normalize_dbfs_path(path) return self.default_strategy.file_exists(path) def write(self, file_path: str, file_content: str, overwrite: bool = True) -> None: file_path = normalize_dbfs_path(file_path) self.default_strategy.write(file_path, file_content, overwrite=overwrite) def read( self, file_path: str, mode: str = "r", encoding: Optional[str] = None ) -> str: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read(file_path, mode=mode, encoding=encoding) def read_bytes(self, file_path: str) -> bytes: file_path = normalize_dbfs_path(file_path) return self.default_strategy.read_bytes(file_path) def ls(self, path: str, recursive: bool = False) -> list[str]: file_path = normalize_dbfs_path(path) list_of_files = self.default_strategy.ls(file_path, recursive=recursive) return [content.replace("/dbfs","") for content in list_of_files] def getcwd(self) -> str: try: parent_folder = "/snowpark_checkpoints" self.mkdir(parent_folder, exist_ok=True) return parent_folder except Exception: return "" def remove_dir(self, path:str) -> None: path = normalize_dbfs_path(path) self.default_strategy.remove_dir(path) def telemetry_path_files(self, path:str) -> Path: path = normalize_dbfs_path(path) return self.default_strategy.telemetry_path_files(path) def normalize_dbfs_path(path: str) -> str: if isinstance(path, Path): path = str(path) if not path.startswith("/"): path = "/" + path if not path.startswith("/dbfs/"): path = f'/dbfs{path}' return path
Databricks 전략을 구성합니다.
get_io_file_manager().set_strategy(IODatabricksStrategy())
Databricks 스크립트 또는 노트북 시작 시 이 코드를 실행하면 DBFS 에서 올바른 파일 처리를 위해 정의된 I/O 전략을 사용하도록 Snowpark Checkpoints가 구성됩니다.
선택적 사용자 지정¶
보다 전문적인 입력/출력 작업을 위해 맞춤형 전략을 설계하고 구현할 수 있습니다. 이 접근 방식은 I/O 동작에 대한 완벽한 제어와 유연성을 제공합니다. 이를 통해 개발자는 특정 요구 사항과 제약 조건에 맞게 전략을 정확하게 조정하여 성능, 리소스 활용도 또는 기타 관련 요소를 최적화할 수 있습니다.
중요
사용자 지정 전략을 사용하는 경우 I/O 작업이 올바르게 기능하는지 확인하는 것은 사용자의 책임입니다.