분산 학습¶
Snowflake Container Runtime은 Snowflake의 인프라에서 모델을 학습시키는 데 사용할 수 있는 유연한 학습 환경을 제공합니다. 다중 노드 및 다중 디바이스 학습을 위해 오픈 소스 패키지를 사용하거나 Snowflake ML 분산형 트레이너를 사용할 수 있습니다.
분산형 트레이너는 여러 노드와 GPUs에 걸쳐 머신 러닝 워크로드를 자동으로 확장합니다. Snowflake 배포자는 복잡한 구성 없이 클러스터 리소스를 지능적으로 관리하므로 분산형 학습을 액세스 가능하고 효율적으로 만들어 줍니다.
다음 작업을 수행할 경우 표준 오픈 소스 라이브러리 사용
단일 노드 환경에서 소규모 데이터 세트로 작업
모델을 사용한 신속한 프로토타이핑 및 실험
분산 요구 사항 없이 워크플로 해제 및 전환
Snowflake 분산형 트레이너를 사용하여 다음을 수행할 수 있습니다.
단일 컴퓨팅 노드의 메모리보다 큰 데이터 세트에서 모델 학습시키기
다중 GPUs를 효율적으로 활용
모든 컴퓨팅 다중 노드 MLJobs 또는 확장된 노트북 클러스터를 자동으로 활용
Snowflake ML 분산형 학습¶
Snowflake ML은 XGBoost, LightGBM 및 PyTorch를 포함하여 널리 사용되는 머신 러닝 프레임워크를 위한 분산형 트레이너를 제공합니다. 이러한 트레이너는 Snowflake의 인프라에서 실행하도록 최적화되어 있으며 여러 노드 및 GPUs 간에 자동으로 확장할 수 있습니다.
자동 리소스 관리 - Snowflake는 사용 가능한 모든 클러스터 리소스를 자동으로 검색하고 사용합니다.
간단한 설정 - Container Runtime 환경은 Snowflake에서 제공하는 Ray 클러스터로 지원되며, 사용자 구성은 필요하지 않습니다.
원활한 Snowflake 통합 - Snowflake 데이터 커넥터 및 스테이지와의 직접적인 호환성
선택적 확장 구성 - 고급 사용자가 필요할 때 미세 조정할 수 있습니다.
데이터 로딩¶
오픈 소스 및 Snowflake 분산형 트레이너 모두에서 뛰어난 성능으로 데이터를 수집하는 방법은 Snowflake 데이터 커넥터를 사용하는 것입니다.
from snowflake.ml.data.data_connector import DataConnector
# Load data
train_connector = DataConnector.from_dataframe(session.table('TRAINING_DATA'))
eval_connector = DataConnector.from_dataframe(session.table('EVAL_DATA'))
학습 방법¶
오픈 소스 학습¶
학습 프로세스를 최대한 유연하게 제어해야 하는 경우 표준 오픈 소스 라이브러리를 사용하세요. 오픈 소스 학습을 통해 Snowflake의 인프라 및 데이터 연결의 이점을 누리면서, 최소한의 수정으로 XGBoost, LightGBM 및 PyTorch와 같은 인기 있는 ML 프레임워크를 직접 사용할 수 있습니다.
다음 예에서는 XGBoost 및 LightGBM을 사용하여 모델을 학습시킵니다.
오픈 소스 XGBoost로 학습시키려면 데이터 커넥터로 데이터를 로드한 후 pandas 데이터 프레임으로 변환하고 XGB 라이브러리를 직접 사용합니다.
import xgboost as xgb
train_df = train_connector.to_pandas()
eval_df = eval_connector.to_pandas()
# Create DMatrix
train_df = train_connector.to_pandas()
dtrain = xgb.DMatrix(train_df[INPUT_COLS], label=train_df[LABEL_COL])
deval = xgb.DMatrix(eval_df)
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Train and evaluate model
evals_result = {}
model = xgb.train(
params,
dtrain,
num_boost_round=100,
evals=[(dtrain, 'train'), (deval, 'valid')],
evals_result=evals_result
)
# Access the evaluation results
print(evals_result)
from snowflake.ml.modeling.distributors.lightgbm import LightGBMEstimator, LightGBMScalingConfig
# Training parameters
params = {
'objective': 'regression',
'metric': 'rmse',
'boosting_type': 'gbdt',
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.9
}
# Automatic scaling (recommended)
estimator = LightGBMEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = LightGBMEstimator(
params=params,
scaling_config=LightGBMScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.feature_importance(importance_type='gain')
분산 학습¶
분산형 XGBEstimator 클래스에는 유사한 API가 있으며 다음과 같은 몇 가지 주요 차이점이 있습니다.
XGBoost 학습 매개 변수는 “params” 매개 변수를 통한 클래스 초기화 중에 :code:`XGBEstimator`에 전달됩니다.
DataConnector 오브젝트는 기능을 정의하는 입력 열과 대상을 정의하는 레이블 열과 함께 추정기의
fit함수에 직접 전달할 수 있습니다.XGBEstimator클래스를 인스턴스화할 때 크기 조정 구성을 제공할 수 있습니다. 그러나 Snowflake는 기본적으로 사용 가능한 모든 리소스를 사용합니다.
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Training parameters
params = {
'objective': 'reg:squarederror',
'max_depth': 6,
'learning_rate': 0.1
}
# Automatic scaling (recommended)
estimator = XGBEstimator(
params=params
)
# Call with custom GPU scaling
gpu_estimator = XGBEstimator(
params=params,
scaling_config=XGBScalingConfig(use_gpu=True) # optional - available resources will be used automatically
)
# Train and evaluate
booster = estimator.fit(
dataset=train_connector,
input_cols=['age', 'income', 'credit_score'],
label_col='default_risk',
eval_set=eval_connector,
verbose_eval=10
)
# Access results
booster = estimator.get_booster() # If you forgot to save the output of fit, get the booster from the estimator
feature_importance = booster.get_score(importance_type='gain')
모델 평가¶
모델은 eval_set`를 전달하고 :code:`verbose_eval`을 통해 평가 데이터를 콘솔에 출력하여 평가할 수 있습니다. 또한 추론은 두 번째 단계로 수행할 수 있습니다. 분산형 추정기는 편의상 :code:`predict 메서드를 제공하지만 추론을 분산 방식으로 수행하지는 않습니다. 추론을 수행하고 모델 레지스트리에 로깅하기 위해 학습 후 맞춤 모델을 OSS xgboost 추정기로 변환하는 것이 좋습니다.
모델 등록¶
모델을 Snowflake 모델 레지스트리에 등록하려면 estimator.get_booster`에서 제공하고 :code:`estimator.fit`에서 반환한 오픈 소스 부스터를 사용하세요. 자세한 내용은 :doc:/developer-guide/snowflake-ml/model-registry/built-in-models/xgboost` 섹션을 참조하십시오.
PyTorch¶
Snowflake PyTorch Distributor는 기본적으로 Snowflake 백엔드에서 분산 데이터 병렬 모델을 지원합니다. Snowflake에서 DDP를 사용하려면 Snowflake와 관련된 몇 가지 사항을 수정하고 오픈 소스 PyTorch 모델을 활용하세요.
``ShardedDataConnector``를 통해 데이터를 로드하여 데이터를 분산된 트레이너의 :code:`world_size`와 일치하는 파티션 수로 자동으로 분할합니다. Snowflake 학습 컨텍스트 내에서 :code:`get_shard`를 호출하여 해당 작업자 프로세스와 연결된 shard를 검색합니다.
학습 함수 내에서
context오브젝트를 사용하여 순위, 로컬 순위, 학습에 필요한 데이터와 같은 프로세스별 정보를 가져옵니다.컨텍스트의 :code:`get_model_dir`을 사용하여 모델을 저장할 위치를 찾고 모델을 저장합니다. 그러면 단일 노드 학습을 위해 모델이 로컬에 저장되고, 분산 학습을 위해 모델이 Snowflake 스테이지에 동기화됩니다. 스테이지 위치가 제공되지 않으면 기본적으로 사용자 스테이지가 사용됩니다.
데이터 로드¶
# Create ShardedDataConnector for data ingestion
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
example_snowpark_dataframe = session.table("EXAMPLE_TRAINING_DATA")
data_connector = ShardedDataConnector.from_dataframe(example_snowpark_dataframe)
모델 학습시키기¶
# Import necessary PyTorch libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
# Define a simple neural network
class SimpleNet(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(SimpleNet, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
# Define the training function
def train_func():
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from snowflake.ml.modeling.distributors.pytorch import get_context
# Use the Snowflake context to get the necessary methods to manage and retrieve information about the distributed training environment
context = get_context()
rank = context.get_rank()
dist.init_process_group(backend='gloo')
device = torch.device(f"cuda:{context.get_local_rank()}"
if torch.cuda.is_available() else "cpu")
# Initialize model, loss function, and optimizer
model = SimpleNet(input_size=len(input_cols), hidden_size=32, output_size=1).to(device)
model = DDP(model)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Retrieve training data
dataset_map = context.get_dataset_map()
torch_dataset = dataset_map['train'].get_shard().to_torch_dataset(batch_size=1024)
dataloader = DataLoader(torch_dataset)
# Training loop
for epoch in range(10):
for batch_dict in dataloader:
features = torch.cat([batch_dict[col].T for col in input_cols], dim=1).float().to(device)
labels = batch_dict[label_col].T.squeeze(0).float().to(device)
output = model(features)
loss = criterion(output, labels.unsqueeze(1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')
# Save the model to the model directory provided by the context
if context.get_rank() == 0:
torch.save(
model.module.state_dict(), os.path.join(context.get_model_dir(), "model.pt")
)
# Set up PyTorchDistributor for distributed training
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
# Optional Scaling Configuration, for single node multi-GPU training.
scaling_config=PyTorchScalingConfig(
num_nodes=1,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=4)
)
)
# Run the training process
pytorch_trainer.run(dataset_map={'train': data_connector})
모델 검색¶
다중 노드 DDP를 사용하는 경우 모델은 공유 영구 저장소로 Snowflake 스테이지에 자동으로 동기화됩니다.
다음 코드는 스테이지에서 모델을 가져옵니다. artifact_stage_location 매개 변수를 사용하여 모델 아티팩트를 저장하는 스테이지의 위치를 지정합니다.
stage_location 변수에 저장된 함수는 학습이 완료된 후 스테이지의 모델 위치를 가져옵니다. 모델 아티팩트는 "DB_NAME.SCHEMA_NAME.STAGE_NAME/model/{request_id}" 아래에 저장됩니다.
response = pytorch_trainer.run(
dataset_map={'train': data_connector},
artifact_stage_location="DB_NAME.SCHEMA_NAME.STAGE_NAME",
)
stage_location = response.get_model_dir()