Container Runtime pour ML on multi-node clusters¶
Dans cet avant-première, Container Runtime pour ML vous permet d’exécuter des charges de travail ML sur des clusters à plusieurs nœuds dans Snowflake Notebooks. La bibliothèque snowflake-ml-python
inclut des APIs pour définir le nombre de nœuds dans le pool de calcul disponible pour les charges de travail ML, ce qui permet de faire évoluer les ressources disponibles pour une charge de travail sans redimensionner le pool de calcul. Une autre API permet de récupérer la liste des nœuds actifs.
Un cluster à plusieurs nœuds désigne un nœud comme nœud principal. Les nœuds supplémentaires sont appelés nœuds de travail. Le nœud principal orchestre les opérations parallèles dans le cluster et contribue également à l’exécution de la charge de travail grâce à ses ressources de calcul. Un cluster à plusieurs nœuds avec un nœud actif n’a qu’un nœud principal. Un cluster à plusieurs nœuds avec trois nœuds actifs comporte un nœud principal et deux nœuds de travail, et les trois nœuds participent à l’exécution de votre charge de travail.
Conditions préalables¶
Pour utiliser des clusters à plusieurs nœuds afin d’exécuter vos charges de travail ML, vous avez besoin de ce qui suit :
Un compte Snowflake actif avec accès aux Notebooks. Voir Snowflake Notebooks.
Des privilèges permettant de créer et de gérer des Notebooks qui utilisent Container Runtime. Voir Notebooks sur Container Runtime pour ML.
Configurer un pool de calcul¶
Pour utiliser une configuration à plusieurs nœuds, vous devez disposer d’un pool de calcul comprenant au moins deux nœuds. Vous pouvez soit créer un nouveau pool de calcul ou en modifier un existant. Dans l’une ou l’autre de ces commandes, transférez l’argument MAX_NODES pour définir la capacité maximale du pool. Il est conseillé de prévoir un ou plusieurs nœuds supplémentaires afin de pouvoir facilement augmenter ou réduire la charge de travail.
Pour connaître la capacité d’un pool ce calcul, utilisez la commande DESCRIBE COMPUTE POOL. La capacité se trouve dans la colonne MAX_NODES de la table renvoyée.
DESCRIBE COMPUTE POOL my_pool;
Pour définir la capacité d’un pool de calcul, utilisez la commande ALTER COMPUTE POOL.
ALTER COMPUTE POOL <compute_pool_name>
SET MAX_NODES = <total_capacity>;
Exécution d’une charge de travail sur un cluster à plusieurs nœuds¶
Le choix d’un pool de calcul à plusieurs nœuds pour votre Notebook est la seule action requise pour utiliser plusieurs nœuds du pool de calcul afin d’exécuter une charge de travail ML.
Dans le Notebook, définissez le nombre de nœuds actifs à l’aide de snowflake.ml.runtime_cluster.scale_cluster
l’API Python. Le nombre de nœuds actifs dans un pool de calcul est le nombre de nœuds disponibles pour exécuter une charge de travail, jusqu’à la limite MAX_NODES du pool. La méthode prend comme paramètre principal le nombre total de nœuds actifs requis, y compris le nœud principal et tous les nœuds de travail.
Note
Cette fonction est bloquée par défaut (c’est-à-dire qu’elle attend la fin de l’opération de mise à l’échelle) et dispose d’un délai d’expiration de 12 minutes. Si l’opération est interrompue, elle revient automatiquement à son état antérieur.
Les opérations de mise à l’échelle ne persistent pas d’une session à l’autre. En d’autres termes, si un notebook se termine avec un nombre non nul de nœuds de travail, il ne sera pas automatiquement mis à l’échelle lors du prochain démarrage du notebook. Vous devez appeler à nouveau l’API de mise à l’échelle pour définir le nombre de nœuds de travail.
Syntaxe¶
snowflake.ml.runtime_cluster.scale_cluster(
expected_cluster_size: int,
*,
notebook_name: Optional[str] = None,
is_async: bool = False,
options: Optional[Dict[str, Any]] = None
) -> bool
Arguments¶
expected_cluster_size
(int) : Le nombre de nœuds actifs dans le pool de calcul, jusqu’à la limite MAX_NODES du pool. Cela inclut le nœud principal et tous les nœuds de travail.notebook_name
(Facultatif[str]) : Le nom du notebook où la charge de travail est exécutée. Le pool de calcul à mettre à l’échelle est le pool sur lequel s’exécute le Notebook spécifié. S’il n’est pas fourni, il sera automatiquement déterminé à partir du contexte actuel. Une exception est levée si le nom du notebook utilisé n’est pas le bon.is_async
(bool) : Contrôle si la fonction se bloque en attente de la mise à l’échelle :Si False (par défaut) : La fonction se bloque jusqu’à ce que le cluster soit entièrement prêt ou que l’opération s’arrête.
Si True : la fonction revient immédiatement après la confirmation que la requête de mise à l’échelle a été acceptée.
options
(Facultatif[Dict[str, Any]]) : Options de configuration avancées :rollback_after_seconds
(int) : Temps maximum avant le rétablissement automatique si la mise à l’échelle n’est pas terminée. La valeur par défaut est de 720 secondes.block_until_min_cluster_size
(int) : Nombre minimum de nœuds qui doivent être prêts avant le retour de la fonction.
Renvoie¶
True
si le pool de calcul est mis à l’échelle sur le nombre de nœuds actifs spécifié. Dans le cas contraire, une exception est levée.
Exemple¶
from snowflake.ml.runtime_cluster import scale_cluster
# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)
# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers
# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)
# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
Obtenir le nombre de nœuds disponibles¶
Utilisez l’API get_nodes
pour obtenir des informations sur les nœuds actifs du cluster. La fonction ne prend aucun argument.
Syntaxe¶
get_nodes() -> list
Renvoie¶
Une liste contenant les détails des nœuds actifs du cluster. Chaque élément de la liste est un dictionnaire dont les clés sont les suivantes :
name
(str) : Le nom du nœud.cpus
(int) : Le nombre de CPUs sur le nœud.gpus
(int) : Le nombre de GPUs sur le nœud.
Exemple¶
from snowflake.ml.runtime_cluster import get_nodes
# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
La sortie de l’exemple de code est la suivante :
2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]
Entraînement distribué sur des clusters à plusieurs nœuds¶
Container Runtime pour ML prend en charge l’entraînement distribué des modèles LightGBM, XGBoost et PyTorch. Les APIs de l’entraînement distribué pour LightGBMEstimator, XGBEstimator et PyTorch sont documentées en détail dans la référence d”API.
Configuration de la mise à l’échelle¶
Tous les modèles fournissent un paramètre de configuration de mise à l’échelle facultatif qui vous permet de spécifier la ressource pour la tâche d’entraînement. La configuration de mise à l’échelle est une instance d’une classe spécifique au modèle : LightGBMScalingConfig
, XGBScalingConfig
ou PyTorchScalingConfig
selon le type de modèle.
Les objets de configuration de la mise à l’échelle LightGBM et XGBoost ont les attributs suivants :
num_workers
: Nombre de processus de travail à utiliser pour l’entraînement. La valeur par défaut est -1, ce qui définit automatiquement le nombre de processus de travail.num_cpu_per_worker
: Nombre de CPUs alloués par processus de travail. La valeur par défaut est -1, ce qui définit automatiquement le nombre de CPUs par processus de travail.use_gpu
: Détermine s’il faut utiliser le GPU pour l’entraînement. La valeur par défaut est Aucun, ce qui permet à l’estimateur de choisir en fonction de l’environnement. Lorsque vous utilisez le GPU, veillez à configurer également les paramètres du modèle pour utiliser le GPU.
Note
En règle générale, laissez num_workers
et num_cpu_per_worker
sur leurs valeurs par défaut, afin que Container Services pour ML détermine la meilleure façon de distribuer ces ressources. L’environnement d’exécution attribue un travailleur pour chaque nœud dans le pool de calcul, ainsi que les CPUs ou GPUs nécessaires à l’exécution de la tâche par chaque travailleur.
Les objets de configuration de la mise à l’échelle PyTorch ont les attributs suivants :
num_cpus
: Nombre de cœurs de CPU à réserver pour chaque travailleur.num_gpus
: Nombre de GPUs à réserver pour chaque travailleur. La valeur par défaut est 0, ce qui indique qu’aucun GPUs n’est réservé.
Entraînement distribué des modèles LightGBM/XGBoost¶
- Utilisation de la mémoire
En règle générale, un nœud ayant n GB de RAM peut entraîner un modèle sur n/4 à n/3 de données sans manquer de mémoire. La taille maximale de l’ensemble de données dépend du nombre de processus de travail et de l’algorithme d’entraînement utilisé.
- Performance de calcul
La performance de l’entraînement multi-nœuds dépend des paramètres du modèle tels que la profondeur de l’arborescence, le nombre d’arborescences et le nombre maximal d’emplacements. L’augmentation de la valeur de ces paramètres peut accroître la durée totale de l’entraînement sur un ensemble de données.
Exemple¶
L’exemple suivant montre comment entraîner un modèle XGBoost sur un cluster à plusieurs nœuds. L’entraînement des modèles LightGBM est similaire.
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"
# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
sql_script += f"select \n"
for i in range(1, 10):
sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
sql_script += f"FT_1 + FT_2 AS TARGET, \n"
sql_script += f"from TABLE(generator(rowcount=>({10000})));"
return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""
sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)
params = {
"eta": 0.1,
"max_depth": 8,
"min_child_weight": 100,
"tree_method": "hist",
}
scaling_config = XGBScalingConfig(
use_gpu=False
)
estimator = XGBEstimator(
n_estimators=50,
objective="reg:squarederror",
params=params,
scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
sample_train_df, ingestor_class=RayDataIngester
)
xgb_model = estimator.fit(
data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
Entraînement distribué des modèles PyTorch¶
Les modèles PyTorch sont formés à l’aide d’une fonction d’entraînement (train_func
) qui est appelée dans chaque processus de travail.
Utiliser les APIs de contexte¶
Pendant l’exécution de la fonction d’entraînement, vous pouvez utiliser les APIs de contexte pour accéder à des métadonnées essentielles sur l’environnement d’entraînement et pour la transmission de paramètres de l’appelant aux fonctions d’entraînement. Voir Classes associées pour la documentation de la classe de contexte PyTorch.
L’objet de contexte expose des métadonnées d’exécution que vous pouvez utiliser pour personnaliser le comportement de la fonction d’entraînement. Vous pouvez les récupérer à l’aide des méthodes fournies get_node_rank
, get_local_rank
, get_world_size
et autres.
Le code suivant est un exemple de récupération des valeurs test
et train
à partir de l’objet de contexte ; celles-ci sont transmises dans une clé appelée dataset_map
(que vous pouvez voir dans l’exemple de la fonction d’entraînement plus loin dans cette rubrique). Ces valeurs sont utilisées pour créer des objets d’ensemble de données PyTorch qui sont ensuite transmis au modèle.
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
Rapports sur les métriques¶
Utilisez la méthode
metrics_reporter
de l’objet de contexte pour envoyer les métriques de la fonction d’entraînement au code de contrôle. Cela permet de contrôler et de déboguer en temps réel le processus d’entraînement, comme le montre l’exemple suivant.context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
Exemple¶
L’exemple suivant est une fonction d’entraînement pour un modèle PyTorch.
def train_func():
import io
import base64
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision import transforms
from torch.utils.data import IterableDataset
from torch.optim.lr_scheduler import StepLR
from PIL import Image
from snowflake.ml.modeling.distributors.pytorch import get_context
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
class DecodedDataset(IterableDataset):
def __init__(self, source_dataset):
self.source_dataset = source_dataset
self.transforms = transforms.ToTensor() # Ensure we apply ToTensor transform
def __iter__(self):
for row in self.source_dataset:
base64_image = row['IMAGE']
image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
# Convert the image to a tensor
image = self.transforms(image) # Converts PIL image to tensor
labels = row['LABEL']
yield image, int(labels)
def train(model, device, train_loader, optimizer, epoch):
model.train()
batch_idx = 1
for data, target in train_loader:
# print(f"data : {data} \n target: {target}")
# raise RuntimeError("test")
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0:
print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
batch_idx += 1
context = get_context()
rank = context.get_local_rank()
device = f"cuda:{rank}"
is_distributed = context.get_world_size() > 1
if is_distributed:
dist.init_process_group(backend="nccl")
print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
batch_size = 64
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
model = Net().to(device)
if is_distributed:
model = DDP(model)
optimizer = optim.Adadelta(model.parameters())
scheduler = StepLR(optimizer, step_size=1)
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
start_time = time.time()
for epoch in range(num_epochs):
train(model, device, train_loader, optimizer, epoch+1)
scheduler.step()
now = time.time()
context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
test(model, device, test_loader, context)
Le code suivant illustre comment lancer un entraînement distribué à partir de la fonction d’entraînement précédente. L’exemple crée un objet distributeur PyTorch pour exécuter l’entraînement sur plusieurs nœuds, connecte les données d’entraînement et de test à la fonction d’entraînement via un objet de contexte et établit la configuration de mise à l’échelle avant d’exécuter le formateur.
# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector
df = session.table("MNIST_60K")
train_df, test_df = df.random_split([0.99, 0.01], 0)
# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
scaling_config=PyTorchScalingConfig( # scaling configuration
num_nodes=2,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
)
)
# Run the trainer.
results = pytorch_trainer.run( # accepts context values as parameters
dataset_map={"train": train_data, "test": test_data},
hyper_params={"num_epochs": "1"}
)
Limites connues et problèmes courants¶
Ces limites et ces problèmes seront probablement résolus avant que l’entraînement multi-nœuds sur Container Runtime pour ML ne soit disponible.
L’opération de mise à l’échelle expire¶
L’opération de mise à l’échelle peut échouer car les nouveaux nœuds ne sont pas prêts dans le délai d’expiration de 12 minutes. Les causes possibles sont les suivantes :
Capacité du pool insuffisante. Votre requête porte sur un nombre de nœuds supérieur à la capacité MAX_NODES du pool. Augmentez la capacité MAX_NODES du pool.
Contention des ressources. 12 minutes peuvent ne pas être suffisantes pour contenir les nœuds ajoutés. Définissez la capacité MIN_NODES du pool sur un nombre plus élevé pour conserver certains nœuds, ou augmentez le nombre de nœuds actifs en faisant plusieurs appels à
scale_cluster
avec un incrément plus petit. Une autre option consiste à utiliser le mode asynchrone pour éviter d’attendre que tous les nœuds soient prêts :Utilisez le mode asynchrone pour les opérations non bloquantes :
scale_cluster(3, is_async=True)
Augmentez le seuil du délai d’expiration :
scale_cluster(3, options={"rollback_after_seconds": 1200})
Erreurs au niveau du nom du Notebook¶
Si vous voyez un message d’erreur tel que « Le <nom> du notebook n’existe pas ou n’est pas autorisé », cela signifie que le nom du notebook détecté automatiquement ne correspond pas au notebook actuel. Cela peut se produire dans les cas suivants :
Le nom de votre notebook contient des caractères spéciaux tels que des points et des espaces
La détection automatique du nom du notebook ne fonctionne pas correctement
Solution : Fournissez explicitement le paramètre du nom du notebook. Notez que le nom du notebook doit être mis entre guillemets doubles pour être traité comme un identificateur :
# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
scale_cluster(2)
except Exception as e:
print(e) # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
Les services SPCS ne sont pas nettoyés après l’échec d’une opération de mise à l’échelle¶
Lorsque les opérations de mise à l’échelle échouent, le système doit nettoyer toutes les ressources créées lors de l’opération. Toutefois, en cas d’échec, un ou plusieurs services SPCS peuvent être laissés àl’état PENDING ou FAILED. Les services à l’état PENDING peuvent devenir ACTIVE ultérieurement ou, s’il n’y a pas de capacité dans le pool de calcul, rester à l’état PENDING pour toujours.
Pour supprimer les services à l’état PENDING ou FAILED, mettez le cluster à l’échelle pour qu’il ait un nœud (zéro nœud de travail). Pour nettoyer tous les services lancés, terminez la session du notebook en cours en cliquant sur « Terminer la session » dans l’interface du notebook.