Container Runtime on multi-node clusters

In this preview, Container Runtime allows you to run ML workloads on multi-node clusters in Snowflake Notebooks. The snowflake-ml-python library includes APIs to set the number of nodes in the compute pool available for ML workloads, allowing the resources available to a workload to be scaled without resizing the compute pool. Another API retrieves a list of active nodes.

マルチノードクラスタでは、1つのノードを ヘッド ノードに割り当てます。追加ノードは ワーカー ノードと呼ばれます。ヘッドノードはクラスタ内の並列操作をオーケストレーションし、ワークロードの実行にコンピューティングリソースを提供します。アクティブノードが1つのマルチノードクラスタにはヘッドノードしかありません。3つのアクティブノードを持つマルチノードクラスタには、1つのヘッドノードと2つのワーカーノードがあり、3つのノードすべてがワークロードの実行に参加します。

前提条件

ML ワークロードを実行するためにマルチノードクラスタを使用するには、以下のものが必要です。

コンピューティングプールの構成

マルチノード設定を使用するには、少なくとも2つのノードを持つコンピューティングプールが必要です。 新しいコンピューティングプールを作成 するか、 既存のコンピューティングプールを変更 します。どちらのコマンドでも、 MAX_NODES 引数を渡して、プールの最大容量をセットします。ワークロードの大小に応じて簡単にスケールアップまたはスケールダウンできるように、1つまたは複数の追加ノードをプロビジョニングすることは良い習慣です。

コンピューティングプールの容量を確認するには、 DESCRIBE COMPUTE POOL コマンドを使用します。容量は、返されたテーブルの MAX_NODES 列にあります。

DESCRIBE COMPUTE POOL my_pool;
Copy

コンピューティングプールの容量をセットするには、 ALTER COMPUTE POOL コマンドを使用します。

ALTER COMPUTE POOL <compute_pool_name>
    SET MAX_NODES = <total_capacity>;
Copy

マルチノードクラスタでのワークロードの実行

Notebooks用にマルチノードコンピューティングプールを選択することは、 ML ワークロードを実行するためにコンピューティングプール内の複数のノードを使用するために必要な唯一のアクションです。

Notebooksで、 snowflake.ml.runtime_cluster.scale_cluster Python API を使用してアクティブノード数をセットします。コンピューティングプールのアクティブノード数は、プールの MAX_NODES までの、ワークロードを実行するために使用できるノードの数です。このメソッドは、ヘッドノードとすべてのワーカーノードを含む、必要なアクティブノードの総数を主要パラメーターとします。

注釈

この関数はデフォルトでブロッキングされ(つまり、スケーリング操作が終了するまで待機します)、12分間のタイムアウトがあります。操作がタイムアウトした場合は、自動的に以前の状態にロールバックされます。

スケーリング操作はセッションをまたいで持続しません。つまり、Notebooksのワーカーノード数がゼロでない状態で終了した場合、次にNotebooksを起動しても自動的にスケールアップすることはありません。ワーカーノードの数をセットするには、スケーリング API を再度呼び出す必要があります。

構文

snowflake.ml.runtime_cluster.scale_cluster(
    expected_cluster_size: int,
    *,
    notebook_name: Optional[str] = None,
    is_async: bool = False,
    options: Optional[Dict[str, Any]] = None
) -> bool
Copy

引数

  • expected_cluster_size (int): コンピューティングプールのアクティブノード数(プールの MAX_NODES まで)。これにはヘッドノードとすべてのワーカーノードが含まれます。

  • notebook_name (オプション [str]): ワークロードを実行するNotebooksの名前。スケールされるコンピューティングプールは、指定されたNotebooksが実行されているプールです。提供されない場合は、現在のコンテキストから自動的に決定されます。間違ったNotebooks名を使用した場合は例外が発生します。

  • is_async (bool): 関数がスケーリング待ちをブロックするかどうかを制御します。

    • Falseの場合(デフォルト): 関数はクラスタの準備が完全に整うか、操作がタイムアウトするまでブロックします。

    • Trueの場合: この関数は、スケーリングリクエストが受け入れられたことを確認した直後に戻ります。

  • options (オプション [Dict [str, Any]]): 高度な構成オプション:

    • rollback_after_seconds (int): スケーリングが完了しなかった場合に自動ロールバックするまでの最大時間。デフォルトは720秒です。

    • block_until_min_cluster_size (int): 関数が戻る前に準備できていなければならないノードの最小数。

戻り値

True コンピューティングプールが指定されたアクティブノード数まで正常にスケールされた場合。そうでない場合は例外が発生します。

from snowflake.ml.runtime_cluster import scale_cluster

# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)

# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers

# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)

# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
Copy

利用可能なノード数の取得

クラスタ内のアクティブノードに関する情報を取得するには、 get_nodes API を使用します。この関数は引数を取りません。

構文

get_nodes() -> list
Copy

戻り値

クラスタ内のアクティブノードの詳細を含むリスト。リストの各要素は、以下のキーを持つ辞書です。

  • name (str): ノードの名前。

  • cpus (int): ノードの CPUs の数。

  • gpus (int): ノードの GPUs の数。

from snowflake.ml.runtime_cluster import get_nodes

# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
Copy

サンプルコードの出力は以下の通りです。

2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]

マルチノードクラスタでの分散トレーニング

The Container Runtime supports distributed training of LightGBM, XGBoost, and PyTorch models. The distributed training APIs for LightGBMEstimator, XGBEstimator, and PyTorch are documented in detail in the API Reference.

スケーリング構成

すべてのモデルは、トレーニングジョブのリソースを指定できるオプションのスケーリング構成パラメーターを提供します。スケーリング構成は、モデル固有のクラスのインスタンスです: モデルのタイプに応じて LightGBMScalingConfigXGBScalingConfig、または PyTorchScalingConfig

LightGBM および XGBoost スケーリング構成オブジェクトには、以下の属性があります。

  • num_workers: トレーニングに使用するワーカープロセスの数。デフォルトは-1で、ワーカープロセスの数を自動的にセットします。

  • num_cpu_per_worker: ワーカープロセスごとに割り当てられた CPUs の数。デフォルトは-1で、ワーカープロセスあたりの CPUs の数を自動的にセットします。

  • use_gpu: GPU をトレーニングに使用するかどうか。デフォルトはNoneで、推定者が環境に応じて選択することができます。GPU を使用する場合は、必ずモデルのパラメーターも GPU を使用するように構成してください。

注釈

Generally, leave num_workers and num_cpu_per_worker at their default values, so Container Services determines the best way to distribute these resources. The runtime assigns a worker for each node in the compute pool, and the necessary CPUs or GPUs for each worker to complete the task.

PyTorch スケーリング構成オブジェクトは以下の属性を持ちます。

  • num_cpus: 各ワーカーに予約する CPU コアの数。

  • num_gpus: 各ワーカーに予約する GPUs の数。デフォルトは0で、 GPUs が予約されていないことを示します。

LightGBM/XGBoost モデルの分散トレーニング

メモリ使用状況

通常、 n GB の RAM を持つノードは、 n/4 から n/3 のデータに対して、メモリ不足になることなくモデルをトレーニングすることができます。最大データセットサイズは、ワーカープロセス数と使用するトレーニングアルゴリズムに依存します。

パフォーマンスの計算

マルチノードトレーニングのパフォーマンスは、ツリーの深さ、ツリーの数、最大ビン数などのモデルパラメーターに依存します。これらのパラメーターの値を大きくすると、データセットの総トレーニング時間が長くなります。

次の例は、マルチノードクラスタ上で XGBoost モデルをトレーニングする方法を示しています。LightGBM モデルのトレーニングも同様です。

from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"

# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
    sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
    sql_script += f"select \n"
    for i in range(1, 10):
        sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
    sql_script += f"FT_1 + FT_2 AS TARGET, \n"
    sql_script += f"from TABLE(generator(rowcount=>({10000})));"
    return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""

sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)

params = {
    "eta": 0.1,
    "max_depth": 8,
    "min_child_weight": 100,
    "tree_method": "hist",
}

scaling_config = XGBScalingConfig(
    use_gpu=False
)

estimator = XGBEstimator(
    n_estimators=50,
    objective="reg:squarederror",
    params=params,
    scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
    sample_train_df, ingestor_class=RayDataIngester
)

xgb_model = estimator.fit(
    data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
Copy

PyTorch モデルの分散トレーニング

PyTorch モデルは、各ワーカープロセスで呼び出されるトレーニング関数(train_func)を使ってトレーニングされます。

コンテキスト APIs の使用

トレーニング関数の実行中に、コンテキスト APIs を使用して、トレーニング環境に関する重要なメタデータにアクセスしたり、呼び出し元からトレーニング関数へのパラメーター転送を行ったりすることができます。PyTorch コンテキストクラスのドキュメントは 関連クラス をご覧ください。

コンテキストオブジェクトは、トレーニング関数の動作をカスタマイズするために使用できるランタイムメタデータを公開します。これらは、 get_node_rankget_local_rankget_world_size などの提供されているメソッドを使って取得できます。

次のコードは、コンテキストオブジェクトから値 testtrain を取得する例です。これらは dataset_map というキーで渡されます(このトピックの後のトレーニング関数の例で見ることができます)。これらの値は、 PyTorch データセットオブジェクトを作成するために使用され、その後モデルに渡されます。

dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
Copy

メトリックレポート

コンテキストオブジェクトの metrics_reporter メソッドを使用して、トレーニング関数から制御コードにメトリックを送信します。これにより、次の例に示すように、トレーニングプロセスのリアルタイムモニタリングとデバッグが可能になります。

context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
Copy

次の例は、 PyTorch モデルのトレーニング関数です。

def train_func():
    import io
    import base64
    import time
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torchvision import transforms
    from torch.utils.data import IterableDataset
    from torch.optim.lr_scheduler import StepLR
    from PIL import Image
    from snowflake.ml.modeling.distributors.pytorch import get_context

    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)

        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output

    class DecodedDataset(IterableDataset):
        def __init__(self, source_dataset):
            self.source_dataset = source_dataset
            self.transforms = transforms.ToTensor()  # Ensure we apply ToTensor transform

        def __iter__(self):
            for row in self.source_dataset:
                base64_image = row['IMAGE']
                image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
                # Convert the image to a tensor
                image = self.transforms(image)  # Converts PIL image to tensor

                labels = row['LABEL']
                yield image, int(labels)

    def train(model, device, train_loader, optimizer, epoch):
        model.train()
        batch_idx = 1
        for data, target in train_loader:
            # print(f"data : {data} \n target: {target}")
            # raise RuntimeError("test")
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
            batch_idx += 1

    context = get_context()
    rank = context.get_local_rank()
    device = f"cuda:{rank}"
    is_distributed = context.get_world_size() > 1
    if is_distributed:
        dist.init_process_group(backend="nccl")
    print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")

    dataset_map = context.get_dataset_map()
    train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
    test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

    batch_size = 64
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )

    model = Net().to(device)
    if is_distributed:
        model = DDP(model)
    optimizer = optim.Adadelta(model.parameters())
    scheduler = StepLR(optimizer, step_size=1)

    hyper_parms = context.get_hyper_params()
    num_epochs = int(hyper_parms['num_epochs'])
    start_time = time.time()
    for epoch in range(num_epochs):
        train(model, device, train_loader, optimizer, epoch+1)
        scheduler.step()
    now = time.time()
    context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
    test(model, device, test_loader, context)
Copy

次のコードは、前述のトレーニング関数が与えられた場合に分散トレーニングを開始する方法を示しています。この例では、複数のノードでトレーニングを実行するために、 PyTorch ディストリビュータオブジェクトを作成し、トレーニングデータとテストデータをコンテキストオブジェクトを介してトレーニング関数に接続し、トレーナーを実行する前にスケーリング構成を確立します。

# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector

df = session.table("MNIST_60K")

train_df, test_df = df.random_split([0.99, 0.01], 0)

# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)

pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    scaling_config=PyTorchScalingConfig(  # scaling configuration
        num_nodes=2,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
    )
)

# Run the trainer.
results = pytorch_trainer.run(  # accepts context values as parameters
    dataset_map={"train": train_data, "test": test_data},
    hyper_params={"num_epochs": "1"}
)
Copy

既知の制限と一般的な問題

These limitations and issues are likely to be addressed before multi-node training on Container Runtime is generally available.

スケーリング操作のタイムアウト

新しいノードが12分のタイムアウト内に準備できないため、スケーリング操作が失敗することがあります。考えられる原因は以下の通りです。

  • プール容量の不足。 プールの MAX_NODES を超えるノードをリクエストしています。プールの MAX_NODES を増やしてください。

  • リソースの競合。 12分では追加されたノードを暖めるのに十分な時間ではないかもしれません。プールの MIN_NODES をより大きな数値にセットして一部のノードをウォームアップしておくか、 scale_cluster を複数回呼び出すことでアクティブノードの数を増やし、インクリメントを小さくします。もう一つの選択肢は、非同期モードを使用して、すべてのノードの準備が整うのを待つのをスキップすることです。

    • ノンブロッキング操作には非同期モードを使用します。

    scale_cluster(3, is_async=True)
    
    Copy
    • タイムアウトのしきい値を上げてください。

    scale_cluster(3, options={"rollback_after_seconds": 1200})
    
    Copy

Notebooks名エラー

「Notebooks<名前>が存在しないか、承認されていません」のようなエラーメッセージが表示された場合は、自動検出されたNotebooks名が現在のNotebooksと一致しないことを意味します。これは次のような場合に起こります。

  • Notebooks名に、ドットやスペースなどの特殊文字が含まれている

  • Notebooks名の自動検出が正しく動作していない

解決策: Notebooks名のパラメーターを明示的に指定します。Notebooks名を 識別子 として扱うには二重引用符が必要であることに注意してください。

# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
    scale_cluster(2)
except Exception as e:
    print(e)  # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
    scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
Copy

失敗したスケーリング操作の後、SPCS サービスがクリーンアップされません

スケーリング操作が失敗した場合、システムはその操作で作成されたすべてのリソースをクリーンアップする必要があります。しかし、これに失敗すると、1つ以上の SPCS サービスが PENDING または FAILED の状態のままになる可能性があります。PENDING 状態にあるサービスは後で ACTIVE になる可能性があり、コンピューティングプールに容量がない場合は永久に PENDING 状態のままになります。

PENDING または FAILED の状態のサービスを削除するには、クラスタを1ノード(ワーカーノードはゼロ)にスケールします。起動したすべてのサービスをクリーンアップするには、Notebooksインターフェイスの「セッションを終了」をクリックして、現在のNotebooksセッションを終了します。