Verteiltes Training¶
Die Snowflake Container Runtime bietet eine flexible Trainingsumgebung, die Sie verwenden können, um Modelle auf der Infrastruktur von Snowflake zu trainieren. Sie können Open-Source-Pakete oder verteilte Snowflake ML-Trainingsläufe für das Training mit mehreren Knoten und mehreren Geräten verwenden.
Verteilte Trainingsläufe skalieren Ihre Machine-Learning-Workloads automatisch über mehrere Knoten und GPUs. Snowflake-Distributoren können Clusterressourcen auf intelligente Weise verwalten, ohne dass eine komplexe Konfiguration erforderlich ist, sodass verteiltes Training zugänglich und effizient ist.
Verwenden Sie Standard-Open-Source-Bibliotheken, wenn Sie
Mit kleinen Datensets in Ein-Knoten-Umgebungen arbeiten
Prototypen schnell ausführen und mit Modellen experimentieren
Workflows ohne verteilte Anforderungen rehosten
Verwenden Sie verteilte Snowflake-Trainingsläufe für:
Trainieren von Modellen für Datensets, die größer als der Arbeitsspeicher eines einzelnen Serverknotens sind
Effiziente Verwendung mehrerer GPUs
Automatische Nutzung aller MLJobs mit mehreren Compute-Knoten oder skalierten Notebook-Cluster
Verteilte Snowflake-ML-Trainingsläufe¶
Snowflake ML bietet verteilte Trainingsläufe für gängige Frameworks für maschinelles Lernen, einschließlich XGBoost, LightGBMund PyTorch. Diese Trainingsläufe sind für die Ausführung auf der Infrastruktur von Snowflake optimiert und können automatisch über mehrere Knoten und GPUs skaliert werden.
Automatisches Ressourcenmanagement – Snowflake erkennt und verwendet automatisch alle verfügbaren Cluster-Ressourcen
Vereinfachte Einrichtung – Die Container Runtime-Umgebung wird durch einen von Snowflake bereitgestellten Ray-Cluster unterstützt, sodass keine Benutzerkonfiguration erforderlich ist
Nahtlose Snowflake-Integration – Direkte Kompatibilität mit Snowflake-Datenkonnektoren und -Stagingbereichen
Optionale Skalierungskonfigurationen – Fortgeschrittene Benutzende können bei Bedarf eine Feinabstimmung vornehmen
Laden von Daten¶
Sowohl für Open Source- als auch für verteilte Snowflake-Trainingsläufe ist die leistungsfähigste Methode zum Einlesen von Daten der Snowflake Data Connector:
from snowflake.ml.data.data_connector import DataConnector
# Load data
train_connector = DataConnector.from_dataframe(session.table('TRAINING_DATA'))
eval_connector = DataConnector.from_dataframe(session.table('EVAL_DATA'))
Trainingsmethoden¶
Open-Source-Training¶
Verwenden Sie standardmäßige Open-Source-Bibliotheken, wenn Sie maximale Flexibilität und Kontrolle über Ihren Trainingsprozess benötigen. Bei Open-Source-Training verwenden Sie direkt beliebte ML-Frameworks wie XGBoost, LightGBMund PyTorch mit minimalen Änderungen und profitieren gleichzeitig von der Infrastruktur und Datenkonnektivität von Snowflake.
Die folgenden Beispiele trainieren ein Modell mit XGBoost und LightGBM.
Zum Trainieren mit Open Source XGBoost konvertieren Sie Daten, nachdem Sie sie mit dem Datenkonnektor geladen haben, in einen pandas-Datenframe und verwenden die XGB-Bibliothek direkt:
import xgboost as xgb
train_df = train_connector.to_pandas()
eval_df = eval_connector.to_pandas()
# Create DMatrix
train_df = train_connector.to_pandas()
dtrain = xgb.DMatrix(train_df[INPUT_COLS], label=train_df[LABEL_COL])
deval = xgb.DMatrix(eval_df)
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Train and evaluate model
evals_result = {}
model = xgb.train(
params,
dtrain,
num_boost_round=100,
evals=[(dtrain, 'train'), (deval, 'valid')],
evals_result=evals_result
)
# Access the evaluation results
print(evals_result)
from snowflake.ml.modeling.distributors.lightgbm import LightGBMEstimator, LightGBMScalingConfig
# Training parameters
params = {
'objective': 'regression',
'metric': 'rmse',
'boosting_type': 'gbdt',
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.9
}
# Automatic scaling (recommended)
estimator = LightGBMEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = LightGBMEstimator(
params=params,
scaling_config=LightGBMScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.feature_importance(importance_type='gain')
Verteiltes Training¶
Die verteilte XGBEstimator-Klasse hat eine ähnliche API mit ein paar wesentlichen Unterschieden:
Die XGBoost-Trainingsparameter werden an den
XGBEstimatorwährend der Klasseninitialisierung über den Parameter „params“ übergeben.Das DataConnector-Objekt kann direkt an die
fit-Funktion des Estimator übergeben werden, zusammen mit den Eingabespalten, die die Features definieren, und der Beschriftungsspalte, die das Ziel definiert.Sie können eine Skalierungskonfiguration angeben, wenn Sie die
XGBEstimator-Klasse instanziieren. Snowflake verwendet jedoch standardmäßig alle verfügbaren Ressourcen.
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Automatic scaling (recommended)
estimator = XGBEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = XGBEstimator(
params=params,
scaling_config=XGBScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.get_score(importance_type='gain')
Evaluierung des Modells¶
Modelle können durch Übergabe eines eval_set und mit verbose_eval evaluiert werden, um die Evaluierungsdaten auf der Konsole auszugeben. Zusätzlich kann die Ableitung als zweiter Schritt durchgeführt werden. Der verteilte Estimator bietet der Einfachheit halber eine predict-Methode, aber es werden keine verteilten Ableitungen ausgeführt. Wir empfehlen, das Modell nach dem Training in einen OSS xgboost-Estimator umzuwandeln, um Ableitungen auszuführen und in der Modell-Registry zu protokollieren.
Registrieren des Modells¶
Um das Modell in der Snowflake Modell-Registry zu registrieren, verwenden Sie den Open Source-Booster, der von estimator.get_booster bereitgestellt und von estimator.fit zurückgegeben wurde. Weitere Informationen dazu finden Sie unter XGBoost.
PyTorch¶
Der SnowflakePyTorch Distributor bietet native Unterstützung für parallele Modelle von verteilten Daten auf dem Snowflake-Backend. UmDDP auf Snowflake zu verwenden, nutzen Sie Open Source PyTorch-Module mit einigen Snowflake-spezifischen Änderungen:
Laden Sie Daten mit dem
ShardedDataConnector, um Daten automatisch in der Anzahl von Partitionen zu teilen, die mit derworld_sizedes verteilten Trainingslaufs übereinstimmen. Rufen Sieget_shardinnerhalb eines Snowflake-Trainingskontexts auf, um den Shard abzurufen, der mit diesem Worker-Prozess verknüpft ist.Verwenden Sie innerhalb der Trainingsfunktion das
context-Objekt, um verarbeitungsspezifische Informationen wie Rang, lokaler Rang und die für das Training erforderlichen Daten zu erhalten.Speichern Sie das Modell unter Verwendung des
get_model_dirdes Kontexts, um den Speicherort für das Modell zu finden. Dadurch wird das Modell lokal für das Training mit einzelnen Knoten gespeichert und mit einem Snowflake-Stagingbereich für verteiltes Training synchronisiert. Wenn kein Speicherort für den Stagingbereich angegeben ist, wird standardmäßig Ihr Benutzer-Stagingbereich verwendet.
Laden von Daten¶
# Create ShardedDataConnector for data ingestion
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
example_snowpark_dataframe = session.table("EXAMPLE_TRAINING_DATA")
data_connector = ShardedDataConnector.from_dataframe(example_snowpark_dataframe)
Trainieren des Modells¶
# Import necessary PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
# Define a simple neural network
class SimpleNet(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(SimpleNet, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
# Define the training function
def train_func():
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from snowflake.ml.modeling.distributors.pytorch import get_context
# Use the Snowflake context to get the necessary methods to manage and retrieve information about the distributed training environment
context = get_context()
rank = context.get_rank()
dist.init_process_group(backend='gloo')
device = torch.device(f"cuda:{context.get_local_rank()}"
if torch.cuda.is_available() else "cpu")
# Initialize model, loss function, and optimizer
model = SimpleNet(input_size=len(input_cols), hidden_size=32, output_size=1).to(device)
model = DDP(model)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Retrieve training data
dataset_map = context.get_dataset_map()
torch_dataset = dataset_map['train'].get_shard().to_torch_dataset(batch_size=1024)
dataloader = DataLoader(torch_dataset)
# Training loop
for epoch in range(10):
for batch_dict in dataloader:
features = torch.cat([batch_dict[col].T for col in input_cols], dim=1).float().to(device)
labels = batch_dict[label_col].T.squeeze(0).float().to(device)
output = model(features)
loss = criterion(output, labels.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')
# Save the model to the model directory provided by the context
if context.get_rank() == 0:
torch.save(
model.module.state_dict(), os.path.join(context.get_model_dir(), "model.pt")
)
# Set up PyTorchDistributor for distributed training
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
# Optional Scaling Configuration, for single node multi-GPU training.
scaling_config=PyTorchScalingConfig(
num_nodes=1,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=4)
)
)
# Run the training process
pytorch_trainer.run(dataset_map={'train': data_connector})
Abrufen des Modells¶
Wenn Sie DDP mit mehreren Knoten verwenden, wird das Modell automatisch mit einem Snowflake-Stagingbereich als gemeinsam genutzter persistenter Speicher synchronisiert.
Der folgende Code ruft das Modell aus einem Stagingbereich ab. Er verwendet den artifact_stage_location-Parameter, um den Speicherort des Stagingbereichs anzugeben, in dem das Modellartefakt gespeichert wird.
Die in der stage_location-Variable gespeicherte Funktion erhält den Speicherort des Modells im Stagingbereich, nachdem das Training abgeschlossen ist. Das Modellartefakt wird unter "DB_NAME.SCHEMA_NAME.STAGE_NAME/model/{request_id}" gespeichert.
response = pytorch_trainer.run(
dataset_map={'train': data_connector},
artifact_stage_location="DB_NAME.SCHEMA_NAME.STAGE_NAME",
)
stage_location = response.get_model_dir()