分散トレーニング¶
Snowflake Container Runtimeは、Snowflakeのインフラストラクチャ上でモデルをトレーニングするために使用できる柔軟なトレーニング環境を提供します。オープンソースパッケージを使用することもできますし、Snowflake ML 分散トレーナーを使用して、マルチノードおよびマルチデバイスのトレーニングを行うこともできます。
分散トレーナーは、機械学習のワークロードを複数のノードと GPUs に自動的にスケーリングします。Snowflakeディストリビューターは、複雑な構成を必要とせずにクラスターリソースをインテリジェントに管理し、分散トレーニングを利用しやすく効率的にします。
以下を行う場合に標準的なオープンソースライブラリを使用する
シングルノード環境で小さなデータセットを扱う
モデルの迅速な試作と実験を行う
分散要件なしでワークフローをリフトアンドシフトする
Snowflake分散トレーナーを使用する目的
1つのコンピュートノードのメモリよりも大きなデータセットでモデルをトレーニングする
複数の GPUs を効率的に活用する
すべてのコンピュートマルチノード MLJobs またはスケーリングされたノートブッククラスターを自動的に活用する
Snowflake ML 分散トレーニング¶
Snowflake ML は、 XGBoost 、 LightGBM 、 PyTorch を含む一般的な機械学習フレームワーク用の分散トレーナーを提供します。これらのトレーナーは、Snowflakeのインフラ上で動作するように最適化されており、複数のノードと GPUs にわたって自動的にスケーリングできます。
自動リソース管理 - Snowflakeは、利用可能なすべてのクラスターリソースを自動的に検出して使用します
簡素化されたセットアップ - Container Runtime環境は、Snowflakeが提供するRayクラスターによってサポートされており、ユーザーによる構成は不要です
シームレスなSnowflake統合 - Snowflakeデータコネクタおよびステージと直接互換性があります
オプションのスケーリング構成 - 上級ユーザーは必要に応じて微調整が可能です
データのロード¶
オープンソースとSnowflake分散トレーナーの両方で、データを取り込む最もパフォーマンスの高い方法は、Snowflake Data Connectorを使用することです。
from snowflake.ml.data.data_connector import DataConnector
# Load data
train_connector = DataConnector.from_dataframe(session.table('TRAINING_DATA'))
eval_connector = DataConnector.from_dataframe(session.table('EVAL_DATA'))
トレーニング方法¶
オープンソーストレーニング¶
最大限の柔軟性とトレーニングプロセスの制御が必要な場合は、標準的なオープンソースライブラリを使用します。オープンソーストレーニングでは、Snowflakeのインフラとデータ接続の恩恵を受けながら、 XGBoost 、 LightGBM 、 PyTorch のような一般的な ML フレームワークを最小限の変更で直接使用できます。
以下の例では、 XGBoost と LightGBM を用いてモデルをトレーニングしています。
オープンソースの XGBoost でトレーニングするには、データコネクタでデータをロードした後、それをpandasデータフレームに変換し、 XGB ライブラリを直接使用します。
import xgboost as xgb
train_df = train_connector.to_pandas()
eval_df = eval_connector.to_pandas()
# Create DMatrix
train_df = train_connector.to_pandas()
dtrain = xgb.DMatrix(train_df[INPUT_COLS], label=train_df[LABEL_COL])
deval = xgb.DMatrix(eval_df)
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Train and evaluate model
evals_result = {}
model = xgb.train(
params,
dtrain,
num_boost_round=100,
evals=[(dtrain, 'train'), (deval, 'valid')],
evals_result=evals_result
)
# Access the evaluation results
print(evals_result)
from snowflake.ml.modeling.distributors.lightgbm import LightGBMEstimator, LightGBMScalingConfig
# Training parameters
params = {
'objective': 'regression',
'metric': 'rmse',
'boosting_type': 'gbdt',
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.9
}
# Automatic scaling (recommended)
estimator = LightGBMEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = LightGBMEstimator(
params=params,
scaling_config=LightGBMScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.feature_importance(importance_type='gain')
分散トレーニング¶
分散 XGBEstimator クラスには同様の API がありますが、いくつかの重要な違いがあります。
XGBoost トレーニングパラメーターは、「params」パラメーターを通して、クラス初期化時に
XGBEstimatorに渡されます。DataConnector オブジェクトは、機能を定義する入力列とターゲットを定義するラベル列とともに、推定器の
fit関数に直接渡すことができます。XGBEstimatorクラスをインスタンス化する際に、スケーリング構成を指定できます。しかし、Snowflakeはデフォルトで利用可能なすべてのリソースを使用します。
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Automatic scaling (recommended)
estimator = XGBEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = XGBEstimator(
params=params,
scaling_config=XGBScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.get_score(importance_type='gain')
モデルの評価¶
モデルの評価は、 eval_set を渡し、 verbose_eval を使って評価データをコンソールに出力することで行うことができます。さらに、第二段階として推論を行うこともできます。分散推定器は、利便性のために predict メソッドを提供しますが、分散形式で推論を行うことはありません。推論を行い、モデルレジストリにログを記録するために、フィットモデルをトレーニング後に OSS xgboost推定器に変換することを推奨します。
モデルの登録¶
モデルをSnowflakeモデルレジストリに登録するには、 estimator.get_booster によって提供され、 estimator.fit から返されるオープンソースブースターを使用します。詳細については、 XGBoost をご参照ください。
PyTorch¶
Snowflake PyTorch ディストリビューターは、Snowflakeバックエンドの分散データ並列モデルをネイティブにサポートしています。Snowflakeで DDP を使用するには、オープンソースの PyTorch モジュールを活用し、Snowflake固有の修正をいくつか加えます。
ShardedDataConnectorを使用してデータをロードし、分散トレーナーのworld_sizeに一致する数のパーティションにデータを自動的にシャードします。ワーカープロセスに関連付けられたシャードを取得するために、Snowflakeトレーニングコンテキスト内でget_shardを呼び出します。トレーニング関数の内部では、
contextオブジェクトを使用して、ランク、ローカルランク、トレーニングに必要なデータなど、プロセス固有の情報を取得します。コンテキストの
get_model_dirを使用してモデルを保存し、モデルの保管場所を見つけます。これにより、単一ノードのトレーニングではモデルをローカルに保管し、分散トレーニングではモデルをSnowflakeステージに同期します。ステージの場所が指定されていない場合、デフォルトでユーザーステージが使用されます。
データをロード¶
# Create ShardedDataConnector for data ingestion
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
example_snowpark_dataframe = session.table("EXAMPLE_TRAINING_DATA")
data_connector = ShardedDataConnector.from_dataframe(example_snowpark_dataframe)
モデルのトレーニング¶
# Import necessary PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
# Define a simple neural network
class SimpleNet(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(SimpleNet, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
# Define the training function
def train_func():
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from snowflake.ml.modeling.distributors.pytorch import get_context
# Use the Snowflake context to get the necessary methods to manage and retrieve information about the distributed training environment
context = get_context()
rank = context.get_rank()
dist.init_process_group(backend='gloo')
device = torch.device(f"cuda:{context.get_local_rank()}"
if torch.cuda.is_available() else "cpu")
# Initialize model, loss function, and optimizer
model = SimpleNet(input_size=len(input_cols), hidden_size=32, output_size=1).to(device)
model = DDP(model)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Retrieve training data
dataset_map = context.get_dataset_map()
torch_dataset = dataset_map['train'].get_shard().to_torch_dataset(batch_size=1024)
dataloader = DataLoader(torch_dataset)
# Training loop
for epoch in range(10):
for batch_dict in dataloader:
features = torch.cat([batch_dict[col].T for col in input_cols], dim=1).float().to(device)
labels = batch_dict[label_col].T.squeeze(0).float().to(device)
output = model(features)
loss = criterion(output, labels.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')
# Save the model to the model directory provided by the context
if context.get_rank() == 0:
torch.save(
model.module.state_dict(), os.path.join(context.get_model_dir(), "model.pt")
)
# Set up PyTorchDistributor for distributed training
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
# Optional Scaling Configuration, for single node multi-GPU training.
scaling_config=PyTorchScalingConfig(
num_nodes=1,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=4)
)
)
# Run the training process
pytorch_trainer.run(dataset_map={'train': data_connector})
モデルの取得¶
マルチノード DDP を使用している場合、モデルは自動的に共有永続ストレージとしてSnowflakeステージに同期されます。
次のコードはステージからモデルを取得します。artifact_stage_location パラメーターを使用して、モデルの成果物を保管するステージの場所を指定します。
stage_location 変数に保存された関数は、トレーニング完了後のステージにおけるモデルの場所を取得します。モデルの成果物は "DB_NAME.SCHEMA_NAME.STAGE_NAME/model/{request_id}" の下に保存されます。
response = pytorch_trainer.run(
dataset_map={'train': data_connector},
artifact_stage_location="DB_NAME.SCHEMA_NAME.STAGE_NAME",
)
stage_location = response.get_model_dir()