Container Runtime pour ML on multi-node clusters

Dans cet avant-première, Container Runtime pour ML vous permet d’exécuter des charges de travail ML sur des clusters à plusieurs nœuds dans Snowflake Notebooks. La bibliothèque snowflake-ml-python inclut des APIs pour définir le nombre de nœuds dans le pool de calcul disponible pour les charges de travail ML, ce qui permet de faire évoluer les ressources disponibles pour une charge de travail sans redimensionner le pool de calcul. Une autre API permet de récupérer la liste des nœuds actifs.

Un cluster à plusieurs nœuds désigne un nœud comme nœud principal. Les nœuds supplémentaires sont appelés nœuds de travail. Le nœud principal orchestre les opérations parallèles dans le cluster et contribue également à l’exécution de la charge de travail grâce à ses ressources de calcul. Un cluster à plusieurs nœuds avec un nœud actif n’a qu’un nœud principal. Un cluster à plusieurs nœuds avec trois nœuds actifs comporte un nœud principal et deux nœuds de travail, et les trois nœuds participent à l’exécution de votre charge de travail.

Conditions préalables

Pour utiliser des clusters à plusieurs nœuds afin d’exécuter vos charges de travail ML, vous avez besoin de ce qui suit :

Configurer un pool de calcul

Pour utiliser une configuration à plusieurs nœuds, vous devez disposer d’un pool de calcul comprenant au moins deux nœuds. Vous pouvez soit créer un nouveau pool de calcul ou en modifier un existant. Dans l’une ou l’autre de ces commandes, transférez l’argument MAX_NODES pour définir la capacité maximale du pool. Il est conseillé de prévoir un ou plusieurs nœuds supplémentaires afin de pouvoir facilement augmenter ou réduire la charge de travail.

Pour connaître la capacité d’un pool ce calcul, utilisez la commande DESCRIBE COMPUTE POOL. La capacité se trouve dans la colonne MAX_NODES de la table renvoyée.

DESCRIBE COMPUTE POOL my_pool;
Copy

Pour définir la capacité d’un pool de calcul, utilisez la commande ALTER COMPUTE POOL.

ALTER COMPUTE POOL <compute_pool_name>
    SET MAX_NODES = <total_capacity>;
Copy

Exécution d’une charge de travail sur un cluster à plusieurs nœuds

Le choix d’un pool de calcul à plusieurs nœuds pour votre Notebook est la seule action requise pour utiliser plusieurs nœuds du pool de calcul afin d’exécuter une charge de travail ML.

Dans le Notebook, définissez le nombre de nœuds actifs à l’aide de snowflake.ml.runtime_cluster.scale_cluster l’API Python. Le nombre de nœuds actifs dans un pool de calcul est le nombre de nœuds disponibles pour exécuter une charge de travail, jusqu’à la limite MAX_NODES du pool. La méthode prend comme paramètre principal le nombre total de nœuds actifs requis, y compris le nœud principal et tous les nœuds de travail.

Note

Cette fonction est bloquée par défaut (c’est-à-dire qu’elle attend la fin de l’opération de mise à l’échelle) et dispose d’un délai d’expiration de 12 minutes. Si l’opération est interrompue, elle revient automatiquement à son état antérieur.

Les opérations de mise à l’échelle ne persistent pas d’une session à l’autre. En d’autres termes, si un notebook se termine avec un nombre non nul de nœuds de travail, il ne sera pas automatiquement mis à l’échelle lors du prochain démarrage du notebook. Vous devez appeler à nouveau l’API de mise à l’échelle pour définir le nombre de nœuds de travail.

Syntaxe

snowflake.ml.runtime_cluster.scale_cluster(
    expected_cluster_size: int,
    *,
    notebook_name: Optional[str] = None,
    is_async: bool = False,
    options: Optional[Dict[str, Any]] = None
) -> bool
Copy

Arguments

  • expected_cluster_size (int) : Le nombre de nœuds actifs dans le pool de calcul, jusqu’à la limite MAX_NODES du pool. Cela inclut le nœud principal et tous les nœuds de travail.

  • notebook_name (Facultatif[str]) : Le nom du notebook où la charge de travail est exécutée. Le pool de calcul à mettre à l’échelle est le pool sur lequel s’exécute le Notebook spécifié. S’il n’est pas fourni, il sera automatiquement déterminé à partir du contexte actuel. Une exception est levée si le nom du notebook utilisé n’est pas le bon.

  • is_async (bool) : Contrôle si la fonction se bloque en attente de la mise à l’échelle :

    • Si False (par défaut) : La fonction se bloque jusqu’à ce que le cluster soit entièrement prêt ou que l’opération s’arrête.

    • Si True : la fonction revient immédiatement après la confirmation que la requête de mise à l’échelle a été acceptée.

  • options (Facultatif[Dict[str, Any]]) : Options de configuration avancées :

    • rollback_after_seconds (int) : Temps maximum avant le rétablissement automatique si la mise à l’échelle n’est pas terminée. La valeur par défaut est de 720 secondes.

    • block_until_min_cluster_size (int) : Nombre minimum de nœuds qui doivent être prêts avant le retour de la fonction.

Renvoie

True si le pool de calcul est mis à l’échelle sur le nombre de nœuds actifs spécifié. Dans le cas contraire, une exception est levée.

Exemple

from snowflake.ml.runtime_cluster import scale_cluster

# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)

# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers

# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)

# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
Copy

Obtenir le nombre de nœuds disponibles

Utilisez l’API get_nodes pour obtenir des informations sur les nœuds actifs du cluster. La fonction ne prend aucun argument.

Syntaxe

get_nodes() -> list
Copy

Renvoie

Une liste contenant les détails des nœuds actifs du cluster. Chaque élément de la liste est un dictionnaire dont les clés sont les suivantes :

  • name (str) : Le nom du nœud.

  • cpus (int) : Le nombre de CPUs sur le nœud.

  • gpus (int) : Le nombre de GPUs sur le nœud.

Exemple

from snowflake.ml.runtime_cluster import get_nodes

# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
Copy

La sortie de l’exemple de code est la suivante :

2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]

Entraînement distribué sur des clusters à plusieurs nœuds

Container Runtime pour ML prend en charge l’entraînement distribué des modèles LightGBM, XGBoost et PyTorch. Les APIs de l’entraînement distribué pour LightGBMEstimator, XGBEstimator et PyTorch sont documentées en détail dans la référence d”API.

Configuration de la mise à l’échelle

Tous les modèles fournissent un paramètre de configuration de mise à l’échelle facultatif qui vous permet de spécifier la ressource pour la tâche d’entraînement. La configuration de mise à l’échelle est une instance d’une classe spécifique au modèle : LightGBMScalingConfig, XGBScalingConfig ou PyTorchScalingConfig selon le type de modèle.

Les objets de configuration de la mise à l’échelle LightGBM et XGBoost ont les attributs suivants :

  • num_workers : Nombre de processus de travail à utiliser pour l’entraînement. La valeur par défaut est -1, ce qui définit automatiquement le nombre de processus de travail.

  • num_cpu_per_worker : Nombre de CPUs alloués par processus de travail. La valeur par défaut est -1, ce qui définit automatiquement le nombre de CPUs par processus de travail.

  • use_gpu : Détermine s’il faut utiliser le GPU pour l’entraînement. La valeur par défaut est Aucun, ce qui permet à l’estimateur de choisir en fonction de l’environnement. Lorsque vous utilisez le GPU, veillez à configurer également les paramètres du modèle pour utiliser le GPU.

Note

En règle générale, laissez num_workers et num_cpu_per_worker sur leurs valeurs par défaut, afin que Container Services pour ML détermine la meilleure façon de distribuer ces ressources. L’environnement d’exécution attribue un travailleur pour chaque nœud dans le pool de calcul, ainsi que les CPUs ou GPUs nécessaires à l’exécution de la tâche par chaque travailleur.

Les objets de configuration de la mise à l’échelle PyTorch ont les attributs suivants :

  • num_cpus : Nombre de cœurs de CPU à réserver pour chaque travailleur.

  • num_gpus : Nombre de GPUs à réserver pour chaque travailleur. La valeur par défaut est 0, ce qui indique qu’aucun GPUs n’est réservé.

Entraînement distribué des modèles LightGBM/XGBoost

Utilisation de la mémoire

En règle générale, un nœud ayant n GB de RAM peut entraîner un modèle sur n/4 à n/3 de données sans manquer de mémoire. La taille maximale de l’ensemble de données dépend du nombre de processus de travail et de l’algorithme d’entraînement utilisé.

Performance de calcul

La performance de l’entraînement multi-nœuds dépend des paramètres du modèle tels que la profondeur de l’arborescence, le nombre d’arborescences et le nombre maximal d’emplacements. L’augmentation de la valeur de ces paramètres peut accroître la durée totale de l’entraînement sur un ensemble de données.

Exemple

L’exemple suivant montre comment entraîner un modèle XGBoost sur un cluster à plusieurs nœuds. L’entraînement des modèles LightGBM est similaire.

from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"

# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
    sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
    sql_script += f"select \n"
    for i in range(1, 10):
        sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
    sql_script += f"FT_1 + FT_2 AS TARGET, \n"
    sql_script += f"from TABLE(generator(rowcount=>({10000})));"
    return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""

sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)

params = {
    "eta": 0.1,
    "max_depth": 8,
    "min_child_weight": 100,
    "tree_method": "hist",
}

scaling_config = XGBScalingConfig(
    use_gpu=False
)

estimator = XGBEstimator(
    n_estimators=50,
    objective="reg:squarederror",
    params=params,
    scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
    sample_train_df, ingestor_class=RayDataIngester
)

xgb_model = estimator.fit(
    data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
Copy

Entraînement distribué des modèles PyTorch

Les modèles PyTorch sont formés à l’aide d’une fonction d’entraînement (train_func) qui est appelée dans chaque processus de travail.

Utiliser les APIs de contexte

Pendant l’exécution de la fonction d’entraînement, vous pouvez utiliser les APIs de contexte pour accéder à des métadonnées essentielles sur l’environnement d’entraînement et pour la transmission de paramètres de l’appelant aux fonctions d’entraînement. Voir Classes associées pour la documentation de la classe de contexte PyTorch.

L’objet de contexte expose des métadonnées d’exécution que vous pouvez utiliser pour personnaliser le comportement de la fonction d’entraînement. Vous pouvez les récupérer à l’aide des méthodes fournies get_node_rank, get_local_rank, get_world_size et autres.

Le code suivant est un exemple de récupération des valeurs test et train à partir de l’objet de contexte ; celles-ci sont transmises dans une clé appelée dataset_map (que vous pouvez voir dans l’exemple de la fonction d’entraînement plus loin dans cette rubrique). Ces valeurs sont utilisées pour créer des objets d’ensemble de données PyTorch qui sont ensuite transmis au modèle.

dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
Copy

Rapports sur les métriques

Utilisez la méthode metrics_reporter de l’objet de contexte pour envoyer les métriques de la fonction d’entraînement au code de contrôle. Cela permet de contrôler et de déboguer en temps réel le processus d’entraînement, comme le montre l’exemple suivant.

context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
Copy

Exemple

L’exemple suivant est une fonction d’entraînement pour un modèle PyTorch.

def train_func():
    import io
    import base64
    import time
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torchvision import transforms
    from torch.utils.data import IterableDataset
    from torch.optim.lr_scheduler import StepLR
    from PIL import Image
    from snowflake.ml.modeling.distributors.pytorch import get_context

    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)

        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output

    class DecodedDataset(IterableDataset):
        def __init__(self, source_dataset):
            self.source_dataset = source_dataset
            self.transforms = transforms.ToTensor()  # Ensure we apply ToTensor transform

        def __iter__(self):
            for row in self.source_dataset:
                base64_image = row['IMAGE']
                image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
                # Convert the image to a tensor
                image = self.transforms(image)  # Converts PIL image to tensor

                labels = row['LABEL']
                yield image, int(labels)

    def train(model, device, train_loader, optimizer, epoch):
        model.train()
        batch_idx = 1
        for data, target in train_loader:
            # print(f"data : {data} \n target: {target}")
            # raise RuntimeError("test")
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
            batch_idx += 1

    context = get_context()
    rank = context.get_local_rank()
    device = f"cuda:{rank}"
    is_distributed = context.get_world_size() > 1
    if is_distributed:
        dist.init_process_group(backend="nccl")
    print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")

    dataset_map = context.get_dataset_map()
    train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
    test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

    batch_size = 64
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )

    model = Net().to(device)
    if is_distributed:
        model = DDP(model)
    optimizer = optim.Adadelta(model.parameters())
    scheduler = StepLR(optimizer, step_size=1)

    hyper_parms = context.get_hyper_params()
    num_epochs = int(hyper_parms['num_epochs'])
    start_time = time.time()
    for epoch in range(num_epochs):
        train(model, device, train_loader, optimizer, epoch+1)
        scheduler.step()
    now = time.time()
    context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
    test(model, device, test_loader, context)
Copy

Le code suivant illustre comment lancer un entraînement distribué à partir de la fonction d’entraînement précédente. L’exemple crée un objet distributeur PyTorch pour exécuter l’entraînement sur plusieurs nœuds, connecte les données d’entraînement et de test à la fonction d’entraînement via un objet de contexte et établit la configuration de mise à l’échelle avant d’exécuter le formateur.

# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector

df = session.table("MNIST_60K")

train_df, test_df = df.random_split([0.99, 0.01], 0)

# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)

pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    scaling_config=PyTorchScalingConfig(  # scaling configuration
        num_nodes=2,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
    )
)

# Run the trainer.
results = pytorch_trainer.run(  # accepts context values as parameters
    dataset_map={"train": train_data, "test": test_data},
    hyper_params={"num_epochs": "1"}
)
Copy

Limites connues et problèmes courants

Ces limites et ces problèmes seront probablement résolus avant que l’entraînement multi-nœuds sur Container Runtime pour ML ne soit disponible.

L’opération de mise à l’échelle expire

L’opération de mise à l’échelle peut échouer car les nouveaux nœuds ne sont pas prêts dans le délai d’expiration de 12 minutes. Les causes possibles sont les suivantes :

  • Capacité du pool insuffisante. Votre requête porte sur un nombre de nœuds supérieur à la capacité MAX_NODES du pool. Augmentez la capacité MAX_NODES du pool.

  • Contention des ressources. 12 minutes peuvent ne pas être suffisantes pour contenir les nœuds ajoutés. Définissez la capacité MIN_NODES du pool sur un nombre plus élevé pour conserver certains nœuds, ou augmentez le nombre de nœuds actifs en faisant plusieurs appels à scale_cluster avec un incrément plus petit. Une autre option consiste à utiliser le mode asynchrone pour éviter d’attendre que tous les nœuds soient prêts :

    • Utilisez le mode asynchrone pour les opérations non bloquantes :

    scale_cluster(3, is_async=True)
    
    Copy
    • Augmentez le seuil du délai d’expiration :

    scale_cluster(3, options={"rollback_after_seconds": 1200})
    
    Copy

Erreurs au niveau du nom du Notebook

Si vous voyez un message d’erreur tel que « Le <nom> du notebook n’existe pas ou n’est pas autorisé », cela signifie que le nom du notebook détecté automatiquement ne correspond pas au notebook actuel. Cela peut se produire dans les cas suivants :

  • Le nom de votre notebook contient des caractères spéciaux tels que des points et des espaces

  • La détection automatique du nom du notebook ne fonctionne pas correctement

Solution : Fournissez explicitement le paramètre du nom du notebook. Notez que le nom du notebook doit être mis entre guillemets doubles pour être traité comme un identificateur :

# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
    scale_cluster(2)
except Exception as e:
    print(e)  # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
    scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
Copy

Les services SPCS ne sont pas nettoyés après l’échec d’une opération de mise à l’échelle

Lorsque les opérations de mise à l’échelle échouent, le système doit nettoyer toutes les ressources créées lors de l’opération. Toutefois, en cas d’échec, un ou plusieurs services SPCS peuvent être laissés àl’état PENDING ou FAILED. Les services à l’état PENDING peuvent devenir ACTIVE ultérieurement ou, s’il n’y a pas de capacité dans le pool de calcul, rester à l’état PENDING pour toujours.

Pour supprimer les services à l’état PENDING ou FAILED, mettez le cluster à l’échelle pour qu’il ait un nœud (zéro nœud de travail). Pour nettoyer tous les services lancés, terminez la session du notebook en cours en cliquant sur « Terminer la session » dans l’interface du notebook.