파티션 전체에서 사용자 지정 논리로 데이터 처리하기¶
분산 파티션 함수(DPF)를 사용하여 컴퓨팅 풀에 있는 하나 이상의 노드에서 데이터를 병렬로 처리할 수 있습니다. DPF는 분산 오케스트레이션, 오류, 가시성, 아티팩트 지속성을 자동으로 처리합니다. Snowflake 노트북 또는 :doc:`Snowflake ML 작업 </developer-guide/snowflake-ml/ml-jobs/overview>`에서 DPF를 실행할 수 있습니다.
DPF는 다음 실행 모드를 지원합니다.
**DataFrame 모드**(
run()): 열 값으로 Snowpark DataFrame을 분할하고 각 파티션에서 함수를 동시에 실행합니다. 데이터는 최적의 처리량을 위해 병렬로 프리페치됩니다.**스테이지 모드**(
run_from_stage()): 각 파일이 분할되는 Snowflake 스테이지에서 파일을 처리합니다. 메모리 사용량을 예측할 수 있는 대규모 파일 처리에 적합합니다.
DPF를 사용하면 다양한 데이터 세그먼트에서 대규모 데이터 세트를 효율적으로 처리할 수 있습니다.
이 도구는 다음과 같은 시나리오에 적합합니다.
리전별 판매 데이터 분석하기
지리적 세그먼트별로 고객 데이터 처리하기
각 데이터 파티션의 ML 모델 학습시키기
각 데이터 파티션에 동일한 처리 논리가 필요한 데이터 변환 수행하기
DPF는 분산 데이터 처리를 자동으로 처리합니다. 분산 컴퓨팅 인프라를 관리할 필요가 없습니다.
DPF를 사용하면 GPU에 액세스할 수 있는 컨테이너화된 인프라에서 오픈 소스 라이브러리를 통해 사용자 지정 Python 코드를 작성할 수 있습니다.
중요
DPF는 결과와 아티팩트를 Snowflake 스테이지에 자동으로 저장합니다. DPF를 사용하기 전에 DPF가 결과 및 아티팩트를 저장하는 스테이지에 대한 권한이 있는지 확인합니다.
DataFrame 모드: 열 파티션별로 데이터 처리¶
DataFrame 모드를 사용하여 지정된 열을 기준으로 Snowpark DataFrame을 분할하고 각 파티션에서 Python 함수를 병렬로 실행합니다. 다음 예제에서는 리전별 판매 데이터를 처리하는 방법을 보여줍니다.
처리 함수 정의하기¶
분산 처리를 실행하는 데 필요한 클래스를 가져옵니다.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, ExecutionOptions
)
두 개의 인자를 받는 처리 함수를 정의합니다.
data_connector: 파티션의 데이터에 대한 액세스를 제공하는DataConnector </developer-guide/snowpark-ml/reference/latest/api/data/snowflake.ml.data.data_connector.DataConnector>`_입니다. ``data_connector.to_pandas()``를 호출하여 pandas DataFrame으로 로드하거나 ``to_torch_dataset()`또는 ``to_ray_dataset()``와 같은 다른 메서드를 사용합니다.context: 파티션 ID와 아티팩트 업로드 및 다운로드 메서드를 제공하는 PartitionContext 오브젝트입니다.
import json
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
각 리전에 대해 이 함수는 요약 통계를 계산하고 결과를 파티션의 전용 스테이지 디렉터리에 JSON 파일로 저장합니다.
DPF 초기화 및 실행¶
처리 함수와 출력 스테이지 이름으로 DPF 인스턴스를 생성한 다음, ``run()``을 호출하여 분산 처리를 시작합니다.
중요
사용자가 제공하는 Snowpark DataFrame은 테이블에서 생성해야 합니다. 테이블에서 DataFrame 생성에 대한 자세한 내용은 DataFrame 구성하기 섹션을 참조하세요.
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
run() 메서드는 다음 매개 변수를 허용합니다.
``partition_by``(문자열): DataFrame을 분할할 열 이름입니다. 각 고유 값은 별도의 파티션을 생성합니다.
snowpark_dataframe: 분할하고 처리할 Snowpark DataFrame입니다.run_id``(문자열): 이 실행에 대한 고유 식별자입니다. 모든 아티팩트에 대한 전용 디렉터리 ``@{stage_name}/{run_id}/``를 생성합니다. ``experiment_2024_01_15또는 ``model_v1_retrain``과 같이 설명이 포함된 이름을 사용합니다.``on_existing_artifacts``(문자열, 선택 사항): ``run_id``에 대한 아티팩트가 이미 있는 경우 수행할 작업입니다. ``”error”``(기본값)는 오류를 발생시키며 ``”overwrite”``는 기존 아티팩트를 대체합니다.
``execution_options``(ExecutionOptions, 선택 사항): 리소스 할당 및 실행 동작을 위한 구성입니다.
진행 상황 모니터링 및 완료 대기¶
``wait()``를 호출하여 실행이 완료될 때까지 차단합니다. 기본적으로 진행률 표시줄이 표시됩니다.
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
다음은 출력의 예입니다.
Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS
차단하지 않고 언제든지 상태와 진행 상황을 확인할 수도 있습니다.
# Check overall status
current_status = run.status
# Get progress grouped by partition status
progress = run.get_progress()
각 파티션에서 결과 검색하기¶
실행이 완료되면 partition_details 속성을 사용하여 각 파티션에서 결과를 검색합니다. 각 파티션의 세부 정보에는 저장된 아티팩트에 액세스하기 위한 ``stage_artifacts_manager``가 포함되어 있습니다.
if final_status == RunStatus.SUCCESS:
import json
all_results = []
for partition_id, details in run.partition_details.items():
# List available artifacts for this partition
files = details.stage_artifacts_manager.list()
print(f"Partition {partition_id} artifacts: {files}")
# Load an artifact using a custom deserializer
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
``stage_artifacts_manager``는 다음의 3가지 메서드를 제공합니다.
오류 처리¶
실행에 실패하는 경우 개별 파티션 세부 정보를 검사하여 실패를 진단합니다.
if final_status != RunStatus.SUCCESS:
for partition_id, details in run.partition_details.items():
if details.status != PartitionStatus.DONE:
print(f"Partition {partition_id} status: {details.status}")
try:
error_logs = details.logs
print(error_logs)
except RuntimeError:
print("Logs not available for this partition")
전체 ``RunStatus``는 집계 결과를 반영합니다.
SUCCESS: 모든 파티션이 성공적으로 완료되었습니다.PARTIAL_FAILURE: 일부 파티션이 성공했지만 하나 이상의 파티션이 실패했습니다.FAILURE: 성공적으로 완료된 파티션이 없습니다.CANCELLED: 실행이 취소되었습니다.IN_PROGRESS: 실행이 여전히 실행 중입니다.
각 파티션에는 다음의 ``PartitionStatus``가 있습니다.
PENDING: 처리 대기 중입니다.RUNNING: 현재 처리 중입니다.DONE: 성공적으로 완료되었습니다.FAILED: 사용자 함수에서 예외가 발생했습니다.CANCELLED: 사용자가 취소했습니다.INTERNAL_ERROR: 내부 시스템 오류가 발생했습니다(예: 메모리 부족).
이러한 열거형을 가져오려면 다음을 수행합니다.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, PartitionStatus
)
실행 중인 작업을 취소하려면 다음을 수행합니다.
run.cancel()
참고
이미 완료된 파티션은 취소를 해도 영향을 받지 않습니다. 완료된 파티션의 부분 결과, 로그 및 아티팩트는 계속 사용할 수 있습니다.
완료된 실행에서 결과 복원¶
완료된 실행을 유지 상태에서 복원하고 프로세스를 다시 실행하지 않고도 동일한 결과에 액세스할 수 있습니다.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")
# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
print(f"{partition_id}: {details.status}")
참고
복원된 실행은 읽기 전용입니다. 복원된 실행에는 wait() 또는 ``cancel()``을 호출할 수 없습니다.
스테이지 모드: 스테이지에서 파일 처리¶
스테이지 모드를 사용하여 각 파일이 파티션이 되는 Snowflake 스테이지에서 파일을 처리합니다. 이는 스테이징된 Parquet 파일 모음 처리와 같은 대규모 파일 처리에 적합합니다.
처리 함수 정의¶
처리 함수 서명은 DataFrame 모드와 동일합니다. ``data_connector``는 파일의 데이터에 대한 액세스를 제공하며, ``context.partition_id``는 스테이지 내 상대 파일 경로입니다.
def process_file(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing file {context.partition_id}: {len(df)} rows")
# Process data and save results
result = {"row_count": len(df), "columns": list(df.columns)}
import json
context.upload_to_stage(result, "result.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
스테이지에서 DPF 실행¶
run() 대신 run_from_stage()``를 호출합니다. 소스 파일이 포함된 입력 ``stage_location 및 선택적으로 처리할 파일을 필터링하는 ``file_pattern``을 지정합니다.
dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
stage_location="@my_db.my_schema.input_stage/data/",
run_id="file_processing_2024",
file_pattern="*.parquet",
)
final_status = run.wait()
run_from_stage() 메서드는 다음 매개 변수를 허용합니다.
``stage_location``(문자열): 소스 데이터 파일이 포함된 입력 스테이지 경로입니다. ``file_pattern``과 일치하는 각 파일은 파티션이 됩니다. 단순 스테이지 이름과 정규화된 스테이지 이름을 모두 지원합니다.
단순:
"@my_stage/data/"정규화된 이름*:
"@my_db.my_schema.my_stage/data/"
``run_id``(문자열): 이 실행에 대한 고유 식별자입니다.
file_pattern``(문자열, 선택 사항): 파일을 필터링하는 Glob 패턴입니다. 기본값은 ``"*.parquet"입니다.``on_existing_artifacts``(문자열, 선택 사항): ``”error”``(기본값) 또는 ``”overwrite”``입니다.
``execution_options``(ExecutionOptions, 선택 사항): 리소스 할당 및 실행 동작을 위한 구성입니다.
참고
``stage_location``은 입력 데이터 소스입니다. ``DPF()``에 제공된 ``stage_name``은 로그 및 결과와 같은 아티팩트의 출력 위치입니다. 이러한 스테이지는 서로 다른 스테이지일 수 있습니다.
모니터링, 결과 검색, 오류 처리 및 실행 복원은 :ref:`DataFrame 모드 <label-dpf-df-monitor>`에서와 동일한 방식으로 작동합니다.
I/O 바운드 파일 처리의 경우 병렬 처리를 최대화하기 위해 ``ExecutionOptions``에서 ``num_cpus_per_worker=1``을 설정합니다(CPU당 하나의 액터). CPU바인딩 워크로드의 경우 기본값을 사용하거나 ``num_cpus_per_worker``를 높입니다.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
run = dpf.run_from_stage(
stage_location="@my_stage/data/",
run_id="io_bound_processing",
execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
실행 옵션 구성¶
``ExecutionOptions``를 사용하여 워커당 CPU/GPU 할당, 재시도 횟수, Fail-fast 동작과 같은 리소스 할당 및 실행 동작을 제어합니다. 모든 필드는 선택 사항이며 적절한 기본값이 있습니다.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
options = ExecutionOptions(
num_cpus_per_worker=4,
num_gpus_per_worker=1,
fail_fast=True,
)
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="my_run",
execution_options=options,
)
매개 변수 및 워커 크기 조정 동작의 전체 목록은 `ExecutionOptions API 참조 <https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/container-runtime/distributors.distributed_partition_function>`_를 참조하세요.
PartitionContext를 사용하여 아티팩트 작업¶
PartitionContext 오브젝트는 처리 함수에 두 번째 인자로 전달됩니다. 파티션 실행 중에 아티팩트 및 Snowflake 세션과 상호 작용하는 메서드를 제공합니다.
아티팩트 업로드¶
``upload_to_stage()``를 사용하여 처리 함수 내에서 결과를 저장합니다. 기본적으로 오브젝트는 피클을 사용하여 직렬화됩니다. 사용자 지정 직렬화의 경우 ``write_function``을 제공합니다.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Save a pickle object (default serialization)
results = {'total': df['amount'].sum(), 'count': len(df)}
context.upload_to_stage(results, "summary.pkl")
# Save JSON data with a custom write function
import json
context.upload_to_stage(
results, "summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
)
# Save a CSV file
df_processed = df.groupby('category').sum()
context.upload_to_stage(
df_processed, "aggregated.csv",
write_function=lambda df, path: df.to_csv(path, index=False)
)
아티팩트 다운로드¶
``download_from_stage()``를 사용하여 처리 함수 내에서 아티팩트를 로드합니다. 이 함수를 사용하여 이전 실행의 아티팩트에 액세스할 수 있습니다. 예를 들어, 추론을 위해 모델을 로드하는 데 사용할 수 있습니다.
def my_inference_function(data_connector, context):
# Load a pickle object from the current partition's stage path
model = context.download_from_stage("model.pkl")
# Load from a different stage path (e.g., from a prior training run)
model = context.download_from_stage(
"model.pkl",
stage_path="@db.schema.stage/training_run/partition_0"
)
# Load JSON with a custom deserializer
import json
config = context.download_from_stage(
"config.json",
read_function=lambda path: json.load(open(path, 'r'))
)
Snowflake 세션 사용¶
``with_session()``을 사용하여 테이블에 결과 쓰기와 같이 Snowflake 세션이 필요한 작업을 실행합니다. 이 메서드는 제한된 세션 풀을 사용하여 많은 파티션을 동시에 실행할 때 Snowflake의 세션 제한에 도달하는 것을 방지합니다.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Load a model from a prior training run
model = context.download_from_stage("model.pkl")
predictions = model.predict(df[['X1', 'X2']])
results = df.copy()
results['predictions'] = predictions
results['partition_id'] = context.partition_id
# Write results to a Snowflake table using the bounded session pool
context.with_session(lambda session:
session.create_dataframe(results)
.write.mode("append")
.save_as_table("predictions_table")
)
참고
``with_session()``에 전달된 함수는 cloudpickle을 사용하여 직렬화됩니다. 클로저에서 큰 오브젝트나 직렬화할 수 없는 리소스를 캡처하지 마세요.
여러 노드에 걸쳐 확장¶
여러 노드에 걸쳐 DPF를 실행하려면 먼저 클러스터를 확장합니다.
from snowflake.ml.runtime_cluster import scale_cluster
# Scale to 3 nodes for increased parallelism
scale_cluster(3)
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="multi_node_run",
execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
여러 노드에서 실행할 때 헤드 노드가 사용자 함수를 실행하지 않고 조정자 역할만 하도록 하려는 경우 ``use_head_node=False``를 설정합니다. 이를 통해 워커 노드의 메모리 부족 오류가 조정자에게 영향을 주지 않으므로 장기 실행 워크로드의 안정성을 개선할 수 있습니다.
제한 사항 및 제약 조건¶
한 번의 동시 실행: DPF는 한 번에 하나만 실행할 수 있습니다. 다른 실행이 진행 중인 동안 새 실행을 시작하면 오류가 발생합니다. 새 실행을 시작하기 전에 ``run.cancel()``로 이전 실행을 취소합니다.
DataFrame 요구 사항: DataFrame 모드에서 Snowpark DataFrame에는 정확히 하나의 쿼리가 포함되어야 하며 사후 작업이 없어야 합니다.
단일 노드 제한: ``use_head_node=False``는 단일 노드 클러스터에서 지원되지 않습니다.
아티팩트 경로 구조: 아티팩트는 ``@{stage_name}/{run_id}/{partition_id}/``에 저장됩니다. 이 경로 구조는 고정되어 있으며 사용자 지정할 수 없습니다.
복원된 실행은 읽기 전용임:
DPFRun.restore_from()``으로 복원된 실행은 ``wait()또는 ``cancel()``을 호출할 수 없습니다.
다음 단계¶
DPF를 기본 인프라로 사용하여 여러 ML 모델을 학습시키는 방법에 대해 알아보려면 :doc:`train-models-across-partitions`를 살펴보세요.
전체 API 설명서의 경우 `분산 파티션 함수(DPF) API 참조 <https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/container-runtime/distributors.distributed_partition_function>`_를 참조하세요.
엔드투엔드 예제는 `Snowflake ML 샘플 노트북 <https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml>`_을 참조하세요.