파티션 전체에서 사용자 지정 논리로 데이터 처리하기

분산 파티션 함수(DPF)를 사용하여 컴퓨팅 풀에 있는 하나 이상의 노드에서 데이터를 병렬로 처리할 수 있습니다. DPF는 분산 오케스트레이션, 오류, 가시성, 아티팩트 지속성을 자동으로 처리합니다. Snowflake 노트북 또는 :doc:`Snowflake ML 작업 </developer-guide/snowflake-ml/ml-jobs/overview>`에서 DPF를 실행할 수 있습니다.

DPF는 다음 실행 모드를 지원합니다.

  • **DataFrame 모드**(run()): 열 값으로 Snowpark DataFrame을 분할하고 각 파티션에서 함수를 동시에 실행합니다. 데이터는 최적의 처리량을 위해 병렬로 프리페치됩니다.

  • **스테이지 모드**(run_from_stage()): 각 파일이 분할되는 Snowflake 스테이지에서 파일을 처리합니다. 메모리 사용량을 예측할 수 있는 대규모 파일 처리에 적합합니다.

DPF를 사용하면 다양한 데이터 세그먼트에서 대규모 데이터 세트를 효율적으로 처리할 수 있습니다.

이 도구는 다음과 같은 시나리오에 적합합니다.

  • 리전별 판매 데이터 분석하기

  • 지리적 세그먼트별로 고객 데이터 처리하기

  • 각 데이터 파티션의 ML 모델 학습시키기

  • 각 데이터 파티션에 동일한 처리 논리가 필요한 데이터 변환 수행하기

DPF는 분산 데이터 처리를 자동으로 처리합니다. 분산 컴퓨팅 인프라를 관리할 필요가 없습니다.

DPF를 사용하면 GPU에 액세스할 수 있는 컨테이너화된 인프라에서 오픈 소스 라이브러리를 통해 사용자 지정 Python 코드를 작성할 수 있습니다.

중요

DPF는 결과와 아티팩트를 Snowflake 스테이지에 자동으로 저장합니다. DPF를 사용하기 전에 DPF가 결과 및 아티팩트를 저장하는 스테이지에 대한 권한이 있는지 확인합니다.

DataFrame 모드: 열 파티션별로 데이터 처리

DataFrame 모드를 사용하여 지정된 열을 기준으로 Snowpark DataFrame을 분할하고 각 파티션에서 Python 함수를 병렬로 실행합니다. 다음 예제에서는 리전별 판매 데이터를 처리하는 방법을 보여줍니다.

  1. 처리 함수 정의

  2. DPF 초기화 및 실행

  3. 진행 상황 모니터링 및 완료 대기

  4. 각 파티션에서 결과 검색

  5. 오류 처리

  6. 완료된 실행에서 결과 복원

처리 함수 정의하기

분산 처리를 실행하는 데 필요한 클래스를 가져옵니다.

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, ExecutionOptions
)
Copy

두 개의 인자를 받는 처리 함수를 정의합니다.

  • data_connector: 파티션의 데이터에 대한 액세스를 제공하는 DataConnector </developer-guide/snowpark-ml/reference/latest/api/data/snowflake.ml.data.data_connector.DataConnector>`_입니다. ``data_connector.to_pandas()``를 호출하여 pandas DataFrame으로 로드하거나 ``to_torch_dataset()` 또는 ``to_ray_dataset()``와 같은 다른 메서드를 사용합니다.

  • context: 파티션 ID와 아티팩트 업로드 및 다운로드 메서드를 제공하는 PartitionContext 오브젝트입니다.

import json

def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access

    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

각 리전에 대해 이 함수는 요약 통계를 계산하고 결과를 파티션의 전용 스테이지 디렉터리에 JSON 파일로 저장합니다.

DPF 초기화 및 실행

처리 함수와 출력 스테이지 이름으로 DPF 인스턴스를 생성한 다음, ``run()``을 호출하여 분산 처리를 시작합니다.

중요

사용자가 제공하는 Snowpark DataFrame은 테이블에서 생성해야 합니다. 테이블에서 DataFrame 생성에 대한 자세한 내용은 DataFrame 구성하기 섹션을 참조하세요.

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

run() 메서드는 다음 매개 변수를 허용합니다.

  • ``partition_by``(문자열): DataFrame을 분할할 열 이름입니다. 각 고유 값은 별도의 파티션을 생성합니다.

  • snowpark_dataframe: 분할하고 처리할 Snowpark DataFrame입니다.

  • run_id``(문자열): 실행에 대한 고유 식별자입니다. 모든 아티팩트에 대한 전용 디렉터리 ``@{stage_name}/{run_id}/``를 생성합니다. ``experiment_2024_01_15 또는 ``model_v1_retrain``과 같이 설명이 포함된 이름을 사용합니다.

  • ``on_existing_artifacts``(문자열, 선택 사항): ``run_id``에 대한 아티팩트가 이미 있는 경우 수행할 작업입니다. ``”error”``(기본값)는 오류를 발생시키며 ``”overwrite”``는 기존 아티팩트를 대체합니다.

  • ``execution_options``(ExecutionOptions, 선택 사항): 리소스 할당 및 실행 동작을 위한 구성입니다.

진행 상황 모니터링 및 완료 대기

``wait()``를 호출하여 실행이 완료될 때까지 차단합니다. 기본적으로 진행률 표시줄이 표시됩니다.

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

다음은 출력의 예입니다.

Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS

차단하지 않고 언제든지 상태와 진행 상황을 확인할 수도 있습니다.

# Check overall status
current_status = run.status

# Get progress grouped by partition status
progress = run.get_progress()
Copy

각 파티션에서 결과 검색하기

실행이 완료되면 partition_details 속성을 사용하여 각 파티션에서 결과를 검색합니다. 각 파티션의 세부 정보에는 저장된 아티팩트에 액세스하기 위한 ``stage_artifacts_manager``가 포함되어 있습니다.

if final_status == RunStatus.SUCCESS:
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        # List available artifacts for this partition
        files = details.stage_artifacts_manager.list()
        print(f"Partition {partition_id} artifacts: {files}")

        # Load an artifact using a custom deserializer
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
Copy

``stage_artifacts_manager``는 다음의 3가지 메서드를 제공합니다.

  • list(): 파티션의 스테이지 디렉터리에 저장된 파일 이름 목록을 반환합니다.

  • get(filename, read_function=None): 아티팩트를 다운로드하고 역직렬화합니다. 기본적으로 ``pickle``을 사용하거나 제공된 경우 사용자 지정 ``read_function``을 사용합니다.

  • download(filename, local_dir): 원시 파일을 로컬 디렉터리에 다운로드하고 로컬 파일 경로를 반환합니다.

오류 처리

실행에 실패하는 경우 개별 파티션 세부 정보를 검사하여 실패를 진단합니다.

if final_status != RunStatus.SUCCESS:
    for partition_id, details in run.partition_details.items():
        if details.status != PartitionStatus.DONE:
            print(f"Partition {partition_id} status: {details.status}")
            try:
                error_logs = details.logs
                print(error_logs)
            except RuntimeError:
                print("Logs not available for this partition")
Copy

전체 ``RunStatus``는 집계 결과를 반영합니다.

  • SUCCESS: 모든 파티션이 성공적으로 완료되었습니다.

  • PARTIAL_FAILURE: 일부 파티션이 성공했지만 하나 이상의 파티션이 실패했습니다.

  • FAILURE: 성공적으로 완료된 파티션이 없습니다.

  • CANCELLED: 실행이 취소되었습니다.

  • IN_PROGRESS: 실행이 여전히 실행 중입니다.

각 파티션에는 다음의 ``PartitionStatus``가 있습니다.

  • PENDING: 처리 대기 중입니다.

  • RUNNING: 현재 처리 중입니다.

  • DONE: 성공적으로 완료되었습니다.

  • FAILED: 사용자 함수에서 예외가 발생했습니다.

  • CANCELLED: 사용자가 취소했습니다.

  • INTERNAL_ERROR: 내부 시스템 오류가 발생했습니다(예: 메모리 부족).

이러한 열거형을 가져오려면 다음을 수행합니다.

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, PartitionStatus
)
Copy

실행 중인 작업을 취소하려면 다음을 수행합니다.

run.cancel()
Copy

참고

이미 완료된 파티션은 취소를 해도 영향을 받지 않습니다. 완료된 파티션의 부분 결과, 로그 및 아티팩트는 계속 사용할 수 있습니다.

완료된 실행에서 결과 복원

완료된 실행을 유지 상태에서 복원하고 프로세스를 다시 실행하지 않고도 동일한 결과에 액세스할 수 있습니다.

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun

restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")

# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
    print(f"{partition_id}: {details.status}")
Copy

참고

복원된 실행은 읽기 전용입니다. 복원된 실행에는 wait() 또는 ``cancel()``을 호출할 수 없습니다.

스테이지 모드: 스테이지에서 파일 처리

스테이지 모드를 사용하여 각 파일이 파티션이 되는 Snowflake 스테이지에서 파일을 처리합니다. 이는 스테이징된 Parquet 파일 모음 처리와 같은 대규모 파일 처리에 적합합니다.

처리 함수 정의

처리 함수 서명은 DataFrame 모드와 동일합니다. ``data_connector``는 파일의 데이터에 대한 액세스를 제공하며, ``context.partition_id``는 스테이지 내 상대 파일 경로입니다.

def process_file(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing file {context.partition_id}: {len(df)} rows")

    # Process data and save results
    result = {"row_count": len(df), "columns": list(df.columns)}
    import json
    context.upload_to_stage(result, "result.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

스테이지에서 DPF 실행

run() 대신 run_from_stage()``를 호출합니다. 소스 파일이 포함된 입력 ``stage_location 및 선택적으로 처리할 파일을 필터링하는 ``file_pattern``을 지정합니다.

dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
    stage_location="@my_db.my_schema.input_stage/data/",
    run_id="file_processing_2024",
    file_pattern="*.parquet",
)
final_status = run.wait()
Copy

run_from_stage() 메서드는 다음 매개 변수를 허용합니다.

  • ``stage_location``(문자열): 소스 데이터 파일이 포함된 입력 스테이지 경로입니다. ``file_pattern``과 일치하는 각 파일은 파티션이 됩니다. 단순 스테이지 이름과 정규화된 스테이지 이름을 모두 지원합니다.

    • 단순: "@my_stage/data/"

    • 정규화된 이름*: "@my_db.my_schema.my_stage/data/"

  • ``run_id``(문자열): 이 실행에 대한 고유 식별자입니다.

  • file_pattern``(문자열, 선택 사항): 파일을 필터링하는 Glob 패턴입니다. 기본값은 ``"*.parquet" 입니다.

  • ``on_existing_artifacts``(문자열, 선택 사항): ``”error”``(기본값) 또는 ``”overwrite”``입니다.

  • ``execution_options``(ExecutionOptions, 선택 사항): 리소스 할당 및 실행 동작을 위한 구성입니다.

참고

``stage_location``은 입력 데이터 소스입니다. ``DPF()``에 제공된 ``stage_name``은 로그 및 결과와 같은 아티팩트의 출력 위치입니다. 이러한 스테이지는 서로 다른 스테이지일 수 있습니다.

모니터링, 결과 검색, 오류 처리 및 실행 복원은 :ref:`DataFrame 모드 <label-dpf-df-monitor>`에서와 동일한 방식으로 작동합니다.

I/O 바운드 파일 처리의 경우 병렬 처리를 최대화하기 위해 ``ExecutionOptions``에서 ``num_cpus_per_worker=1``을 설정합니다(CPU당 하나의 액터). CPU바인딩 워크로드의 경우 기본값을 사용하거나 ``num_cpus_per_worker``를 높입니다.

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

run = dpf.run_from_stage(
    stage_location="@my_stage/data/",
    run_id="io_bound_processing",
    execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
Copy

실행 옵션 구성

``ExecutionOptions``를 사용하여 워커당 CPU/GPU 할당, 재시도 횟수, Fail-fast 동작과 같은 리소스 할당 및 실행 동작을 제어합니다. 모든 필드는 선택 사항이며 적절한 기본값이 있습니다.

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

options = ExecutionOptions(
    num_cpus_per_worker=4,
    num_gpus_per_worker=1,
    fail_fast=True,
)

run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="my_run",
    execution_options=options,
)
Copy

매개 변수 및 워커 크기 조정 동작의 전체 목록은 `ExecutionOptions API 참조 <https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/container-runtime/distributors.distributed_partition_function>`_를 참조하세요.

PartitionContext를 사용하여 아티팩트 작업

PartitionContext 오브젝트는 처리 함수에 두 번째 인자로 전달됩니다. 파티션 실행 중에 아티팩트 및 Snowflake 세션과 상호 작용하는 메서드를 제공합니다.

아티팩트 업로드

``upload_to_stage()``를 사용하여 처리 함수 내에서 결과를 저장합니다. 기본적으로 오브젝트는 피클을 사용하여 직렬화됩니다. 사용자 지정 직렬화의 경우 ``write_function``을 제공합니다.

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Save a pickle object (default serialization)
    results = {'total': df['amount'].sum(), 'count': len(df)}
    context.upload_to_stage(results, "summary.pkl")

    # Save JSON data with a custom write function
    import json
    context.upload_to_stage(
        results, "summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
    )

    # Save a CSV file
    df_processed = df.groupby('category').sum()
    context.upload_to_stage(
        df_processed, "aggregated.csv",
        write_function=lambda df, path: df.to_csv(path, index=False)
    )
Copy

아티팩트 다운로드

``download_from_stage()``를 사용하여 처리 함수 내에서 아티팩트를 로드합니다. 이 함수를 사용하여 이전 실행의 아티팩트에 액세스할 수 있습니다. 예를 들어, 추론을 위해 모델을 로드하는 데 사용할 수 있습니다.

def my_inference_function(data_connector, context):
    # Load a pickle object from the current partition's stage path
    model = context.download_from_stage("model.pkl")

    # Load from a different stage path (e.g., from a prior training run)
    model = context.download_from_stage(
        "model.pkl",
        stage_path="@db.schema.stage/training_run/partition_0"
    )

    # Load JSON with a custom deserializer
    import json
    config = context.download_from_stage(
        "config.json",
        read_function=lambda path: json.load(open(path, 'r'))
    )
Copy

Snowflake 세션 사용

``with_session()``을 사용하여 테이블에 결과 쓰기와 같이 Snowflake 세션이 필요한 작업을 실행합니다. 이 메서드는 제한된 세션 풀을 사용하여 많은 파티션을 동시에 실행할 때 Snowflake의 세션 제한에 도달하는 것을 방지합니다.

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Load a model from a prior training run
    model = context.download_from_stage("model.pkl")

    predictions = model.predict(df[['X1', 'X2']])

    results = df.copy()
    results['predictions'] = predictions
    results['partition_id'] = context.partition_id

    # Write results to a Snowflake table using the bounded session pool
    context.with_session(lambda session:
        session.create_dataframe(results)
            .write.mode("append")
            .save_as_table("predictions_table")
    )
Copy

참고

``with_session()``에 전달된 함수는 cloudpickle을 사용하여 직렬화됩니다. 클로저에서 큰 오브젝트나 직렬화할 수 없는 리소스를 캡처하지 마세요.

여러 노드에 걸쳐 확장

여러 노드에 걸쳐 DPF를 실행하려면 먼저 클러스터를 확장합니다.

from snowflake.ml.runtime_cluster import scale_cluster

# Scale to 3 nodes for increased parallelism
scale_cluster(3)

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="multi_node_run",
    execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
Copy

여러 노드에서 실행할 때 헤드 노드가 사용자 함수를 실행하지 않고 조정자 역할만 하도록 하려는 경우 ``use_head_node=False``를 설정합니다. 이를 통해 워커 노드의 메모리 부족 오류가 조정자에게 영향을 주지 않으므로 장기 실행 워크로드의 안정성을 개선할 수 있습니다.

제한 사항 및 제약 조건

  • 한 번의 동시 실행: DPF는 한 번에 하나만 실행할 수 있습니다. 다른 실행이 진행 중인 동안 새 실행을 시작하면 오류가 발생합니다. 새 실행을 시작하기 전에 ``run.cancel()``로 이전 실행을 취소합니다.

  • DataFrame 요구 사항: DataFrame 모드에서 Snowpark DataFrame에는 정확히 하나의 쿼리가 포함되어야 하며 사후 작업이 없어야 합니다.

  • 단일 노드 제한: ``use_head_node=False``는 단일 노드 클러스터에서 지원되지 않습니다.

  • 아티팩트 경로 구조: 아티팩트는 ``@{stage_name}/{run_id}/{partition_id}/``에 저장됩니다. 이 경로 구조는 고정되어 있으며 사용자 지정할 수 없습니다.

  • 복원된 실행은 읽기 전용임: DPFRun.restore_from()``으로 복원된 실행은 ``wait() 또는 ``cancel()``을 호출할 수 없습니다.

다음 단계