パーティション全体でカスタムロジックによるデータの処理¶
分散パーティション関数( DPF )を使用して、コンピューティングプール内の1つ以上のノードで並行してデータを処理します。 DPF は、分散オーケストレーション、エラー、可観測性、アーティファクトの永続性を自動的に処理します。Snowflakeノートブック または Snowflake ML ジョブ のいずれかでDPF を実行できます。
DPF は以下の実行モードをサポートしています。
**DataFrame モード ** (
run()):列の値によってSnowpark DataFrame をパーティション化し、各パーティションで同時に関数を実行します。データは、最適なスループットのために並列にプリフェッチされます。** ステージモード ** (
run_from_stage()):各ファイルがパーティションになるSnowflakeステージからファイルを処理します。予測可能なメモリ使用量の大規模なファイル処理に最適です。
DPF を使用すると、異なるデータセグメント間で大規模なデータセットを効率的に処理できます。
このツールは、次のようなシナリオに最適です。
地域別の販売データの分析
地理的セグメントによる顧客データの処理
各データパーティションの ML モデルのトレーニング
各データパーティションが同じ処理ロジックを必要とするデータ変換の実行
DPF は分散データ処理を自動的に処理します。分散コンピューティングインフラストラクチャを管理する必要はありません。
DPF を使用すると、 GPU アクセスを備えたコンテナ化されたインフラストラクチャ上でオープンソースライブラリを使用して、カスタムPythonコードを記述できます。
重要
DPF は、結果とアーティファクトをSnowflakeステージに自動的に保管します。DPF を使用する前に、 DPF が結果とアーティファクトを格納するステージに対する権限があることを確認します。
DataFrame モード:列パーティションによるデータの処理¶
DataFrame モードを使用して、指定された列で Snowpark DataFrame をパーティション化し、各パーティションでPython関数を並行して実行します。次の例は、地域ごとの販売データの処理を示しています。
:ref:` 処理関数を定義する <label-dpf-df-define-function>`
:ref:` 進行状況を監視して完了を待つ <label-dpf-df-monitor>`
:ref:` 各パーティションから結果を取得する <label-dpf-df-retrieve>`
:ref:` エラーを処理する<label-dpf-df-errors>`
:ref:` 完了した実行の結果を復元する <label-dpf-df-restore>`
処理関数を定義する¶
分散処理の実行に必要なクラスをインポートします。
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, ExecutionOptions
)
2つの引数を取る処理関数を定義します。
data_connector:パーティションのデータへのアクセスを提供する DataConnector 。data_connector.to_pandas()を呼び出し、それをpandas DataFrame としてロードするか、to_torch_dataset()またはto_ray_dataset()のような他のメソッドを使用します。context:アーティファクトをアップロードおよびダウンロードするためのパーティション ID および メソッドを提供する PartitionContext オブジェクト。
import json
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
各リージョンについて、この関数は要約統計をコンピューティングし、結果を JSON ファイルとしてパーティション専用のステージディレクトリに保存します。
DPF を初期化して実行する¶
処理関数と出力ステージ名を含む DPF インスタンスを作成してから、 run() を呼び出して分散処理を開始します。
重要
提供するSnowpark DataFrame は、テーブルから作成する必要があります。テーブルから DataFrame を作成することについては、 DataFrame の構築 を参照してください。
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
run() メソッドは次のパラメーターを受け入れます。
partition_by(str):DataFrame をパーティション化する列名。一意の値ごとに個別のパーティションが作成されます。snowpark_dataframe:パーティション化して処理するSnowpark DataFrame 。run_id(str):これを実行するための一意の識別子。すべてのアーティファクト専用ディレクトリ@{stage_name}/{run_id}/を作成します。experiment_2024_01_15またはmodel_v1_retrainのような説明的な名前を使用します。on_existing_artifacts(str、オプション):run_idのアーティファクトが発生した場合のアクションはすでに存在します。"error"(デフォルト)はエラーを発生します。"overwrite"は、既存のアーティファクトを置き換えます。execution_options( ExecutionOptions 、オプション):リソースの割り当てと実行動作の構成。
進行状況を監視して完了を待つ¶
wait() を呼び出して実行が完了するまでブロックします。デフォルトでは、進行状況バーが表示されます。
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
以下は出力の例です。
Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS
また、ブロックせずにいつでもステータスと進捗状況を確認することもできます。
# Check overall status
current_status = run.status
# Get progress grouped by partition status
progress = run.get_progress()
各パーティションから結果を取得する¶
実行が正常に完了したら、 partition_details プロパティを使用して各パーティションから結果を取得します。各パーティションの詳細には、保存されたアーティファクトにアクセスするための stage_artifacts_manager が含まれます。
if final_status == RunStatus.SUCCESS:
import json
all_results = []
for partition_id, details in run.partition_details.items():
# List available artifacts for this partition
files = details.stage_artifacts_manager.list()
print(f"Partition {partition_id} artifacts: {files}")
# Load an artifact using a custom deserializer
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
stage_artifacts_manager には3つのメソッドがあります。
list():パーティションのステージディレクトリに保存されているファイル名のリストを返します。get(filename, read_function=None):アーティファクトをダウンロードし、逆シリアル化します。デフォルトでpickleを使用するか、提供されている場合はカスタムread_functionを使用します。download(filename, local_dir):生ファイルをローカルディレクトリにダウンロードし、ローカルファイルパスを返します。
エラーを処理する¶
実行が成功しない場合は、個々のパーティションの詳細を検査して障害を診断します。
if final_status != RunStatus.SUCCESS:
for partition_id, details in run.partition_details.items():
if details.status != PartitionStatus.DONE:
print(f"Partition {partition_id} status: {details.status}")
try:
error_logs = details.logs
print(error_logs)
except RuntimeError:
print("Logs not available for this partition")
RunStatus 全体で集計結果を反映します。
SUCCESS:すべてのパーティションが正常に完了しました。PARTIAL_FAILURE:一部のパーティションは正常に完了しましたが、少なくとも1つが失敗しました。FAILURE:正常に完了したパーティションがありません。CANCELLED:実行はキャンセルされました。IN_PROGRESS:まだ実行中です。
各パーティションには PartitionStatus があります。
PENDING:処理待ち。RUNNING:現在処理中。DONE:正常に完了しましたFAILED:ユーザー関数が例外を発生させました。CANCELLED:ユーザーによってキャンセルされました。INTERNAL_ERROR:内部システムエラーが発生しました(例:メモリ不足)。
これらの列挙型をインポートするには:
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, PartitionStatus
)
実行中のジョブをキャンセルするには:
run.cancel()
注釈
すでに完了しているパーティションはキャンセルの影響を受けません。完了したパーティションの部分的な結果、ログ、およびアーティファクトは引き続き利用可能です。
完了した実行の結果を復元する¶
永続化された状態から完了した実行を復元し、プロセスを再実行せずに同じ結果にアクセスできます。
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")
# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
print(f"{partition_id}: {details.status}")
注釈
復元された実行は読み取り専用です。復元された実行で wait() または cancel() を呼び出すことはできません。
ステージモード:ステージからのファイルの処理¶
ステージモードを使用して、各ファイルがパーティションになるSnowflakeステージからファイルを処理します。これは、ステージングされたParquetファイルのコレクションの処理など、大規模なファイル処理に最適です。
処理関数を定義する¶
処理関数の署名は、 DataFrame モードと同じです。data_connector はファイルのデータへのアクセスを提供し、 context.partition_id はステージ内の相対ファイルパスです。
def process_file(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing file {context.partition_id}: {len(df)} rows")
# Process data and save results
result = {"row_count": len(df), "columns": list(df.columns)}
import json
context.upload_to_stage(result, "result.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
ステージから DPF を実行する¶
run_from_stage() を run() の代わりに呼び出します。ソースファイルおよびオプションで file_pattern を含む入力 stage_location を指定し、処理するファイルをフィルターします。
dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
stage_location="@my_db.my_schema.input_stage/data/",
run_id="file_processing_2024",
file_pattern="*.parquet",
)
final_status = run.wait()
run_from_stage() メソッドは次のパラメーターを受け入れます。
stage_location(str):ソースデータファイルを含む入力ステージパス。file_patternに一致する各ファイルはパーティションになります。単純および完全修飾ステージ名の両方をサポートします。単純:
"@my_stage/data/"完全修飾:
"@my_db.my_schema.my_stage/data/"
run_id(str):これを実行するための一意の識別子。file_pattern(str、オプション):ファイルをフィルターするGlobパターン。デフォルトは"*.parquet"です。on_existing_artifacts(str、オプション):"error"(デフォルト)または"overwrite"。execution_options( ExecutionOptions 、オプション):リソースの割り当てと実行動作の構成。
注釈
stage_location は * 入力 * データソースです。DPF() に提供される stage_name はログや結果などのアーティファクトの * 出力 * の場所です。これらは異なるステージにすることができます。
モニタリング、結果取得、エラー処理、および実行の復元は DataFrame モード と同じように機能します。
I/Oバウンドのファイル処理の場合、 ExecutionOptions の num_cpus_per_worker=1 を設定して並列度を最大化します( CPU ごとに1つのアクター)。CPUバウンドのワークロードの場合、デフォルトを使用するか num_cpus_per_worker を増やします。
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
run = dpf.run_from_stage(
stage_location="@my_stage/data/",
run_id="io_bound_processing",
execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
実行オプションを構成する¶
ExecutionOptions を使用して、ワーカーごとの CPU/GPU 割り当て、再試行回数、フェイルファストの動作などリソースの割り当てや実行動作を制御します。すべてのフィールドは適切なデフォルトを持つオプションです。
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
options = ExecutionOptions(
num_cpus_per_worker=4,
num_gpus_per_worker=1,
fail_fast=True,
)
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="my_run",
execution_options=options,
)
パラメーターの完全なリストとワーカーのスケーリング動作については、 ExecutionOptions API 参照 をご覧ください。
PartitionContext を使用したアーティファクトの操作¶
PartitionContext オブジェクトは、処理関数の2番目の引数として渡されます。パーティションの実行中にアーティファクトやSnowflakeセッションと対話するためのメソッドを提供します。
アーティファクトをアップロードする¶
upload_to_stage() を使用して処理関数内から結果を保存します。デフォルトでは、オブジェクトはピクルを使用してシリアル化されます。カスタムシリアル化のための write_function を提供します。
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Save a pickle object (default serialization)
results = {'total': df['amount'].sum(), 'count': len(df)}
context.upload_to_stage(results, "summary.pkl")
# Save JSON data with a custom write function
import json
context.upload_to_stage(
results, "summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
)
# Save a CSV file
df_processed = df.groupby('category').sum()
context.upload_to_stage(
df_processed, "aggregated.csv",
write_function=lambda df, path: df.to_csv(path, index=False)
)
アーティファクトをダウンロードする¶
download_from_stage() を使用して処理関数内のアーティファクトをロードします。この関数を使って、以前の実行のアーティファクトにアクセスすることができます。たとえば、推論のためにモデルを読み込むことができます。
def my_inference_function(data_connector, context):
# Load a pickle object from the current partition's stage path
model = context.download_from_stage("model.pkl")
# Load from a different stage path (e.g., from a prior training run)
model = context.download_from_stage(
"model.pkl",
stage_path="@db.schema.stage/training_run/partition_0"
)
# Load JSON with a custom deserializer
import json
config = context.download_from_stage(
"config.json",
read_function=lambda path: json.load(open(path, 'r'))
)
Snowflakeセッションを使用する¶
with_session() を使用して、テーブルへの結果の書き込みなど、Snowflakeセッションを必要とする操作を実行します。このメソッドは、制限されたセッションプールを使用して、多くのパーティションを同時に実行するときにSnowflakeのセッション制限に達しないようにします。
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Load a model from a prior training run
model = context.download_from_stage("model.pkl")
predictions = model.predict(df[['X1', 'X2']])
results = df.copy()
results['predictions'] = predictions
results['partition_id'] = context.partition_id
# Write results to a Snowflake table using the bounded session pool
context.with_session(lambda session:
session.create_dataframe(results)
.write.mode("append")
.save_as_table("predictions_table")
)
注釈
with_session() に渡される関数はcloudpickleを使用してシリアル化されます。クロージャで大きなオブジェクトやシリアル化できないリソースのキャプチャを回避します。
複数ノードにまたがるスケール¶
複数のノードにまたがって DPF を実行するには、実行を開始する前にクラスターをスケーリングします。
from snowflake.ml.runtime_cluster import scale_cluster
# Scale to 3 nodes for increased parallelism
scale_cluster(3)
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="multi_node_run",
execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
複数のノードで実行する際、ユーザー関数を実行せずにヘッドノードをコーディネーターとしてのみ機能させたい場合は use_head_node=False を設定します。ワーカーノードのメモリ不足エラーはコーディネーターには影響しないため、実行時間の長いワークロードの信頼性を改善できます。
制限と制約¶
** 1回の同時実行 ** :一度に1つの DPF 実行のみを実行できます。別の実行中に新しい実行を開始すると、エラーが発生します。新しい実行を始める前に
run.cancel()で以前の実行をキャンセルします。**DataFrame 要件 ** :DataFrame モードのSnowpark DataFrame にはクエリが1つだけ含まれている必要があり、ポストアクションは含まれていない必要があります。
** シングルノードの制限 ** :
use_head_node=Falseはシングルノードクラスターではサポートされていません。** アーティファクトパス構造 ** :アーティファクトは
@{stage_name}/{run_id}/{partition_id}/に保管されます。このパス構造は固定であり、カスタマイズできません。** 復元された実行は読み取り専用です ** :
DPFRun.restore_from()で復元された実行はwait()またはcancel()を呼び出すことができません。
次のステップ¶
データパーティション全体でモデルをトレーニングする を確認し基盤インフラストラクチャとして DPF を使用して複数の ML モデルをトレーニングする方法を学びます。
API ドキュメントの全文は、 ` 分散パーティション関数( DPF ) API 参照 <https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/container-runtime/distributors.distributed_partition_function>`_ をご覧ください。
エンドツーエンドの例については、 Snowflake ML サンプルノートブック を参照してください。