Snowflakeマルチノード ML ジョブ¶
Snowflakeマルチノード ML ジョブを使用して、Snowflake ML 内の分散機械学習(ML)ワークフローコンテナランタイムを、複数のコンピュートノードにわたって実行します。複数のノードに作業を分散して、大規模なデータセットや複雑なモデルを処理し、パフォーマンスを向上させます。Snowflake ML ジョブについては、Snowflake MLのジョブ をご参照ください。
Snowflakeマルチノード ML ジョブは、複数のノードでの分散実行を可能にすることでSnowflake ML ジョブ機能を拡張します。これにより、次が導入されます。
スケーラブルなパフォーマンス:単一ノードに収まらないほど大きいデータセットを処理するために水平にスケールします
トレーニング時間を短縮:並列化により複雑なモデルトレーニングを高速化します
リソース効率:データ集約型ワークロードのリソース使用率を最適化します
フレームワーク統合:分散モデリングクラス および Ray のような分散フレームワークをシームレスに使用します。
Snowflake ML ジョブを複数のノードで実行すると、次が発生します。
1つのノードがヘッドノード(コーディネーター)として機能
追加のノードはワーカーノード(コンピュートリソース)として機能
ノードを合わせて、Snowflakeに単一の論理的 ML ジョブエンティティを形成します
シングルノード ML ジョブ にはヘッドノードのみがあります。3つのアクティブノードを持つマルチノードジョブには、1つのヘッドノードと2つのワーカーノードがあります。3つのノードすべてがワークロードの実行に参加します。
前提条件¶
Snowflakeマルチノード ML ジョブを使用するには、以下の前提条件が必要です。
重要
Snowflakeマルチノード ML ジョブは現在、Python 3.10クライアントのみをサポートしています。他のPythonバージョンのサポートが必要な場合は、Snowflakeアカウントチームにお問い合わせください。
マルチノードジョブを設定するには、次を実行します。
Python 3.10環境にSnowflake ML Pythonパッケージをインストールします。
pip install snowflake-ml-python>=1.9.2
マルチノードジョブをサポートするのに十分なノードを持つコンピューティングプールを作成します。
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = 1 MAX_NODES = <NUM_INSTANCES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
重要
MAX_NODES は、トレーニングジョブの実行に使用しているターゲットインスタンスの数以上で設定する必要があります。トレーニングジョブに使用する予定よりも多くのノードをリクエストすると、失敗したり、予期しない動作をする可能性があります。トレーニングジョブの実行については、マルチノード ML ジョブの実行 をご参照ください。
マルチノードジョブのコード記述¶
マルチノードジョブの場合、分散モデリングクラス または Ray を使用して分散処理用にコードを設計する必要があります。
以下は、分散モデリングクラスまたはRayを使用する際の重要なパターンと考慮事項です。
ノードの初期化と可用性について理解する¶
マルチノードジョブでは、ワーカーノードは非同期で異なる時間に初期化できます。
特にコンピューティングプールのリソースが制限されている場合は、ノードすべてが同時に起動しない可能性があります
一部のノードは、他のノードから数秒または数分後に開始する場合があります
ML ジョブは指定された
target_instances
を自動的に待機し、ペイロードの実行前に利用できるようにします。タイムアウト期間内に期待されるノードが利用できない場合、ジョブはエラーで失敗します。この動作のカスタマイズに関する詳細は、高度な構成:min_instancesの使用 をご参照ください。
Rayで、ジョブ内の利用可能なノードを確認できます。
import ray
ray.init(address="auto", ignore_reinit_error=True) # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
分散処理パターン¶
分散処理のマルチノードジョブのペイロードボディには、複数のパターンを適用できます。これらのパターンは 分散モデリングクラス および Ray を活用します。
Snowflakeの分散トレーニング API の使用¶
Snowflakeは、一般的な ML フレームワークに最適化されたトレーニングを提供します。
# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()
# Create distributed estimator
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
利用可能な APIs の詳細については、分散モデリングクラス をご参照ください。
ネイティブRayタスクの使用¶
別のアプローチは、Rayのタスクベースのプログラミングモデルを使用することです。
# Inside the ML Job payload body
import ray
@ray.remote
def process_chunk(data_chunk):
# Process a chunk of data
return processed_result
# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
詳細については、Rayのタスクプログラミングドキュメント をご参照ください。
マルチノード ML ジョブの実行¶
target_instances
パラメーターを使用し、単一ノードのジョブと同じメソッドで、マルチノード ML ジョブを実行できます。
リモートデコレーターの使用¶
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=3 # Specify the number of nodes
)
def distributed_training(data_table: str):
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()
# Create distributed estimator
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
job = distributed_training("<my_training_data>")
Pythonファイルの実行¶
from snowflake.ml.jobs import submit_file
job = submit_file(
"<script_path>",
"MY_COMPUTE_POOL",
stage_name="<payload_stage>",
session=session,
target_instances=<num_training_nodes> # Specify the number of nodes
)
ディレクトリの実行¶
from snowflake.ml.jobs import submit_directory
job = submit_directory(
"<script_directory>",
"MY_COMPUTE_POOL",
entrypoint="<script_name>",
stage_name="<payload_stage>",
session=session,
target_instances=<num_training_nodes> # Specify the number of nodes
)
高度な構成:min_instancesの使用¶
より柔軟なリソース管理のために、オプションの min_instances
パラメーターを使用して、ジョブの進行に必要なインスタンスの最小数を指定できます。min_instances
が設定されている場合は、最小数のノードが利用可能になるとすぐに、その数が target_instances
より小さい場合であっても、ジョブペイロードが実行されます。
これは、次の場合に役立ちます。
完全なターゲットがすぐに利用できない場合は、より少ないノードでトレーニングを開始
コンピューティングプールリソースが限られている場合の待機時間を短縮
変化するリソースの可用性に適応できるフォールトトレラントなワークフローを実装
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=5, # Prefer 5 nodes
min_instances=3 # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
import ray
# Check how many nodes we actually got
available_nodes = len(ray.nodes())
print(f"Training with {available_nodes} nodes")
# Adapt your training logic based on available resources
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
scaling_config = XGBScalingConfig(
num_workers=available_nodes
)
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using available distributed resources
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
job = flexible_distributed_training("<my_training_data>")
マルチノードジョブの管理¶
ジョブステータスの監視¶
ジョブステータスの監視は、単一ノードのジョブから変更されません。
from snowflake.ml.jobs import MLJob, get_job, list_jobs
# List all jobs
jobs = list_jobs()
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}") # PENDING, RUNNING, FAILED, DONE
# Wait for completion
job.wait()
ノードによるログへのアクセス¶
マルチノードジョブでは、特定のインスタンスからログにアクセスできます。
# Get logs from the default (head) instance
logs_default = job.get_logs()
# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)
# Display logs in the notebook/console
job.show_logs() # Default (head) instance logs
job.show_logs(instance_id=0) # Instance 0 logs (not necessarily the head node)
既知の問題と制限¶
次の情報を使用して、発生する可能性のある一般的な問題に対処します。
ノード接続の失敗:ワーカーノードがヘッドノードへの接続に失敗すると、ヘッドノードがタスクを完了し、ワーカーがジョブを終了する前に自分自身を拒否する可能性があります。接続の失敗を避けるために、ジョブに結果コレクションロジックを実装します。
メモリ不足:メモリの問題が原因でジョブが失敗する場合は、ノードのサイズを増やすか、ノードあたりのデータが少ないノードを多く使用します。
ノードの可用性タイムアウト:必要なインスタンス数(
target_instances
またはmin_instances
のいずれか)が事前定義されたタイムアウト内に利用できない場合、ジョブは失敗します。コンピューティングプールに十分な容量があることを確認するか、インスタンス要件を調整します。