Verteiltes Training

Die Snowflake Container Runtime bietet eine flexible Trainingsumgebung, die Sie verwenden können, um Modelle auf der Infrastruktur von Snowflake zu trainieren. Sie können Open-Source-Pakete oder verteilte Snowflake ML-Trainingsläufe für das Training mit mehreren Knoten und mehreren Geräten verwenden.

Verteilte Trainingsläufe skalieren Ihre Machine-Learning-Workloads automatisch über mehrere Knoten und GPUs. Snowflake-Distributoren können Clusterressourcen auf intelligente Weise verwalten, ohne dass eine komplexe Konfiguration erforderlich ist, sodass verteiltes Training zugänglich und effizient ist.

Verwenden Sie Standard-Open-Source-Bibliotheken, wenn Sie

  • Mit kleinen Datensets in Ein-Knoten-Umgebungen arbeiten

  • Prototypen schnell ausführen und mit Modellen experimentieren

  • Workflows ohne verteilte Anforderungen rehosten

Verwenden Sie verteilte Snowflake-Trainingsläufe für:

  • Trainieren von Modellen für Datensets, die größer als der Arbeitsspeicher eines einzelnen Serverknotens sind

  • Effiziente Verwendung mehrerer GPUs

  • Automatische Nutzung aller MLJobs mit mehreren Compute-Knoten oder skalierten Notebook-Cluster

Verteilte Snowflake-ML-Trainingsläufe

Snowflake ML bietet verteilte Trainingsläufe für gängige Frameworks für maschinelles Lernen, einschließlich XGBoost, LightGBMund PyTorch. Diese Trainingsläufe sind für die Ausführung auf der Infrastruktur von Snowflake optimiert und können automatisch über mehrere Knoten und GPUs skaliert werden.

  • Automatisches Ressourcenmanagement – Snowflake erkennt und verwendet automatisch alle verfügbaren Cluster-Ressourcen

  • Vereinfachte Einrichtung – Die Container Runtime-Umgebung wird durch einen von Snowflake bereitgestellten Ray-Cluster unterstützt, sodass keine Benutzerkonfiguration erforderlich ist

  • Nahtlose Snowflake-Integration – Direkte Kompatibilität mit Snowflake-Datenkonnektoren und -Stagingbereichen

  • Optionale Skalierungskonfigurationen – Fortgeschrittene Benutzende können bei Bedarf eine Feinabstimmung vornehmen

Laden von Daten

Sowohl für Open Source- als auch für verteilte Snowflake-Trainingsläufe ist die leistungsfähigste Methode zum Einlesen von Daten der Snowflake Data Connector:

from snowflake.ml.data.data_connector import DataConnector

# Load data
train_connector = DataConnector.from_dataframe(session.table('TRAINING_DATA'))
eval_connector = DataConnector.from_dataframe(session.table('EVAL_DATA'))
Copy

Trainingsmethoden

Open-Source-Training

Verwenden Sie standardmäßige Open-Source-Bibliotheken, wenn Sie maximale Flexibilität und Kontrolle über Ihren Trainingsprozess benötigen. Bei Open-Source-Training verwenden Sie direkt beliebte ML-Frameworks wie XGBoost, LightGBMund PyTorch mit minimalen Änderungen und profitieren gleichzeitig von der Infrastruktur und Datenkonnektivität von Snowflake.

Die folgenden Beispiele trainieren ein Modell mit XGBoost und LightGBM.

Zum Trainieren mit Open Source XGBoost konvertieren Sie Daten, nachdem Sie sie mit dem Datenkonnektor geladen haben, in einen pandas-Datenframe und verwenden die XGB-Bibliothek direkt:

import xgboost as xgb

train_df = train_connector.to_pandas()
eval_df = eval_connector.to_pandas()

# Create DMatrix
train_df = train_connector.to_pandas()
dtrain = xgb.DMatrix(train_df[INPUT_COLS], label=train_df[LABEL_COL])
deval = xgb.DMatrix(eval_df)

# Training parameters
params = {
   'objective': 'reg:squarederror',
   'max_depth': 6,
   'learning_rate': 0.1
}

# Train and evaluate model
evals_result = {}
model = xgb.train(
   params,
   dtrain,
   num_boost_round=100,
   evals=[(dtrain, 'train'), (deval, 'valid')],
   evals_result=evals_result
)

# Access the evaluation results
print(evals_result)
Copy

Verteiltes Training

Die verteilte XGBEstimator-Klasse hat eine ähnliche API mit ein paar wesentlichen Unterschieden:

  • Die XGBoost-Trainingsparameter werden an den XGBEstimator während der Klasseninitialisierung über den Parameter „params“ übergeben.

  • Das DataConnector-Objekt kann direkt an die fit-Funktion des Estimator übergeben werden, zusammen mit den Eingabespalten, die die Features definieren, und der Beschriftungsspalte, die das Ziel definiert.

  • Sie können eine Skalierungskonfiguration angeben, wenn Sie die XGBEstimator-Klasse instanziieren. Snowflake verwendet jedoch standardmäßig alle verfügbaren Ressourcen.

from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

# Training parameters
params = {
    'objective': 'reg:squarederror',
    'max_depth': 6,
    'learning_rate': 0.1
}

# Automatic scaling (recommended)
estimator = XGBEstimator(
    params=params
)

# Call with custom GPU scaling
gpu_estimator = XGBEstimator(
    params=params,
    scaling_config=XGBScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)

# Train and evaluate
booster = estimator.fit(
    dataset=train_connector,
    input_cols=['age', 'income', 'credit_score'],
    label_col='default_risk',
    eval_set=eval_connector,
    verbose_eval=10
)

# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.get_score(importance_type='gain')
Copy

Evaluierung des Modells

Modelle können durch Übergabe eines eval_set und mit verbose_eval evaluiert werden, um die Evaluierungsdaten auf der Konsole auszugeben. Zusätzlich kann die Ableitung als zweiter Schritt durchgeführt werden. Der verteilte Estimator bietet der Einfachheit halber eine predict-Methode, aber es werden keine verteilten Ableitungen ausgeführt. Wir empfehlen, das Modell nach dem Training in einen OSS xgboost-Estimator umzuwandeln, um Ableitungen auszuführen und in der Modell-Registry zu protokollieren.

Registrieren des Modells

Um das Modell in der Snowflake Modell-Registry zu registrieren, verwenden Sie den Open Source-Booster, der von estimator.get_booster bereitgestellt und von estimator.fit zurückgegeben wurde. Weitere Informationen dazu finden Sie unter XGBoost.

PyTorch

Der SnowflakePyTorch Distributor bietet native Unterstützung für parallele Modelle von verteilten Daten auf dem Snowflake-Backend. UmDDP auf Snowflake zu verwenden, nutzen Sie Open Source PyTorch-Module mit einigen Snowflake-spezifischen Änderungen:

  • Laden Sie Daten mit dem ShardedDataConnector, um Daten automatisch in der Anzahl von Partitionen zu teilen, die mit der world_size des verteilten Trainingslaufs übereinstimmen. Rufen Sie get_shard innerhalb eines Snowflake-Trainingskontexts auf, um den Shard abzurufen, der mit diesem Worker-Prozess verknüpft ist.

  • Verwenden Sie innerhalb der Trainingsfunktion das context-Objekt, um verarbeitungsspezifische Informationen wie Rang, lokaler Rang und die für das Training erforderlichen Daten zu erhalten.

  • Speichern Sie das Modell unter Verwendung des get_model_dir des Kontexts, um den Speicherort für das Modell zu finden. Dadurch wird das Modell lokal für das Training mit einzelnen Knoten gespeichert und mit einem Snowflake-Stagingbereich für verteiltes Training synchronisiert. Wenn kein Speicherort für den Stagingbereich angegeben ist, wird standardmäßig Ihr Benutzer-Stagingbereich verwendet.

Laden von Daten

# Create ShardedDataConnector for data ingestion
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

example_snowpark_dataframe = session.table("EXAMPLE_TRAINING_DATA")

data_connector = ShardedDataConnector.from_dataframe(example_snowpark_dataframe)
Copy

Trainieren des Modells

# Import necessary PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Define a simple neural network
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Define the training function
def train_func():
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from snowflake.ml.modeling.distributors.pytorch import get_context

    # Use the Snowflake context to get the necessary methods to manage and retrieve information about the distributed training environment
    context = get_context()
    rank = context.get_rank()
    dist.init_process_group(backend='gloo')
    device = torch.device(f"cuda:{context.get_local_rank()}"
                         if torch.cuda.is_available() else "cpu")

    # Initialize model, loss function, and optimizer
    model = SimpleNet(input_size=len(input_cols), hidden_size=32, output_size=1).to(device)
    model = DDP(model)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Retrieve training data
    dataset_map = context.get_dataset_map()
    torch_dataset = dataset_map['train'].get_shard().to_torch_dataset(batch_size=1024)
    dataloader = DataLoader(torch_dataset)

    # Training loop
    for epoch in range(10):
        for batch_dict in dataloader:
            features = torch.cat([batch_dict[col].T for col in input_cols], dim=1).float().to(device)
            labels = batch_dict[label_col].T.squeeze(0).float().to(device)
            output = model(features)
            loss = criterion(output, labels.unsqueeze(1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')

    # Save the model to the model directory provided by the context
    if context.get_rank() == 0:
        torch.save(
            model.module.state_dict(), os.path.join(context.get_model_dir(), "model.pt")
        )

# Set up PyTorchDistributor for distributed training
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig

pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    # Optional Scaling Configuration, for single node multi-GPU training.
    scaling_config=PyTorchScalingConfig(
        num_nodes=1,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=4)
    )
)

# Run the training process
pytorch_trainer.run(dataset_map={'train': data_connector})
Copy

Abrufen des Modells

Wenn Sie DDP mit mehreren Knoten verwenden, wird das Modell automatisch mit einem Snowflake-Stagingbereich als gemeinsam genutzter persistenter Speicher synchronisiert.

Der folgende Code ruft das Modell aus einem Stagingbereich ab. Er verwendet den artifact_stage_location-Parameter, um den Speicherort des Stagingbereichs anzugeben, in dem das Modellartefakt gespeichert wird.

Die in der stage_location-Variable gespeicherte Funktion erhält den Speicherort des Modells im Stagingbereich, nachdem das Training abgeschlossen ist. Das Modellartefakt wird unter "DB_NAME.SCHEMA_NAME.STAGE_NAME/model/{request_id}" gespeichert.

response = pytorch_trainer.run(
        dataset_map={'train': data_connector},
        artifact_stage_location="DB_NAME.SCHEMA_NAME.STAGE_NAME",
    )

stage_location = response.get_model_dir()
Copy