Entraînement distribué

Le Snowflake Container Runtime fournit un environnement d’entraînement flexible que vous pouvez utiliser pour entraîner des modèles sur l’infrastructure de Snowflake. Vous pouvez utiliser des paquets open source ou les entraîneurs distribués ML Snowflake pour l’entraînement à plusieurs nœuds et à plusieurs appareils.

Les entraîneurs distribués mettent automatiquement à l’échelle vos charges de travail de machine learning sur plusieurs nœuds et GPUs. Les fournisseurs Snowflake gèrent intelligemment les ressources des clusters sans avoir besoin d’une configuration complexe, ce qui rend l’entraînement distribué accessible et efficace.

Utilisez les bibliothèques open source standard lorsque vous

  • Travaillez avec de petits ensembles de données dans des environnements à un seul nœud

  • Réalisez rapidement des prototypes et testez des modèles

  • Transférez des workflows sans exigences distribuées

Utilisez les entraînements distribués Snowflake pour :

  • Entraîner des modèles sur des ensembles de données dont la taille est supérieure à la mémoire d’un seul nœud de calcul

  • Utiliser plusieurs GPUs efficacement

  • Exploiter automatiquement toutes les MLJobs multi-nœuds ou les clusters de notebooks à grande échelle

Entraînement distribué Snowflake ML

Snowflake ML fournit des entraîneurs distribués pour les cadres de machine learning populaires, notamment XGBoost, LightGBM et PyTorch. Ces entraîneurs sont optimisés pour fonctionner sur l’infrastructure de Snowflake et peuvent s’étendre automatiquement à travers plusieurs nœuds et GPUs.

  • Gestion automatique des ressources - Snowflake découvre et utilise automatiquement toutes les ressources de cluster disponibles

  • Configuration simplifiée - L’environnement Container Runtime est soutenu par un cluster Ray fourni par Snowflake, sans configuration utilisateur requise

  • Intégration Snowflake sans jointure - Compatibilité directe avec les connecteurs de données et les zones de préparation Snowflake

  • Configurations de mise à l’échelle facultatives - Les utilisateurs avancés peuvent effectuer un réglage précis lorsque cela est nécessaire

Chargement des données

Pour les entraîneurs open source et distribués Snowflake, le moyen le plus performant d’ingérer des données est le connecteur de données Snowflake :

from snowflake.ml.data.data_connector import DataConnector

# Load data
train_connector = DataConnector.from_dataframe(session.table('TRAINING_DATA'))
eval_connector = DataConnector.from_dataframe(session.table('EVAL_DATA'))
Copy

Méthodes d’entraînement

Entraînement open source

Utilisez les bibliothèques open source standard lorsque vous avez besoin d’un maximum de flexibilité et de contrôle sur votre processus d’entraînement. Avec l’entraînement open source, vous utilisez directement les cadres ML populaires comme XGBoost, LightGBM et PyTorch avec des modifications minimes, tout en bénéficiant de l’infrastructure et de la connectivité des données de Snowflake.

Les exemples suivants entraînent un modèle avec XGBoost et LightGBM.

Pour effectuer un entraînement avec XGBoost open source, après avoir chargé les données à l’aide du connecteur de données, convertissez-les en un dataframe pandas et utilisez la bibliothèque XGB directement :

import xgboost as xgb

train_df = train_connector.to_pandas()
eval_df = eval_connector.to_pandas()

# Create DMatrix
train_df = train_connector.to_pandas()
dtrain = xgb.DMatrix(train_df[INPUT_COLS], label=train_df[LABEL_COL])
deval = xgb.DMatrix(eval_df)

# Training parameters
params = {
   'objective': 'reg:squarederror',
   'max_depth': 6,
   'learning_rate': 0.1
}

# Train and evaluate model
evals_result = {}
model = xgb.train(
   params,
   dtrain,
   num_boost_round=100,
   evals=[(dtrain, 'train'), (deval, 'valid')],
   evals_result=evals_result
)

# Access the evaluation results
print(evals_result)
Copy

Entraînement distribué

La classe XGBEstimator distribuée est dotée d’une API similaire avec quelques différences clés :

  • Les paramètres d’entraînement XGBoost sont transmis à XGBEstimator pendant l’initialisation de la classe par le paramètre « params ».

  • L’objet DataConnector peut être transmis directement dans la fonction fit de l’estimateur, ainsi que les colonnes d’entrée définissant les fonctionnalités et la colonne de balise définissant la cible.

  • Vous pouvez fournir une configuration de mise à l’échelle lors de l’instanciation de la classe XGBEstimator. Cependant, Snowflake utilise par défaut toutes les ressources disponibles.

from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

# Training parameters
params = {
    'objective': 'reg:squarederror',
    'max_depth': 6,
    'learning_rate': 0.1
}

# Automatic scaling (recommended)
estimator = XGBEstimator(
    params=params
)

# Call with custom GPU scaling
gpu_estimator = XGBEstimator(
    params=params,
    scaling_config=XGBScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)

# Train and evaluate
booster = estimator.fit(
    dataset=train_connector,
    input_cols=['age', 'income', 'credit_score'],
    label_col='default_risk',
    eval_set=eval_connector,
    verbose_eval=10
)

# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.get_score(importance_type='gain')
Copy

Évaluation du modèle

Les modèles peuvent être évalués en transmettant eval_set et en utilisant verbose_eval pour imprimer les données d’évaluation dans la console. En outre, l’inférence peut être effectuée en tant que deuxième étape. L’estimateur distribué offre une méthode predict pratique, mais il n’effectuera pas d’inférence de manière distribuée. Nous recommandons de convertir le modèle d’ajustement en estimateur xgboost OSS après l’entraînement afin d’effectuer l’inférence et de se connecter au registre des modèles.

Enregistrement du modèle

Pour enregistrer le modèle dans le registre des modèles de Snowflake, utilisez le booster open source fourni par estimator.get_booster et renvoyé par estimator.fit. Pour plus d’informations, voir XGBoost.

PyTorch

Le distributeur PyTorch Snowflake prend en charge de manière native les modèles Distributed Data Parallel sur le backend Snowflake. Pour utiliser DDP sur Snowflake, exploitez les modules PyTorch open source avec quelques modifications spécifiques à Snowflake :

  • Charger des données avec le ShardedDataConnector pour fragmenter automatiquement les données dans le nombre de partitions qui correspond à la world_size du système d’entraînement distribué. Appeler get_shard dans un contexte d’entraînement Snowflake pour récupérer le fragment associé à ce processus de travail.

  • Dans la fonction d’entraînement, utilisez l’objet context permettant d’obtenir des informations spécifiques au processus telles que le rang, le rang local et les données requises pour l’entraînement.

  • Enregistrer le modèle en utilisant get_model_dir du contexte pour trouver l’emplacement où stocker le modèle. Le modèle sera stocké localement pour un entraînement à un seul nœud, et synchronisera le modèle avec une zone de préparation Snowflake pour un entraînement distribué. Si aucun emplacement n’est indiqué, votre zone de préparation utilisateur sera utilisée par défaut.

Charger les données

# Create ShardedDataConnector for data ingestion
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

example_snowpark_dataframe = session.table("EXAMPLE_TRAINING_DATA")

data_connector = ShardedDataConnector.from_dataframe(example_snowpark_dataframe)
Copy

Modèle d’entraînement

# Import necessary PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Define a simple neural network
class SimpleNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleNet, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Define the training function
def train_func():
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from snowflake.ml.modeling.distributors.pytorch import get_context

    # Use the Snowflake context to get the necessary methods to manage and retrieve information about the distributed training environment
    context = get_context()
    rank = context.get_rank()
    dist.init_process_group(backend='gloo')
    device = torch.device(f"cuda:{context.get_local_rank()}"
                         if torch.cuda.is_available() else "cpu")

    # Initialize model, loss function, and optimizer
    model = SimpleNet(input_size=len(input_cols), hidden_size=32, output_size=1).to(device)
    model = DDP(model)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Retrieve training data
    dataset_map = context.get_dataset_map()
    torch_dataset = dataset_map['train'].get_shard().to_torch_dataset(batch_size=1024)
    dataloader = DataLoader(torch_dataset)

    # Training loop
    for epoch in range(10):
        for batch_dict in dataloader:
            features = torch.cat([batch_dict[col].T for col in input_cols], dim=1).float().to(device)
            labels = batch_dict[label_col].T.squeeze(0).float().to(device)
            output = model(features)
            loss = criterion(output, labels.unsqueeze(1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')

    # Save the model to the model directory provided by the context
    if context.get_rank() == 0:
        torch.save(
            model.module.state_dict(), os.path.join(context.get_model_dir(), "model.pt")
        )

# Set up PyTorchDistributor for distributed training
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig

pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    # Optional Scaling Configuration, for single node multi-GPU training.
    scaling_config=PyTorchScalingConfig(
        num_nodes=1,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=4)
    )
)

# Run the training process
pytorch_trainer.run(dataset_map={'train': data_connector})
Copy

Récupération du modèle

Si vous utilisez un DDP à nœuds multiples, le modèle est automatiquement synchronisé avec une zone de préparation Snowflake comme stockage persistant partagé.

Le code suivant extrait le modèle d’une zone de préparation. Il utilise le paramètre artifact_stage_location pour spécifier l’emplacement de la zone de préparation qui stocke l’artefact du modèle.

La fonction enregistrée dans la variable stage_location obtient l’emplacement du modèle dans la zone de préparation une fois l’entraînement terminé. L’artefact du modèle est enregistré sous "DB_NAME.SCHEMA_NAME.STAGE_NAME/model/{request_id}".

response = pytorch_trainer.run(
        dataset_map={'train': data_connector},
        artifact_stage_location="DB_NAME.SCHEMA_NAME.STAGE_NAME",
    )

stage_location = response.get_model_dir()
Copy