パーティション全体でカスタムロジックによるデータの処理

分散パーティション関数( DPF )を使用して、コンピューティングプール内の1つ以上のノードで並行してデータを処理します。 DPF は、分散オーケストレーション、エラー、可観測性、アーティファクトの永続性を自動的に処理します。Snowflakeノートブック または Snowflake ML ジョブ のいずれかでDPF を実行できます。

DPF は以下の実行モードをサポートしています。

  • **DataFrame モード ** ( run() ):列の値によってSnowpark DataFrame をパーティション化し、各パーティションで同時に関数を実行します。データは、最適なスループットのために並列にプリフェッチされます。

  • ** ステージモード ** ( run_from_stage() ):各ファイルがパーティションになるSnowflakeステージからファイルを処理します。予測可能なメモリ使用量の大規模なファイル処理に最適です。

DPF を使用すると、異なるデータセグメント間で大規模なデータセットを効率的に処理できます。

このツールは、次のようなシナリオに最適です。

  • 地域別の販売データの分析

  • 地理的セグメントによる顧客データの処理

  • 各データパーティションの ML モデルのトレーニング

  • 各データパーティションが同じ処理ロジックを必要とするデータ変換の実行

DPF は分散データ処理を自動的に処理します。分散コンピューティングインフラストラクチャを管理する必要はありません。

DPF を使用すると、 GPU アクセスを備えたコンテナ化されたインフラストラクチャ上でオープンソースライブラリを使用して、カスタムPythonコードを記述できます。

重要

DPF は、結果とアーティファクトをSnowflakeステージに自動的に保管します。DPF を使用する前に、 DPF が結果とアーティファクトを格納するステージに対する権限があることを確認します。

DataFrame モード:列パーティションによるデータの処理

DataFrame モードを使用して、指定された列で Snowpark DataFrame をパーティション化し、各パーティションでPython関数を並行して実行します。次の例は、地域ごとの販売データの処理を示しています。

  1. :ref:` 処理関数を定義する <label-dpf-df-define-function>`

  2. DPF を初期化して実行する

  3. :ref:` 進行状況を監視して完了を待つ <label-dpf-df-monitor>`

  4. :ref:` 各パーティションから結果を取得する <label-dpf-df-retrieve>`

  5. :ref:` エラーを処理する<label-dpf-df-errors>`

  6. :ref:` 完了した実行の結果を復元する <label-dpf-df-restore>`

処理関数を定義する

分散処理の実行に必要なクラスをインポートします。

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, ExecutionOptions
)
Copy

2つの引数を取る処理関数を定義します。

  • data_connector:パーティションのデータへのアクセスを提供する DataConnectordata_connector.to_pandas() を呼び出し、それをpandas DataFrame としてロードするか、 to_torch_dataset() または to_ray_dataset() のような他のメソッドを使用します。

  • context:アーティファクトをアップロードおよびダウンロードするためのパーティション ID および メソッドを提供する PartitionContext オブジェクト。

import json

def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access

    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

各リージョンについて、この関数は要約統計をコンピューティングし、結果を JSON ファイルとしてパーティション専用のステージディレクトリに保存します。

DPF を初期化して実行する

処理関数と出力ステージ名を含む DPF インスタンスを作成してから、 run() を呼び出して分散処理を開始します。

重要

提供するSnowpark DataFrame は、テーブルから作成する必要があります。テーブルから DataFrame を作成することについては、 DataFrame の構築 を参照してください。

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

run() メソッドは次のパラメーターを受け入れます。

  • partition_by (str):DataFrame をパーティション化する列名。一意の値ごとに個別のパーティションが作成されます。

  • snowpark_dataframe:パーティション化して処理するSnowpark DataFrame 。

  • run_id (str):これを実行するための一意の識別子。すべてのアーティファクト専用ディレクトリ @{stage_name}/{run_id}/ を作成します。experiment_2024_01_15 または model_v1_retrain のような説明的な名前を使用します。

  • on_existing_artifacts (str、オプション):run_id のアーティファクトが発生した場合のアクションはすでに存在します。 "error" (デフォルト)はエラーを発生します。 "overwrite" は、既存のアーティファクトを置き換えます。

  • execution_optionsExecutionOptions 、オプション):リソースの割り当てと実行動作の構成。

進行状況を監視して完了を待つ

wait() を呼び出して実行が完了するまでブロックします。デフォルトでは、進行状況バーが表示されます。

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

以下は出力の例です。

Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS

また、ブロックせずにいつでもステータスと進捗状況を確認することもできます。

# Check overall status
current_status = run.status

# Get progress grouped by partition status
progress = run.get_progress()
Copy

各パーティションから結果を取得する

実行が正常に完了したら、 partition_details プロパティを使用して各パーティションから結果を取得します。各パーティションの詳細には、保存されたアーティファクトにアクセスするための stage_artifacts_manager が含まれます。

if final_status == RunStatus.SUCCESS:
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        # List available artifacts for this partition
        files = details.stage_artifacts_manager.list()
        print(f"Partition {partition_id} artifacts: {files}")

        # Load an artifact using a custom deserializer
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
Copy

stage_artifacts_manager には3つのメソッドがあります。

  • list():パーティションのステージディレクトリに保存されているファイル名のリストを返します。

  • get(filename, read_function=None):アーティファクトをダウンロードし、逆シリアル化します。デフォルトで pickle を使用するか、提供されている場合はカスタム read_function を使用します。

  • download(filename, local_dir):生ファイルをローカルディレクトリにダウンロードし、ローカルファイルパスを返します。

エラーを処理する

実行が成功しない場合は、個々のパーティションの詳細を検査して障害を診断します。

if final_status != RunStatus.SUCCESS:
    for partition_id, details in run.partition_details.items():
        if details.status != PartitionStatus.DONE:
            print(f"Partition {partition_id} status: {details.status}")
            try:
                error_logs = details.logs
                print(error_logs)
            except RuntimeError:
                print("Logs not available for this partition")
Copy

RunStatus 全体で集計結果を反映します。

  • SUCCESS:すべてのパーティションが正常に完了しました。

  • PARTIAL_FAILURE:一部のパーティションは正常に完了しましたが、少なくとも1つが失敗しました。

  • FAILURE:正常に完了したパーティションがありません。

  • CANCELLED:実行はキャンセルされました。

  • IN_PROGRESS:まだ実行中です。

各パーティションには PartitionStatus があります。

  • PENDING:処理待ち。

  • RUNNING:現在処理中。

  • DONE:正常に完了しました

  • FAILED:ユーザー関数が例外を発生させました。

  • CANCELLED:ユーザーによってキャンセルされました。

  • INTERNAL_ERROR:内部システムエラーが発生しました(例:メモリ不足)。

これらの列挙型をインポートするには:

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, PartitionStatus
)
Copy

実行中のジョブをキャンセルするには:

run.cancel()
Copy

注釈

すでに完了しているパーティションはキャンセルの影響を受けません。完了したパーティションの部分的な結果、ログ、およびアーティファクトは引き続き利用可能です。

完了した実行の結果を復元する

永続化された状態から完了した実行を復元し、プロセスを再実行せずに同じ結果にアクセスできます。

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun

restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")

# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
    print(f"{partition_id}: {details.status}")
Copy

注釈

復元された実行は読み取り専用です。復元された実行で wait() または cancel() を呼び出すことはできません。

ステージモード:ステージからのファイルの処理

ステージモードを使用して、各ファイルがパーティションになるSnowflakeステージからファイルを処理します。これは、ステージングされたParquetファイルのコレクションの処理など、大規模なファイル処理に最適です。

処理関数を定義する

処理関数の署名は、 DataFrame モードと同じです。data_connector はファイルのデータへのアクセスを提供し、 context.partition_id はステージ内の相対ファイルパスです。

def process_file(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing file {context.partition_id}: {len(df)} rows")

    # Process data and save results
    result = {"row_count": len(df), "columns": list(df.columns)}
    import json
    context.upload_to_stage(result, "result.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

ステージから DPF を実行する

run_from_stage()run() の代わりに呼び出します。ソースファイルおよびオプションで file_pattern を含む入力 stage_location を指定し、処理するファイルをフィルターします。

dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
    stage_location="@my_db.my_schema.input_stage/data/",
    run_id="file_processing_2024",
    file_pattern="*.parquet",
)
final_status = run.wait()
Copy

run_from_stage() メソッドは次のパラメーターを受け入れます。

  • stage_location (str):ソースデータファイルを含む入力ステージパス。file_pattern に一致する各ファイルはパーティションになります。単純および完全修飾ステージ名の両方をサポートします。

    • 単純: "@my_stage/data/"

    • 完全修飾: "@my_db.my_schema.my_stage/data/"

  • run_id (str):これを実行するための一意の識別子。

  • file_pattern (str、オプション):ファイルをフィルターするGlobパターン。デフォルトは "*.parquet" です。

  • on_existing_artifacts (str、オプション): "error" (デフォルト)または "overwrite"

  • execution_optionsExecutionOptions 、オプション):リソースの割り当てと実行動作の構成。

注釈

stage_location は * 入力 * データソースです。DPF() に提供される stage_name はログや結果などのアーティファクトの * 出力 * の場所です。これらは異なるステージにすることができます。

モニタリング、結果取得、エラー処理、および実行の復元は DataFrame モード と同じように機能します。

I/Oバウンドのファイル処理の場合、 ExecutionOptionsnum_cpus_per_worker=1 を設定して並列度を最大化します( CPU ごとに1つのアクター)。CPUバウンドのワークロードの場合、デフォルトを使用するか num_cpus_per_worker を増やします。

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

run = dpf.run_from_stage(
    stage_location="@my_stage/data/",
    run_id="io_bound_processing",
    execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
Copy

実行オプションを構成する

ExecutionOptions を使用して、ワーカーごとの CPU/GPU 割り当て、再試行回数、フェイルファストの動作などリソースの割り当てや実行動作を制御します。すべてのフィールドは適切なデフォルトを持つオプションです。

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

options = ExecutionOptions(
    num_cpus_per_worker=4,
    num_gpus_per_worker=1,
    fail_fast=True,
)

run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="my_run",
    execution_options=options,
)
Copy

パラメーターの完全なリストとワーカーのスケーリング動作については、 ExecutionOptions API 参照 をご覧ください。

PartitionContext を使用したアーティファクトの操作

PartitionContext オブジェクトは、処理関数の2番目の引数として渡されます。パーティションの実行中にアーティファクトやSnowflakeセッションと対話するためのメソッドを提供します。

アーティファクトをアップロードする

upload_to_stage() を使用して処理関数内から結果を保存します。デフォルトでは、オブジェクトはピクルを使用してシリアル化されます。カスタムシリアル化のための write_function を提供します。

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Save a pickle object (default serialization)
    results = {'total': df['amount'].sum(), 'count': len(df)}
    context.upload_to_stage(results, "summary.pkl")

    # Save JSON data with a custom write function
    import json
    context.upload_to_stage(
        results, "summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
    )

    # Save a CSV file
    df_processed = df.groupby('category').sum()
    context.upload_to_stage(
        df_processed, "aggregated.csv",
        write_function=lambda df, path: df.to_csv(path, index=False)
    )
Copy

アーティファクトをダウンロードする

download_from_stage() を使用して処理関数内のアーティファクトをロードします。この関数を使って、以前の実行のアーティファクトにアクセスすることができます。たとえば、推論のためにモデルを読み込むことができます。

def my_inference_function(data_connector, context):
    # Load a pickle object from the current partition's stage path
    model = context.download_from_stage("model.pkl")

    # Load from a different stage path (e.g., from a prior training run)
    model = context.download_from_stage(
        "model.pkl",
        stage_path="@db.schema.stage/training_run/partition_0"
    )

    # Load JSON with a custom deserializer
    import json
    config = context.download_from_stage(
        "config.json",
        read_function=lambda path: json.load(open(path, 'r'))
    )
Copy

Snowflakeセッションを使用する

with_session() を使用して、テーブルへの結果の書き込みなど、Snowflakeセッションを必要とする操作を実行します。このメソッドは、制限されたセッションプールを使用して、多くのパーティションを同時に実行するときにSnowflakeのセッション制限に達しないようにします。

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Load a model from a prior training run
    model = context.download_from_stage("model.pkl")

    predictions = model.predict(df[['X1', 'X2']])

    results = df.copy()
    results['predictions'] = predictions
    results['partition_id'] = context.partition_id

    # Write results to a Snowflake table using the bounded session pool
    context.with_session(lambda session:
        session.create_dataframe(results)
            .write.mode("append")
            .save_as_table("predictions_table")
    )
Copy

注釈

with_session() に渡される関数はcloudpickleを使用してシリアル化されます。クロージャで大きなオブジェクトやシリアル化できないリソースのキャプチャを回避します。

複数ノードにまたがるスケール

複数のノードにまたがって DPF を実行するには、実行を開始する前にクラスターをスケーリングします。

from snowflake.ml.runtime_cluster import scale_cluster

# Scale to 3 nodes for increased parallelism
scale_cluster(3)

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="multi_node_run",
    execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
Copy

複数のノードで実行する際、ユーザー関数を実行せずにヘッドノードをコーディネーターとしてのみ機能させたい場合は use_head_node=False を設定します。ワーカーノードのメモリ不足エラーはコーディネーターには影響しないため、実行時間の長いワークロードの信頼性を改善できます。

制限と制約

  • ** 1回の同時実行 ** :一度に1つの DPF 実行のみを実行できます。別の実行中に新しい実行を開始すると、エラーが発生します。新しい実行を始める前に run.cancel() で以前の実行をキャンセルします。

  • **DataFrame 要件 ** :DataFrame モードのSnowpark DataFrame にはクエリが1つだけ含まれている必要があり、ポストアクションは含まれていない必要があります。

  • ** シングルノードの制限 ** : use_head_node=False はシングルノードクラスターではサポートされていません。

  • ** アーティファクトパス構造 ** :アーティファクトは @{stage_name}/{run_id}/{partition_id}/ に保管されます。このパス構造は固定であり、カスタマイズできません。

  • ** 復元された実行は読み取り専用です ** :DPFRun.restore_from() で復元された実行は wait() または cancel() を呼び出すことができません。

次のステップ