파티션 전체에서 사용자 지정 논리로 데이터 처리하기

분산 파티션 함수(DPF)를 사용하여 컴퓨팅 풀에 있는 하나 이상의 노드에서 데이터를 병렬로 처리할 수 있습니다. 여기에서는 분산 오케스트레이션, 오류, 가시성, 아티팩트 지속성을 자동으로 처리합니다.

DPF는 지정된 열을 기준으로 Snowpark DataFrame를 분할하고 각 파티션에서 Python 함수를 병렬로 실행합니다. DPF가 인프라 복잡성을 처리하고 자동으로 확장하는 동안 처리 논리에 집중하세요.

DPF를 사용하면 다양한 데이터 세그먼트에서 대규모 데이터 세트를 효율적으로 처리할 수 있습니다. 이 도구는 리전별로 판매 데이터를 분석하거나, 지리적 세그먼트별로 고객 데이터를 처리하거나, 각 데이터 파티션에 동일한 처리 논리가 필요한 데이터를 변환하는 등의 시나리오에 이상적입니다. DPF는 분산 데이터 처리 작업을 자동으로 처리하여 분산 컴퓨팅 인프라 관리의 복잡성을 없애 줍니다.

DPF를 사용하면 GPU에 액세스할 수 있는 컨테이너화된 인프라에서 오픈 소스 라이브러리를 통해 사용자 지정 Python 코드를 작성할 수 있습니다.

중요

DPF를 사용하기 전에 다음을 갖추고 있는지 확인하세요.

  • Container Runtime 환경: DPF를 사용하려면 Snowflake ML Container Runtime 환경이 필요합니다.

  • 스테이지 액세스 허가: DPF가 결과와 아티팩트를 Snowflake 스테이지에 자동으로 저장합니다. 지정된 스테이지에 액세스할 수 있는 적절한 권한이 있는지 확인하세요.

다음 섹션에서는 가상의 판매 데이터 세트에서 DPF를 사용하는 방법을 설명합니다.

리전별로 판매 데이터 처리하기

다음 예제에서는 DPF를 사용하여 리전별 판매 데이터를 병렬로 처리하는 방법을 보여줍니다.

  1. 처리 함수 정의 - 각 데이터 파티션(리전)에 적용할 변환 논리를 지정하는 Python 함수를 만듭니다.

  2. DPF을 사용하여 데이터 처리 - 처리 함수로 DPF를 초기화하고 모든 파티션에서 동시에 실행합니다.

  3. 진행 상황 모니터링 및 완료 대기 - 처리 상태를 추적하고 모든 파티션의 실행이 완료될 때까지 기다립니다.

  4. 각 파티션에서 결과 검색 - 성공적으로 완료되면 모든 리전에서 처리된 결과를 수집하고 결합합니다.

  5. 선택 사항: 나중에 결과 복원 - 전체 프로세스를 다시 실행하지 않고도 이전에 완료된 결과에 액세스합니다.

판매 데이터 세트가 있는 경우 리전 데이터를 데이터 프레임의 열로 포함할 수 있습니다. 각 리전의 데이터에는 처리 논리를 적용할 수 있습니다. 분산 파티션 함수(DPF)를 사용하면 다양한 리전의 판매 데이터를 병렬로 처리할 수 있습니다.

또한 다음 열이 포함된 ``sales_data``라는 데이터 프레임을 포함할 수 있습니다.

  • region: ‘북부’, ‘남부’, ‘동부’, ‘서부’

  • customer_id: 고유한 고객 식별자

  • amount: 트랜잭션 금액

  • order_date: 트랜잭션 날짜

처리 함수 정의하기

다음 코드는 리전별로 판매 데이터를 처리하는 처리 함수를 정의합니다. 각 데이터 파티션에 적용되는 변환 논리를 지정하고 스테이지에 결과를 저장합니다.

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import RunStatus

# Define function to process each region's data
Copy
def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access
    import json
    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

process_sales_data 함수는 분산 처리를 활성화하는 두 가지 주요 인자를 수신합니다.

  • data_connector: sales_data DataFrame에 대한 액세스를 제공하는 데 사용됩니다.

  • context: 결과를 스테이지에 쓰는 데 사용됩니다.

각 리전에 대해 함수가 다음 열을 생성합니다.

  • total_sales

  • avg_order_value

  • customer_count

  • record_count

그런 다음 결과를 ``sales_summary.json``이라는 JSON 파일로 스테이지에 결과를 씁니다.

DPF를 사용하여 함수로 데이터를 처리하기

처리 함수를 만든 후에는 다음 코드를 사용하여 DPF 함수가 포함된 파티션에서 데이터를 병렬로 처리할 수 있습니다.

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",  # Creates separate partitions for North, South, East, West
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

다음 코드를 사용하면 여러 노드에서 DPF를 실행할 수 있습니다.

from snowflake.ml.runtime_cluster import scale_cluster

# Scale cluster before running tuner
scale_cluster(2)  # Scale to 2 nodes for parallel trials
Copy

진행 상황 모니터링 및 완료 대기

다음 코드를 사용하여 DPF 실행 진행 상황을 모니터링할 수 있습니다.

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

다음은 코드의 출력입니다.

Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS

각 파티션에서 결과 검색하기

실행이 성공적으로 완료되면 각 파티션에서 결과를 검색할 수 있습니다. 다음 코드는 리전별 판매 결과를 가져옵니다.

if final_status == RunStatus.SUCCESS:
    # Get results from each region
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
else:
    # Handle failures - check logs for failed partitions
    for partition_id, details in run.partition_details.items():
        if details.status != "DONE":
            error_logs = details.logs
Copy

선택 사항: 완료된 실행에서 결과 복원하기

스테이지에서 완료된 실행을 복원하고, 프로세스를 다시 실행하지 않고도 동일한 결과에 액세스할 수 있습니다. 다음 코드는 이 작업을 수행하는 방법을 보여줍니다.

# Restore completed run from stage and access same results as above without re-running.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import (
    DPFRun
)
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
Copy

다음 단계

  • DPF를 기본 인프라로 사용해 여러 ML 모델을 학습시키는 방법에 대해 알아보려면 :doc:`train-models-across-partitions`를 살펴보세요.

  • 고급 사용 패턴, 오류 처리 전략, 성능 최적화 기법은 Snowflake ML Python 패키지의 전체 API 설명서를 참조하세요.