マルチノードクラスタ上で動作する ML 向けのContainer Runtime¶
このプレビューでは、 ML 向けのContainer Runtime を使用すると、Snowflake Notebooksのマルチノードクラスタ上で ML ワークロードを実行できます。 snowflake-ml-python
ライブラリには、 ML ワークロードに利用可能なコンピューティングプールのノード数をセットするための APIs が含まれており、コンピューティングプールのサイズを変更することなく、ワークロードに利用可能なリソースを拡張することができます。別の API がアクティブノードのリストを取得します。
マルチノードクラスタでは、1つのノードを ヘッド ノードに割り当てます。追加ノードは ワーカー ノードと呼ばれます。ヘッドノードはクラスタ内の並列操作をオーケストレーションし、ワークロードの実行にコンピューティングリソースを提供します。アクティブノードが1つのマルチノードクラスタにはヘッドノードしかありません。3つのアクティブノードを持つマルチノードクラスタには、1つのヘッドノードと2つのワーカーノードがあり、3つのノードすべてがワークロードの実行に参加します。
前提条件¶
ML ワークロードを実行するためにマルチノードクラスタを使用するには、以下のものが必要です。
NotebooksにアクセスできるアクティブなSnowflakeアカウント。 Snowflake Notebooks をご覧ください。
コンテナーランタイムを使用するNotebooksを作成および管理する権限。 ML 向けのContainer RuntimeのNotebooks を参照してください。
コンピューティングプールの構成¶
マルチノード設定を使用するには、少なくとも2つのノードを持つコンピューティングプールが必要です。 新しいコンピューティングプールを作成 するか、 既存のコンピューティングプールを変更 します。どちらのコマンドでも、 MAX_NODES 引数を渡して、プールの最大容量をセットします。ワークロードの大小に応じて簡単にスケールアップまたはスケールダウンできるように、1つまたは複数の追加ノードをプロビジョニングすることは良い習慣です。
コンピューティングプールの容量を確認するには、 DESCRIBE COMPUTE POOL コマンドを使用します。容量は、返されたテーブルの MAX_NODES 列にあります。
DESCRIBE COMPUTE POOL my_pool;
コンピューティングプールの容量をセットするには、 ALTER COMPUTE POOL コマンドを使用します。
ALTER COMPUTE POOL <compute_pool_name>
SET MAX_NODES = <total_capacity>;
マルチノードクラスタでのワークロードの実行¶
Notebooks用にマルチノードコンピューティングプールを選択することは、 ML ワークロードを実行するためにコンピューティングプール内の複数のノードを使用するために必要な唯一のアクションです。
Notebooksで、 snowflake.ml.runtime_cluster.scale_cluster
Python API を使用してアクティブノード数をセットします。コンピューティングプールのアクティブノード数は、プールの MAX_NODES までの、ワークロードを実行するために使用できるノードの数です。このメソッドは、ヘッドノードとすべてのワーカーノードを含む、必要なアクティブノードの総数を主要パラメーターとします。
注釈
この関数はデフォルトでブロッキングされ(つまり、スケーリング操作が終了するまで待機します)、12分間のタイムアウトがあります。操作がタイムアウトした場合は、自動的に以前の状態にロールバックされます。
スケーリング操作はセッションをまたいで持続しません。つまり、Notebooksのワーカーノード数がゼロでない状態で終了した場合、次にNotebooksを起動しても自動的にスケールアップすることはありません。ワーカーノードの数をセットするには、スケーリング API を再度呼び出す必要があります。
構文¶
snowflake.ml.runtime_cluster.scale_cluster(
expected_cluster_size: int,
*,
notebook_name: Optional[str] = None,
is_async: bool = False,
options: Optional[Dict[str, Any]] = None
) -> bool
引数¶
expected_cluster_size
(int): コンピューティングプールのアクティブノード数(プールの MAX_NODES まで)。これにはヘッドノードとすべてのワーカーノードが含まれます。notebook_name
(オプション [str]): ワークロードを実行するNotebooksの名前。スケールされるコンピューティングプールは、指定されたNotebooksが実行されているプールです。提供されない場合は、現在のコンテキストから自動的に決定されます。間違ったNotebooks名を使用した場合は例外が発生します。is_async
(bool): 関数がスケーリング待ちをブロックするかどうかを制御します。Falseの場合(デフォルト): 関数はクラスタの準備が完全に整うか、操作がタイムアウトするまでブロックします。
Trueの場合: この関数は、スケーリングリクエストが受け入れられたことを確認した直後に戻ります。
options
(オプション [Dict [str, Any]]): 高度な構成オプション:rollback_after_seconds
(int): スケーリングが完了しなかった場合に自動ロールバックするまでの最大時間。デフォルトは720秒です。block_until_min_cluster_size
(int): 関数が戻る前に準備できていなければならないノードの最小数。
戻り値¶
True
コンピューティングプールが指定されたアクティブノード数まで正常にスケールされた場合。そうでない場合は例外が発生します。
例¶
from snowflake.ml.runtime_cluster import scale_cluster
# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)
# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers
# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)
# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
利用可能なノード数の取得¶
クラスタ内のアクティブノードに関する情報を取得するには、 get_nodes
API を使用します。この関数は引数を取りません。
構文¶
get_nodes() -> list
戻り値¶
クラスタ内のアクティブノードの詳細を含むリスト。リストの各要素は、以下のキーを持つ辞書です。
name
(str): ノードの名前。cpus
(int): ノードの CPUs の数。gpus
(int): ノードの GPUs の数。
例¶
from snowflake.ml.runtime_cluster import get_nodes
# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
サンプルコードの出力は以下の通りです。
2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]
マルチノードクラスタでの分散トレーニング¶
ML 向けのContainer Runtimeは、 LightGBM、 XGBoost、 PyTorch モデルの分散トレーニングをサポートします。LightGBMEstimator、 XGBEstimator、 PyTorch の分散トレーニング APIs の詳細は API リファレンス にドキュメントがあります。
スケーリング構成¶
すべてのモデルは、トレーニングジョブのリソースを指定できるオプションのスケーリング構成パラメーターを提供します。スケーリング構成は、モデル固有のクラスのインスタンスです: モデルのタイプに応じて LightGBMScalingConfig
、 XGBScalingConfig
、または PyTorchScalingConfig
。
LightGBM および XGBoost スケーリング構成オブジェクトには、以下の属性があります。
num_workers
: トレーニングに使用するワーカープロセスの数。デフォルトは-1で、ワーカープロセスの数を自動的にセットします。num_cpu_per_worker
: ワーカープロセスごとに割り当てられた CPUs の数。デフォルトは-1で、ワーカープロセスあたりの CPUs の数を自動的にセットします。use_gpu
: GPU をトレーニングに使用するかどうか。デフォルトはNoneで、推定者が環境に応じて選択することができます。GPU を使用する場合は、必ずモデルのパラメーターも GPU を使用するように構成してください。
注釈
通常、 num_workers
と num_cpu_per_worker
はデフォルト値のままにしておきます。 ML 向けのContainer Servicesが、これらのリソースを配布する最適な方法を決定します。ランタイムは、コンピューティングプール内の各ノードにワーカーを割り当て、各ワーカーがタスクを完了するために必要な CPUs または GPUs を割り当てます。
PyTorch スケーリング構成オブジェクトは以下の属性を持ちます。
num_cpus
: 各ワーカーに予約する CPU コアの数。num_gpus
: 各ワーカーに予約する GPUs の数。デフォルトは0で、 GPUs が予約されていないことを示します。
LightGBM/XGBoost モデルの分散トレーニング¶
- メモリ使用状況
通常、 n GB の RAM を持つノードは、 n/4 から n/3 のデータに対して、メモリ不足になることなくモデルをトレーニングすることができます。最大データセットサイズは、ワーカープロセス数と使用するトレーニングアルゴリズムに依存します。
- パフォーマンスの計算
マルチノードトレーニングのパフォーマンスは、ツリーの深さ、ツリーの数、最大ビン数などのモデルパラメーターに依存します。これらのパラメーターの値を大きくすると、データセットの総トレーニング時間が長くなります。
例¶
次の例は、マルチノードクラスタ上で XGBoost モデルをトレーニングする方法を示しています。LightGBM モデルのトレーニングも同様です。
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"
# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
sql_script += f"select \n"
for i in range(1, 10):
sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
sql_script += f"FT_1 + FT_2 AS TARGET, \n"
sql_script += f"from TABLE(generator(rowcount=>({10000})));"
return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""
sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)
params = {
"eta": 0.1,
"max_depth": 8,
"min_child_weight": 100,
"tree_method": "hist",
}
scaling_config = XGBScalingConfig(
use_gpu=False
)
estimator = XGBEstimator(
n_estimators=50,
objective="reg:squarederror",
params=params,
scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
sample_train_df, ingestor_class=RayDataIngester
)
xgb_model = estimator.fit(
data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
PyTorch モデルの分散トレーニング¶
PyTorch モデルは、各ワーカープロセスで呼び出されるトレーニング関数(train_func
)を使ってトレーニングされます。
コンテキスト APIs の使用¶
トレーニング関数の実行中に、コンテキスト APIs を使用して、トレーニング環境に関する重要なメタデータにアクセスしたり、呼び出し元からトレーニング関数へのパラメーター転送を行ったりすることができます。PyTorch コンテキストクラスのドキュメントは 関連クラス をご覧ください。
コンテキストオブジェクトは、トレーニング関数の動作をカスタマイズするために使用できるランタイムメタデータを公開します。これらは、 get_node_rank
、 get_local_rank
、 get_world_size
などの提供されているメソッドを使って取得できます。
次のコードは、コンテキストオブジェクトから値 test
と train
を取得する例です。これらは dataset_map
というキーで渡されます(このトピックの後のトレーニング関数の例で見ることができます)。これらの値は、 PyTorch データセットオブジェクトを作成するために使用され、その後モデルに渡されます。
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
メトリックレポート¶
コンテキストオブジェクトの
metrics_reporter
メソッドを使用して、トレーニング関数から制御コードにメトリックを送信します。これにより、次の例に示すように、トレーニングプロセスのリアルタイムモニタリングとデバッグが可能になります。context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
例¶
次の例は、 PyTorch モデルのトレーニング関数です。
def train_func():
import io
import base64
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision import transforms
from torch.utils.data import IterableDataset
from torch.optim.lr_scheduler import StepLR
from PIL import Image
from snowflake.ml.modeling.distributors.pytorch import get_context
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
class DecodedDataset(IterableDataset):
def __init__(self, source_dataset):
self.source_dataset = source_dataset
self.transforms = transforms.ToTensor() # Ensure we apply ToTensor transform
def __iter__(self):
for row in self.source_dataset:
base64_image = row['IMAGE']
image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
# Convert the image to a tensor
image = self.transforms(image) # Converts PIL image to tensor
labels = row['LABEL']
yield image, int(labels)
def train(model, device, train_loader, optimizer, epoch):
model.train()
batch_idx = 1
for data, target in train_loader:
# print(f"data : {data} \n target: {target}")
# raise RuntimeError("test")
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0:
print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
batch_idx += 1
context = get_context()
rank = context.get_local_rank()
device = f"cuda:{rank}"
is_distributed = context.get_world_size() > 1
if is_distributed:
dist.init_process_group(backend="nccl")
print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
batch_size = 64
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
model = Net().to(device)
if is_distributed:
model = DDP(model)
optimizer = optim.Adadelta(model.parameters())
scheduler = StepLR(optimizer, step_size=1)
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
start_time = time.time()
for epoch in range(num_epochs):
train(model, device, train_loader, optimizer, epoch+1)
scheduler.step()
now = time.time()
context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
test(model, device, test_loader, context)
次のコードは、前述のトレーニング関数が与えられた場合に分散トレーニングを開始する方法を示しています。この例では、複数のノードでトレーニングを実行するために、 PyTorch ディストリビュータオブジェクトを作成し、トレーニングデータとテストデータをコンテキストオブジェクトを介してトレーニング関数に接続し、トレーナーを実行する前にスケーリング構成を確立します。
# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector
df = session.table("MNIST_60K")
train_df, test_df = df.random_split([0.99, 0.01], 0)
# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
scaling_config=PyTorchScalingConfig( # scaling configuration
num_nodes=2,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
)
)
# Run the trainer.
results = pytorch_trainer.run( # accepts context values as parameters
dataset_map={"train": train_data, "test": test_data},
hyper_params={"num_epochs": "1"}
)
既知の制限と一般的な問題¶
これらの制限や問題は、 ML 向けのContainer Runtimeのマルチノードトレーニングが一般的に利用可能になる前に解決される可能性が高いです。
スケーリング操作のタイムアウト¶
新しいノードが12分のタイムアウト内に準備できないため、スケーリング操作が失敗することがあります。考えられる原因は以下の通りです。
プール容量の不足。 プールの MAX_NODES を超えるノードをリクエストしています。プールの MAX_NODES を増やしてください。
リソースの競合。 12分では追加されたノードを暖めるのに十分な時間ではないかもしれません。プールの MIN_NODES をより大きな数値にセットして一部のノードをウォームアップしておくか、
scale_cluster
を複数回呼び出すことでアクティブノードの数を増やし、インクリメントを小さくします。もう一つの選択肢は、非同期モードを使用して、すべてのノードの準備が整うのを待つのをスキップすることです。ノンブロッキング操作には非同期モードを使用します。
scale_cluster(3, is_async=True)
タイムアウトのしきい値を上げてください。
scale_cluster(3, options={"rollback_after_seconds": 1200})
Notebooks名エラー¶
「Notebooks<名前>が存在しないか、承認されていません」のようなエラーメッセージが表示された場合は、自動検出されたNotebooks名が現在のNotebooksと一致しないことを意味します。これは次のような場合に起こります。
Notebooks名に、ドットやスペースなどの特殊文字が含まれている
Notebooks名の自動検出が正しく動作していない
解決策: Notebooks名のパラメーターを明示的に指定します。Notebooks名を 識別子 として扱うには二重引用符が必要であることに注意してください。
# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
scale_cluster(2)
except Exception as e:
print(e) # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
失敗したスケーリング操作の後、SPCS サービスがクリーンアップされません¶
スケーリング操作が失敗した場合、システムはその操作で作成されたすべてのリソースをクリーンアップする必要があります。しかし、これに失敗すると、1つ以上の SPCS サービスが PENDING または FAILED の状態のままになる可能性があります。PENDING 状態にあるサービスは後で ACTIVE になる可能性があり、コンピューティングプールに容量がない場合は永久に PENDING 状態のままになります。
PENDING または FAILED の状態のサービスを削除するには、クラスタを1ノード(ワーカーノードはゼロ)にスケールします。起動したすべてのサービスをクリーンアップするには、Notebooksインターフェイスの「セッションを終了」をクリックして、現在のNotebooksセッションを終了します。