Treinamento distribuído¶
O Snowflake Container Runtime fornece um ambiente de treinamento flexível que você pode usar para treinar modelos na infraestrutura do Snowflake. Você pode usar pacotes de código aberto ou Snowflake ML Instrutores distribuídos para treinamento de vários nós e dispositivos.
Os treinamentos distribuídos dimensionam automaticamente suas cargas de trabalho de aprendizado de máquina em vários nós e GPUs. Os distribuídos em Snowflake gerenciam de forma inteligente os recursos do cluster sem exigir configurações complexas, tornando o treinamento distribuído acessível e eficiente.
Use bibliotecas de código aberto padrão quando você
Como trabalhar com pequenos conjuntos de dados em ambientes de nó único
Prototipagem e experimentação de modelos rapidamente
Fluxos de trabalho de criar e usar sem requisitos distribuídos
Use os treinamentos distribuídos do Snowflake para:
Treinar modelos em conjuntos de dados maiores que a memória de um único nó de computação
Uso de várias GPUs de forma eficiente
Aproveitar automaticamente todas as de computação de vários nós MLJobs ou clusters de notebook escalonados
Treinamento distribuído do Snowflake ML¶
O Snowflake ML fornece formadores distribuídos para estruturas populares de aprendizado de máquina, incluindo XGBoost, LightGBM e PyTorch. Esses treinamentos são otimizados para execução na infraestrutura do Snowflake e podem ser dimensionados automaticamente em vários nós e GPUs.
Gerenciamento automático de recursos - O Snowflake descobre e usa automaticamente todos os recursos de cluster disponíveis
Configuração simplificada - O ambiente Container Runtime é apoiado por um cluster Ray fornecido pelo Snowflake, sem necessidade de configuração de usuário
Integração perfeita do Snowflake - Compatibilidade direta com conectores de dados e estágios do Snowflake
Configurações de escalonamento opcionais - Usuários avançados podem ajustar quando necessário
Carregamento de dados¶
Para ambos os treinamentos de código aberto e distribuídos Snowflake, a maneira mais eficiente de ingerir dados é com o Snowflake Data Connector:
from snowflake.ml.data.data_connector import DataConnector
# Load data
train_connector = DataConnector.from_dataframe(session.table('TRAINING_DATA'))
eval_connector = DataConnector.from_dataframe(session.table('EVAL_DATA'))
Métodos de treinamento¶
Treinamento de código aberto¶
Use bibliotecas padrão de código aberto quando precisar de máxima flexibilidade e controle sobre seu processo de treinamento. Com o treinamento de código aberto, você usa diretamente os populares ML Estruturas como XGBoost, LightGBMe PyTorch com modificações mínimas, enquanto ainda se beneficia da infraestrutura e da conectividade de dados do Snowflake.
Os exemplos a seguir treinam um modelo com XGBoost e LightGBM.
Para treinar com código aberto XGBoost, após carregar dados com o conector de dados, converta-o em um pandas DataFrame e use o XGB da biblioteca diretamente:
import xgboost as xgb
train_df = train_connector.to_pandas()
eval_df = eval_connector.to_pandas()
# Create DMatrix
train_df = train_connector.to_pandas()
dtrain = xgb.DMatrix(train_df[INPUT_COLS], label=train_df[LABEL_COL])
deval = xgb.DMatrix(eval_df)
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Train and evaluate model
evals_result = {}
model = xgb.train(
params,
dtrain,
num_boost_round=100,
evals=[(dtrain, 'train'), (deval, 'valid')],
evals_result=evals_result
)
# Access the evaluation results
print(evals_result)
from snowflake.ml.modeling.distributors.lightgbm import LightGBMEstimator, LightGBMScalingConfig
# Training parameters
params = {
'objective': 'regression',
'metric': 'rmse',
'boosting_type': 'gbdt',
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.9
}
# Automatic scaling (recommended)
estimator = LightGBMEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = LightGBMEstimator(
params=params,
scaling_config=LightGBMScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.feature_importance(importance_type='gain')
Treinamento distribuído¶
O distribuído XGBEstimator A classe tem uma semelhante API com algumas diferenças importantes:
Os parâmetros de treinamento XGBoost são passados para
XGBEstimatordurante a inicialização da classe através do parâmetro «params».O objeto DataConnector pode ser passado diretamente para a função
fitdo estimador, juntamente com as colunas de entrada que definem os recursos e a coluna de rótulo que define o destino.Você pode fornecer uma configuração de escala ao instanciar a classe
XGBEstimator. No entanto, o Snowflake usa como padrão todos os recursos disponíveis.
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Automatic scaling (recommended)
estimator = XGBEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = XGBEstimator(
params=params,
scaling_config=XGBScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.get_score(importance_type='gain')
Avaliação do modelo¶
Os modelos podem ser avaliados passando um eval_set e usando verbose_eval para imprimir os dados de avaliação no console. Além disso, a inferência pode ser feita como uma segunda etapa. O estimador distribuído oferece um predict por conveniência, mas não fará inferência de forma distribuída. Recomendamos converter o modelo ajustado em um estimador xgboost OSS após o treinamento para fazer inferência e registrar no registro do modelo.
Registro do modelo¶
Para registrar o modelo no registro do modelo Snowflake, use o otimizador de código aberto fornecido por estimator.get_booster e retornavam de estimator.fit. Para obter mais informações, consulte XGBoost.
PyTorch¶
O Snowflake PyTorch Distributor oferece suporte nativamente a modelos de dados paralelos distribuídos no backend Snowflake. Para usar DDP no Snowflake, aproveite os módulos PyTorch de código aberto com algumas modificações específicas do Snowflake:
Carregue dados usando o
ShardedDataConnectorpara fragmentar automaticamente os dados no número de partições que corresponde aoworld_sizedo treinamento distribuído. Chameget_sharddentro de um contexto de treinamento do Snowflake para recuperar o s4 associado a esse processo de trabalho.Dentro da função de treinamento, use o objeto
contextpara obter informações específicas de processo, como classificação, classificação local e os dados necessários para treinamento.Salve o modelo usando o do contexto de
get_model_dirpara encontrar o local para armazenar o modelo. Isso armazenará o modelo localmente para treinamento de nó único e sincronizará o modelo com um estágio Snowflake para treinamento distribuído. Se nenhum local do estágio for fornecido, seu estágio de usuário será usado por padrão.
Carregar dados¶
# Create ShardedDataConnector for data ingestion
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
example_snowpark_dataframe = session.table("EXAMPLE_TRAINING_DATA")
data_connector = ShardedDataConnector.from_dataframe(example_snowpark_dataframe)
Modelo de treinamento¶
# Import necessary PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
# Define a simple neural network
class SimpleNet(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(SimpleNet, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
# Define the training function
def train_func():
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from snowflake.ml.modeling.distributors.pytorch import get_context
# Use the Snowflake context to get the necessary methods to manage and retrieve information about the distributed training environment
context = get_context()
rank = context.get_rank()
dist.init_process_group(backend='gloo')
device = torch.device(f"cuda:{context.get_local_rank()}"
if torch.cuda.is_available() else "cpu")
# Initialize model, loss function, and optimizer
model = SimpleNet(input_size=len(input_cols), hidden_size=32, output_size=1).to(device)
model = DDP(model)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Retrieve training data
dataset_map = context.get_dataset_map()
torch_dataset = dataset_map['train'].get_shard().to_torch_dataset(batch_size=1024)
dataloader = DataLoader(torch_dataset)
# Training loop
for epoch in range(10):
for batch_dict in dataloader:
features = torch.cat([batch_dict[col].T for col in input_cols], dim=1).float().to(device)
labels = batch_dict[label_col].T.squeeze(0).float().to(device)
output = model(features)
loss = criterion(output, labels.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')
# Save the model to the model directory provided by the context
if context.get_rank() == 0:
torch.save(
model.module.state_dict(), os.path.join(context.get_model_dir(), "model.pt")
)
# Set up PyTorchDistributor for distributed training
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
# Optional Scaling Configuration, for single node multi-GPU training.
scaling_config=PyTorchScalingConfig(
num_nodes=1,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=4)
)
)
# Run the training process
pytorch_trainer.run(dataset_map={'train': data_connector})
Recuperação do modelo¶
Se você estiver usando uma de vários nós DDP, o modelo é sincronizado automaticamente com um estágio do Snowflake como o armazenamento persistente compartilhado.
O código a seguir obtém o modelo de um estágio. Ele usa o parâmetro artifact_stage_location para especificar o local do estágio que armazena o artefato do modelo.
A função salva na variável stage_location obtém a localização do modelo no estágio após a conclusão do treinamento. O artefato do modelo é salvo em "DB_NAME.SCHEMA_NAME.STAGE_NAME/model/{request_id}".
response = pytorch_trainer.run(
dataset_map={'train': data_connector},
artifact_stage_location="DB_NAME.SCHEMA_NAME.STAGE_NAME",
)
stage_location = response.get_model_dir()