Container Runtime für ML auf Mehrknoten-Clustern

In dieser Vorschau ermöglicht Ihnen Container Runtime für ML die Ausführung von ML-Workloads auf Multi-Node-Clustern in Snowflake Notebooks. Die Bibliothek snowflake-ml-python enthält APIs, um die Anzahl der Knoten im Computepool festzulegen, die für ML-Workloads zur Verfügung stehen, so dass die für einen Workload verfügbaren Ressourcen skaliert werden können, ohne die Größe des Compute-Pools zu ändern. Eine weitere API ruft eine Liste der aktiven Knoten ab.

Bei einem Cluster mit mehreren Knoten wird ein Knoten zum Hauptknoten. Zusätzliche Knoten werden als Workerknoten bezeichnet. Der Hauptknoten koordiniert die parallelen Operationen im Cluster und steuert auch seine Rechenressourcen zur Ausführung des Workloads bei. Ein Cluster mit mehreren Knoten und einem aktiven Knoten hat nur einen Hauptknoten. Ein Multiknoten-Cluster mit drei aktiven Knoten hat einen Hauptknoten und zwei Workerknoten, und alle drei Knoten nehmen an der Ausführung Ihrer Arbeitslast teil.

Voraussetzungen

Um Clustering mit mehreren Knoten für die Ausführung Ihrer ML-Workloads zu verwenden, benötigen Sie Folgendes:

Einen Computepool konfigurieren

Um eine Einrichtung mit mehreren Knoten zu verwenden, benötigen Sie einen Computepool mit mindestens zwei Knoten. Sie können entweder einen neuen Computepool erstellen oder einen bestehenden ändern. In beiden Befehlen übergeben Sie das Argument MAX_NODES, um die maximale Kapazität des Pools festzulegen. Es wird empfohlen, einen oder mehrere zusätzliche Knoten bereitzustellen, so dass Sie bei größeren oder kleineren Workloads einfach nach oben oder unten skalieren können.

Um die Kapazität eines Computepools zu sehen, verwenden Sie den Befehl DESCRIBE COMPUTE POOL. Die Kapazität steht in der Spalte MAX_NODES der zurückgegebenen Tabelle.

DESCRIBE COMPUTE POOL my_pool;
Copy

Um die Kapazität eines Computepools festzulegen, verwenden Sie den Befehl ALTER COMPUTE POOL.

ALTER COMPUTE POOL <compute_pool_name>
    SET MAX_NODES = <total_capacity>;
Copy

Workload auf einem Multi-Node-Cluster ausführen

Die Auswahl eines Multi-Node-Computepools für Ihr Notebook ist die einzige Maßnahme, die erforderlich ist, um mehrere Knoten im Computepool zur Ausführung einer ML-Workload zu verwenden.

Legen Sie im Notebook die Anzahl der aktiven Knoten mit der snowflake.ml.runtime_cluster.scale_cluster Python-API fest. Die Anzahl der aktiven Knoten in einem Computepool ist die Anzahl der Knoten, die für die Ausführung einer Workload zur Verfügung stehen, und zwar bis zum Wert MAX_NODES des Pools. Die Methode nimmt die Gesamtzahl der benötigten aktiven Knoten, einschließlich des Hauptknotens und aller Workerknotens, als primären Parameter.

Bemerkung

Diese Funktion ist standardmäßig blockierend (d. h. sie wartet, bis der Skalierungsvorgang beendet ist) und hat eine 12-minütige Zeitüberschreitung. Wenn die Operation aufgrund einer Zeitüberschreitung nicht abgeschlossen werden kann, wird sie automatisch auf den vorherigen Zustand zurückgesetzt.

Skalierungsvorgänge bleiben nicht über Sitzungen hinweg bestehen. Das heißt, wenn ein Notebook mit einer Anzahl von Workerknoten ungleich Null endet, wird es beim nächsten Start des Notebooks nicht automatisch hochskaliert. Sie müssen die Skalierungs-API erneut aufrufen, um die Anzahl der Workerknoten festzulegen.

Syntax

snowflake.ml.runtime_cluster.scale_cluster(
    expected_cluster_size: int,
    *,
    notebook_name: Optional[str] = None,
    is_async: bool = False,
    options: Optional[Dict[str, Any]] = None
) -> bool
Copy

Argumente

  • expected_cluster_size (int): Die Anzahl der aktiven Knoten im Computepool, bis zum Wert von MAX_NODES des Pools. Dazu gehören der Hauptknoten und alle Workerknoten.

  • notebook_name (Optional[str]): Der Name des Notebooks, auf dem die Workload ausgeführt wird. Der zu skalierende Computepool ist der Pool, auf dem das angegebene Notebook läuft. Wenn er nicht angegeben wird, wird er automatisch aus dem aktuellen Kontext ermittelt. Es wird eine Ausnahme ausgelöst, wenn der falsche Notebookname verwendet wird.

  • is_async (bool): Steuert, ob die Funktion das Warten auf die Skalierung blockiert:

    • Wenn „False“ (Standard): Die Funktion blockiert, bis der Cluster vollständig bereit ist oder eine Zeitüberschreitung der Operation entsteht.

    • Wenn „True“: Die Funktion kehrt sofort zurück, nachdem sie bestätigt hat, dass die Skalierungsanfrage akzeptiert wurde.

  • options (Optional[Dict[str, Any]]): Erweiterte Konfigurationsoptionen:

    • rollback_after_seconds (int): Maximale Zeit bis zum automatischen Rollback, wenn die Skalierung nicht abgeschlossen ist. Der Standardwert ist 720 Sekunden.

    • block_until_min_cluster_size (int): Mindestanzahl von Knoten, die bereit sein müssen, bevor die Funktion zurückkehrt.

Rückgabewerte

True, wenn der Computepool erfolgreich auf die angegebene Anzahl aktiver Knoten skaliert wurde. Andernfalls wird eine Ausnahme ausgelöst.

Beispiel

from snowflake.ml.runtime_cluster import scale_cluster

# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)

# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers

# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)

# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
Copy

Die verfügbare Anzahl von Knoten ermitteln

Verwenden Sie die get_nodes-API, um Informationen über die aktiven Knoten im Cluster zu erhalten. Die Funktion benötigt keine Argumente.

Syntax

get_nodes() -> list
Copy

Rückgabewerte

Eine Liste mit Details zu den aktiven Knoten im Cluster. Jedes Element der Liste ist ein Wörterbuch mit den folgenden Schlüsseln:

  • name (str): Der Name des Knotens.

  • cpus (int): Die Anzahl von CPUs auf dem Knoten.

  • gpus (int): Die Anzahl von GPUs auf dem Knoten.

Beispiel

from snowflake.ml.runtime_cluster import get_nodes

# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
Copy

Die Ausgabe des Beispielcodes lautet wie folgt:

2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]

Verteiltes Training auf Clustern mit mehreren Knoten

Die Container Runtime für ML unterstützt das verteilte Training von LightGBM-, XGBoost- und PyTorch-Modellen. Die APIs für verteiltes Training für LightGBMEstimator, XGBEstimator und PyTorch ist in der API-Referenz ausführlich dokumentiert.

Konfiguration der Skalierung

Alle Modelle bieten einen optionalen Konfigurationsparameter für die Skalierung, mit dem Sie die Ressource für den Trainingsauftrag angeben können. Die Skalierungskonfiguration ist eine Instanz einer modellspezifischen Klasse: LightGBMScalingConfig, XGBScalingConfig, oder PyTorchScalingConfig, je nach Modelltyp.

LightGBM- und XGBoost-Skalierungskonfigurationsobjekte haben die folgenden Attribute:

  • num_workers: Die Anzahl der Arbeitsprozesse, die für das Training verwendet werden sollen. Der Standardwert ist -1, wodurch die Anzahl der Arbeitsprozesse automatisch festgelegt wird.

  • num_cpu_per_worker: Anzahl der CPUs pro Arbeitsprozess zugewiesen. Der Standardwert ist -1, wodurch die Anzahl der CPUs pro Arbeitsprozess automatisch festgelegt wird.

  • use_gpu: Ob Sie die GPU für das Training verwenden möchten. Die Voreinstellung ist „None“, so dass der Schätzer je nach Umgebung wählen kann. Wenn Sie GPU verwenden, stellen Sie sicher, dass Sie auch die Modellparameter für die Verwendung von GPU konfigurieren.

Bemerkung

Im Allgemeinen sollten Sie num_workers und num_cpu_per_worker auf ihren Standardwerten belassen, damit die Containerdienste für ML den besten Weg zur Verteilung dieser Ressourcen bestimmen. Die Laufzeit weist jedem Knoten im Compute-Pool einen Worker und die für die Erledigung der Aufgabe erforderlichen CPUs oder GPUs zu.

PyTorch-Skalierungskonfigurationsobjekte haben die folgenden Attribute:

  • num_cpus: Die Anzahl der CPU-Kerne, die für jeden Worker reserviert werden sollen.

  • num_gpus: Die Anzahl der GPUs, die für jeden Worker reserviert werden soll. Der Standardwert ist 0, was bedeutet, dass keine GPUs reserviert sind.

Verteiltes Training von LightGBM-/XGBoost-Modellen

Speicherverbrauch

Normalerweise kann ein Knoten mit n GB von RAM ein Modell auf n/4 bis n/3 Daten trainieren, ohne dass der Speicher knapp wird. Die maximale Größe des Datensatzes hängt von der Anzahl der Worker-Prozese und dem verwendeten Trainingsalgorithmus ab.

Rechenleistung

Die Leistung des Trainings mit mehreren Knoten hängt von Modellparametern wie der Baumtiefe, der Anzahl der Bäume und der maximalen Anzahl von Bins ab. Die Erhöhung dieser Parameterwerte kann die Gesamttrainingszeit für einen Datensatz erhöhen.

Beispiel

Das folgende Beispiel zeigt, wie Sie ein XGBoost-Modell auf einem Mehrknoten-Cluster trainieren. Das Training der LightGBM-Modelle ist ähnlich.

from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"

# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
    sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
    sql_script += f"select \n"
    for i in range(1, 10):
        sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
    sql_script += f"FT_1 + FT_2 AS TARGET, \n"
    sql_script += f"from TABLE(generator(rowcount=>({10000})));"
    return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""

sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)

params = {
    "eta": 0.1,
    "max_depth": 8,
    "min_child_weight": 100,
    "tree_method": "hist",
}

scaling_config = XGBScalingConfig(
    use_gpu=False
)

estimator = XGBEstimator(
    n_estimators=50,
    objective="reg:squarederror",
    params=params,
    scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
    sample_train_df, ingestor_class=RayDataIngester
)

xgb_model = estimator.fit(
    data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
Copy

Verteiltes Training von PyTorch-Modellen

PyTorch-Modelle werden mit einer Trainingsfunktion (train_func) trainiert, die in jedem Worker-Prozess aufgerufen wird.

Verwendung der Kontext-APIs

Während der Ausführung der Trainingsfunktion können Sie die Kontext-APIs verwenden, um auf wichtige Metadaten über die Trainingsumgebung zuzugreifen und um Parameter vom Caller an die Trainingsfunktionen weiterzuleiten. Siehe Verwandte Klassen für die Dokumentation der PyTorch-Kontextklasse.

Das Kontextobjekt gibt Laufzeit-Metadaten preis, die Sie verwenden können, um das Verhalten der Trainingsfunktion anzupassen. Sie können diese mit den bereitgestellten Methoden get_node_rank, get_local_rank, get_world_size, und anderen abrufen.

Der folgende Code ist ein Beispiel für das Abrufen der Werte test und train aus dem Kontextobjekt; diese werden in einem Schlüssel namens dataset_map übergeben (den Sie im Beispiel der Trainingsfunktion weiter unten in diesem Thema sehen können). Diese Werte werden verwendet, um PyTorch-Datensatzobjekte zu erstellen, die dann an das Modell übergeben werden.

dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
Copy

Metriken-Berichterstattung

Verwenden Sie die Methode metrics_reporter des Kontextobjekts, um Metriken von der Trainingsfunktion an den Kontrollcode zu senden. Dies ermöglicht die Überwachung und Fehlerbehebung des Trainingsprozesses in Echtzeit, wie im folgenden Beispiel gezeigt.

context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
Copy

Beispiel

Das folgende Beispiel ist eine Trainingsfunktion für ein PyTorch-Modell.

def train_func():
    import io
    import base64
    import time
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torchvision import transforms
    from torch.utils.data import IterableDataset
    from torch.optim.lr_scheduler import StepLR
    from PIL import Image
    from snowflake.ml.modeling.distributors.pytorch import get_context

    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)

        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output

    class DecodedDataset(IterableDataset):
        def __init__(self, source_dataset):
            self.source_dataset = source_dataset
            self.transforms = transforms.ToTensor()  # Ensure we apply ToTensor transform

        def __iter__(self):
            for row in self.source_dataset:
                base64_image = row['IMAGE']
                image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
                # Convert the image to a tensor
                image = self.transforms(image)  # Converts PIL image to tensor

                labels = row['LABEL']
                yield image, int(labels)

    def train(model, device, train_loader, optimizer, epoch):
        model.train()
        batch_idx = 1
        for data, target in train_loader:
            # print(f"data : {data} \n target: {target}")
            # raise RuntimeError("test")
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
            batch_idx += 1

    context = get_context()
    rank = context.get_local_rank()
    device = f"cuda:{rank}"
    is_distributed = context.get_world_size() > 1
    if is_distributed:
        dist.init_process_group(backend="nccl")
    print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")

    dataset_map = context.get_dataset_map()
    train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
    test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

    batch_size = 64
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )

    model = Net().to(device)
    if is_distributed:
        model = DDP(model)
    optimizer = optim.Adadelta(model.parameters())
    scheduler = StepLR(optimizer, step_size=1)

    hyper_parms = context.get_hyper_params()
    num_epochs = int(hyper_parms['num_epochs'])
    start_time = time.time()
    for epoch in range(num_epochs):
        train(model, device, train_loader, optimizer, epoch+1)
        scheduler.step()
    now = time.time()
    context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
    test(model, device, test_loader, context)
Copy

Der folgende Code veranschaulicht, wie das verteilte Training mit der vorangegangenen Trainingsfunktion gestartet werden kann. Das Beispiel erstellt ein PyTorch-Verteilerobjekt, um das Training auf mehreren Knoten auszuführen, verbindet die Trainings- und Testdaten über ein Kontextobjekt mit der Trainingsfunktion und legt die Skalierungskonfiguration fest, bevor das Training ausgeführt wird.

# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector

df = session.table("MNIST_60K")

train_df, test_df = df.random_split([0.99, 0.01], 0)

# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)

pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    scaling_config=PyTorchScalingConfig(  # scaling configuration
        num_nodes=2,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
    )
)

# Run the trainer.
results = pytorch_trainer.run(  # accepts context values as parameters
    dataset_map={"train": train_data, "test": test_data},
    hyper_params={"num_epochs": "1"}
)
Copy

Bekannte Einschränkungen und häufige Probleme

Diese Einschränkungen und Probleme müssen wahrscheinlich behoben werden, bevor das Multi-Node-Training auf Container Runtime für ML allgemein verfügbar ist.

Zeitüberschreitung bei der Skalierung

Der Skalierungsvorgang kann fehlschlagen, weil die neuen Knoten nicht innerhalb der 12-minütigen Zeitüberschreitung bereit sind. Mögliche Ursachen sind:

  • Unzureichende Pool-Kapazität. Sie haben mehr Knoten angefordert als MAX_NODES des Pools. Erhöhen Sie den Wert von MAX_NODES des Pools.

  • Ressourcenkonflikt. 12 Minuten sind möglicherweise nicht genug Zeit, um die hinzugefügten Knoten aufzuwärmen. Setzen Sie die MIN_NODES des Pools auf eine größere Zahl, um einige der Knoten warm zu halten, oder erhöhen Sie die Anzahl der aktiven Knoten durch mehr als einen Aufruf von scale_cluster mit einer kleineren Schrittweite. Eine andere Möglichkeit ist die Verwendung des asynchronen Modus, damit Sie nicht warten müssen, bis alle Knoten bereit sind:

    • Verwenden Sie den asynchronen Modus für nicht-blockierende Operationen:

    scale_cluster(3, is_async=True)
    
    Copy
    • Erhöhen Sie die Timeout-Schwelle:

    scale_cluster(3, options={"rollback_after_seconds": 1200})
    
    Copy

Notebooknamen-Fehler

Wenn Sie eine Fehlermeldung wie „Notebook <name> existiert nicht oder ist nicht autorisiert“ sehen, bedeutet dies, dass der automatisch erkannte Notebookname nicht mit dem aktuellen Notebook übereinstimmt. Dies kann in den folgenden Fällen passieren:

  • Der Name Ihres Notebooks enthält Sonderzeichen wie Punkte und Leerzeichen

  • Die automatische Erkennung des Notebook-Namens funktioniert nicht richtig

Lösung: Geben Sie explizit den Parameter für den Notebooknamen an. Beachten Sie, dass der Notebookname in Anführungszeichen gesetzt werden muss, damit er als Bezeichner behandelt wird:

# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
    scale_cluster(2)
except Exception as e:
    print(e)  # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
    scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
Copy

SPCS-Dienste werden nach einem fehlgeschlagenen Skalierungsvorgang nicht bereinigt

Wenn Skalierungsvorgänge fehlschlagen, sollte das System alle Ressourcen, die bei dem Vorgang erstellt wurden, bereinigen. Wenn dies jedoch fehlschlägt, kann es sein, dass einer oder mehrere SPCS-Dienste im Zustand PENDING oder FAILED verbleiben. Dienste, die sich im Status PENDING befinden, können später zu ACTIVE werden, oder, wenn keine Kapazität im Compute-Pool vorhanden ist, für immer PENDING bleiben.

Um Dienste im Status PENDING oder FAILED zu entfernen, skalieren Sie den Cluster auf einen Knoten (null Arbeitsknoten). Um alle gestarteten Dienste zu bereinigen, beenden Sie die aktuelle Notebook-Sitzung, indem Sie in der Notebook-Oberfläche auf „End Session“ klicken.