パーティション全体でカスタムロジックによるデータの処理

分散パーティション関数を使用する(DPF)を使用して、コンピューティングプール内の1つ以上のノードで並行してデータを処理します。分散オーケストレーション、エラー、可観測性、アーティファクトの永続性を自動的に処理します。

指定された列で DPF Snowparkをパーティション化し DataFrame 、各パーティションでPython関数を並行して実行します。DPF がインフラストラクチャの複雑さを処理し、自動的にスケーリングするので、処理ロジックに集中できます。

DPF を使用すると、異なるデータセグメント間で大規模なデータセットを効率的に処理できます。このツールは、地域ごとの販売データの分析、地理的セグメントでの顧客データの処理、各データパーティションが同じ処理ロジックを必要とするデータ変換の実行などのシナリオに最適です。 DPF は、分散データ処理を自動的に処理するため、分散コンピューティングインフラストラクチャの管理が不要になります。

DPF を使用すると、 GPU アクセスを備えたコンテナ化されたインフラストラクチャ上でオープンソースライブラリを使用して、カスタムPythonコードを記述できます。

重要

DPF を使い始める前に、以下をご確認ください。

  • コンテナランタイム環境: DPF には、Snowflake ML コンテナランタイム環境が必要です。

  • ステージアクセス許可: DPF は、結果とアーティファクトをSnowflakeステージに自動的に保管します。指定した名前付きステージにアクセスするための適切な権限を持っていることを確認します。

次のセクションでは、仮想の販売データセットで DPF を使用する手順について説明します。

地域別の販売データの処理

次の例は、 DPF を使用して地域ごとの販売データを並行処理する方法を示しています。

  1. 処理関数の定義 - 各データパーティション(リージョン)に適用する変換ロジックを指定するPython関数を作成します。

  2. **DPF を使用してデータを処理する ** - 処理関数を使用して DPF を初期化し、すべてのパーティションで同時に実行します。

  3. 進行状況をモニターして完了を待機する - 処理ステータスを追跡し、すべてのパーティションの実行が完了するのを待ちます。

  4. 各パーティションから結果を取得する - 正常に完了した後、すべてのリージョンから処理された結果を収集して結合します。

  5. オプション:後で結果を復元する - プロセス全体を再実行することなく、以前に完了した結果にアクセスします。

販売データセットがある場合は、データフレームの列としてリージョンデータがある場合があります。各リージョンのデータに処理ロジックを適用する必要がある場合があります。分散パーティション関数(DPF)を使用して、さまざまなリージョンの販売データを並行処理できます。

次の列を含む sales_data というデータフレームがあるかもしれません。

  • region:「北」「南」「東」「西」

  • customer_id:一意の顧客識別子

  • amount:トランザクション量

  • order_date:トランザクションの日付

処理関数を定義する

次のコードは、リージョンごとに販売データを処理する処理関数を定義します。各データパーティションに適用される変換ロジックを指定し、結果をステージに保存します。

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import RunStatus

# Define function to process each region's data
Copy
def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access
    import json
    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

process_sales_data 関数は、分散処理を可能にする2つのキー引数を受け取ります。

  • data_connector:sales_data DataFrame へのアクセスを提供するために使用されます

  • context:結果をステージに書き込むために使用されます

各リージョンに対して、この関数は以下の列を作成します。

  • total_sales

  • avg_order_value

  • customer_count

  • record_count

そして、結果を ``sales_summary.json``という名前の JSON ファイルとしてステージに書き込みます。

DPF を使用して関数でデータを処理する

処理関数を作成したら、次のコードを使用して、DPF 関数でパーティション間でデータを並列処理できます。

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",  # Creates separate partitions for North, South, East, West
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

次のコードを使用して、複数のノードにわたって DPF を実行できます。

from snowflake.ml.runtime_cluster import scale_cluster

# Scale cluster before running tuner
scale_cluster(2)  # Scale to 2 nodes for parallel trials
Copy

進行状況をモニターして完了を待つ

次のコードを使用して、 DPF 実行の進行状況をモニターできます。

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

次はコードの出力です。

Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS

各パーティションから結果を取得する

実行が正常に完了すると、各パーティションから結果を取得するできます。次のコードは、リージョンごとの販売結果を取得します。

if final_status == RunStatus.SUCCESS:
    # Get results from each region
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
else:
    # Handle failures - check logs for failed partitions
    for partition_id, details in run.partition_details.items():
        if details.status != "DONE":
            error_logs = details.logs
Copy

オプション:完了した実行の結果を復元する

ステージから完了した実行を復元し、プロセスを再実行することなく同じ結果にアクセスできます。次のコードは、これを行う方法を示しています。

# Restore completed run from stage and access same results as above without re-running.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import (
    DPFRun
)
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
Copy

次のステップ

  • データパーティション全体でモデルをトレーニングする を確認し基盤インフラストラクチャとして DPF を使用して複数の ML モデルをトレーニングする方法を学びます

  • より高度な使用パターン、エラー処理戦略、およびパフォーマンス最適化の手法については、Snowflake ML Pythonパッケージの完全な API ドキュメントをご参照ください。