Snowflakeマルチノード ML ジョブ

Snowflakeマルチノード ML ジョブを使用して、Snowflake ML 内の分散機械学習(ML)ワークフローコンテナランタイムを、複数のコンピュートノードにわたって実行します。複数のノードに作業を分散して、大規模なデータセットや複雑なモデルを処理し、パフォーマンスを向上させます。Snowflake ML ジョブについては、Snowflake MLのジョブ をご参照ください。

Snowflakeマルチノード ML ジョブは、複数のノードでの分散実行を可能にすることでSnowflake ML ジョブ機能を拡張します。これにより、次が導入されます。

  • スケーラブルなパフォーマンス:単一ノードに収まらないほど大きいデータセットを処理するために水平にスケールします

  • トレーニング時間を短縮:並列化により複雑なモデルトレーニングを高速化します

  • リソース効率:データ集約型ワークロードのリソース使用率を最適化します

  • フレームワーク統合:分散モデリングクラス および Ray のような分散フレームワークをシームレスに使用します。

Snowflake ML ジョブを複数のノードで実行すると、次が発生します。

  • 1つのノードがヘッドノード(コーディネーター)として機能

  • 追加のノードはワーカーノード(コンピュートリソース)として機能

  • ノードを合わせて、Snowflakeに単一の論理的 ML ジョブエンティティを形成します

シングルノード ML ジョブ にはヘッドノードのみがあります。3つのアクティブノードを持つマルチノードジョブには、1つのヘッドノードと2つのワーカーノードがあります。3つのノードすべてがワークロードの実行に参加します。

前提条件

Snowflakeマルチノード ML ジョブを使用するには、以下の前提条件が必要です。

重要

Snowflakeマルチノード ML ジョブは現在、Python 3.10クライアントのみをサポートしています。他のPythonバージョンのサポートが必要な場合は、Snowflakeアカウントチームにお問い合わせください。

マルチノードジョブを設定するには、次を実行します。

  1. Python 3.10環境にSnowflake ML Pythonパッケージをインストールします。

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. マルチノードジョブをサポートするのに十分なノードを持つコンピューティングプールを作成します。

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = 1
      MAX_NODES = <NUM_INSTANCES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy

    重要

    MAX_NODES は、トレーニングジョブの実行に使用しているターゲットインスタンスの数以上で設定する必要があります。トレーニングジョブに使用する予定よりも多くのノードをリクエストすると、失敗したり、予期しない動作をする可能性があります。トレーニングジョブの実行については、マルチノード ML ジョブの実行 をご参照ください。

マルチノードジョブのコード記述

マルチノードジョブの場合、分散モデリングクラス または Ray を使用して分散処理用にコードを設計する必要があります。

以下は、分散モデリングクラスまたはRayを使用する際の重要なパターンと考慮事項です。

ノードの初期化と可用性について理解する

マルチノードジョブでは、ワーカーノードは非同期で異なる時間に初期化できます。

  • 特にコンピューティングプールのリソースが制限されている場合は、ノードすべてが同時に起動しない可能性があります

  • 一部のノードは、他のノードから数秒または数分後に開始する場合があります

  • ML ジョブは指定された target_instances を自動的に待機し、ペイロードの実行前に利用できるようにします。タイムアウト期間内に期待されるノードが利用できない場合、ジョブはエラーで失敗します。この動作のカスタマイズに関する詳細は、高度な構成:min_instancesの使用 をご参照ください。

Rayで、ジョブ内の利用可能なノードを確認できます。

import ray
ray.init(address="auto", ignore_reinit_error=True)  # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
Copy

分散処理パターン

分散処理のマルチノードジョブのペイロードボディには、複数のパターンを適用できます。これらのパターンは 分散モデリングクラス および Ray を活用します。

Snowflakeの分散トレーニング API の使用

Snowflakeは、一般的な ML フレームワークに最適化されたトレーニングを提供します。

# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()

# Create distributed estimator
estimator = XGBEstimator(
    n_estimators=100,
    params={"objective": "reg:squarederror"},
    scaling_config=scaling_config
)

# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
Copy

利用可能な APIs の詳細については、分散モデリングクラス をご参照ください。

ネイティブRayタスクの使用

別のアプローチは、Rayのタスクベースのプログラミングモデルを使用することです。

# Inside the ML Job payload body
import ray

@ray.remote
def process_chunk(data_chunk):
    # Process a chunk of data
    return processed_result

# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
Copy

詳細については、Rayのタスクプログラミングドキュメント をご参照ください。

マルチノード ML ジョブの実行

target_instances パラメーターを使用し、単一ノードのジョブと同じメソッドで、マルチノード ML ジョブを実行できます。

リモートデコレーターの使用

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=3  # Specify the number of nodes
)
def distributed_training(data_table: str):

    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    # Configure scaling for distributed execution
    scaling_config = XGBScalingConfig()

    # Create distributed estimator
    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using distributed resources
    # NOTE: data_connector and feature_cols excluded for brevity
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")


job = distributed_training("<my_training_data>")
Copy

Pythonファイルの実行

from snowflake.ml.jobs import submit_file

job = submit_file(
    "<script_path>",
    "MY_COMPUTE_POOL",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

ディレクトリの実行

from snowflake.ml.jobs import submit_directory

job = submit_directory(
    "<script_directory>",
    "MY_COMPUTE_POOL",
    entrypoint="<script_name>",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

高度な構成:min_instancesの使用

より柔軟なリソース管理のために、オプションの min_instances パラメーターを使用して、ジョブの進行に必要なインスタンスの最小数を指定できます。min_instances が設定されている場合は、最小数のノードが利用可能になるとすぐに、その数が target_instances より小さい場合であっても、ジョブペイロードが実行されます。

これは、次の場合に役立ちます。

  • 完全なターゲットがすぐに利用できない場合は、より少ないノードでトレーニングを開始

  • コンピューティングプールリソースが限られている場合の待機時間を短縮

  • 変化するリソースの可用性に適応できるフォールトトレラントなワークフローを実装

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=5,  # Prefer 5 nodes
    min_instances=3      # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
    import ray

    # Check how many nodes we actually got
    available_nodes = len(ray.nodes())
    print(f"Training with {available_nodes} nodes")

    # Adapt your training logic based on available resources
    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    scaling_config = XGBScalingConfig(
        num_workers=available_nodes
    )

    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using available distributed resources
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")

job = flexible_distributed_training("<my_training_data>")
Copy

マルチノードジョブの管理

ジョブステータスの監視

ジョブステータスの監視は、単一ノードのジョブから変更されません。

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}")  # PENDING, RUNNING, FAILED, DONE

# Wait for completion
job.wait()
Copy

ノードによるログへのアクセス

マルチノードジョブでは、特定のインスタンスからログにアクセスできます。

# Get logs from the default (head) instance
logs_default = job.get_logs()

# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)

# Display logs in the notebook/console
job.show_logs()  # Default (head) instance logs
job.show_logs(instance_id=0)  # Instance 0 logs (not necessarily the head node)
Copy

既知の問題と制限

次の情報を使用して、発生する可能性のある一般的な問題に対処します。

  • ノード接続の失敗:ワーカーノードがヘッドノードへの接続に失敗すると、ヘッドノードがタスクを完了し、ワーカーがジョブを終了する前に自分自身を拒否する可能性があります。接続の失敗を避けるために、ジョブに結果コレクションロジックを実装します。

  • メモリ不足:メモリの問題が原因でジョブが失敗する場合は、ノードのサイズを増やすか、ノードあたりのデータが少ないノードを多く使用します。

  • ノードの可用性タイムアウト:必要なインスタンス数(target_instances または min_instances のいずれか)が事前定義されたタイムアウト内に利用できない場合、ジョブは失敗します。コンピューティングプールに十分な容量があることを確認するか、インスタンス要件を調整します。