멀티 노드 클러스터의 Container Runtime for ML¶
이 미리 보기에서는 Container Runtime for ML 을 사용하여 Snowflake Notebooks의 멀티 노드 클러스터에서 ML 워크로드를 실행할 수 있습니다. snowflake-ml-python
라이브러리에는 ML 워크로드에 사용할 수 있는 컴퓨팅 풀의 노드 수를 설정하는 API가 포함되어 있어, 컴퓨팅 풀의 크기를 조정하지 않고도 워크로드에 사용할 수 있는 리소스를 확장할 수 있습니다. 또 다른 API 는 활성 노드 목록을 검색합니다.
멀티 노드 클러스터는 한 노드를 헤드 노드로 지정합니다. 추가 노드는 워커 노드라고 합니다. 헤드 노드는 클러스터에서 병렬 작업을 오케스트레이션하고 워크로드 실행에 컴퓨팅 리소스를 제공하기도 합니다. 활성 노드가 하나인 멀티 노드 클러스터에는 헤드 노드만 있습니다. 활성 노드가 3개인 멀티 노드 클러스터에는 1개의 헤드 노드와 2개의 워커 노드가 있으며, 3개의 노드 모두 워크로드 실행에 참여합니다.
전제 조건¶
멀티 노드 클러스터를 사용하여 ML 워크로드를 실행하려면 다음이 필요합니다.
노트북에 액세스할 수 있는 활성 Snowflake 계정입니다. Snowflake Notebooks 섹션을 참조하십시오.
Container Runtime을 사용하는 노트북을 만들고 관리할 수 있는 권한입니다. Container Runtime for ML 의 Notebooks 섹션을 참조하십시오.
컴퓨팅 풀 구성하기¶
멀티 노드 설정을 사용하려면 최소 두 개의 노드가 있는 컴퓨팅 풀이 필요합니다. 새 컴퓨팅 풀을 생성 하거나 기존 풀을 변경 할 수 있습니다. 두 명령 중 하나에서 MAX_NODES 인자를 전달하여 풀의 최대 용량을 설정합니다. 워크로드의 규모에 따라 쉽게 확장 또는 축소할 수 있도록 하나 이상의 추가 노드를 프로비저닝하는 것이 좋습니다.
컴퓨팅 풀의 용량을 확인하려면 DESCRIBE COMPUTE POOL 명령을 사용하십시오. 용량은 반환된 테이블의 MAX_NODES 열에 있습니다.
DESCRIBE COMPUTE POOL my_pool;
컴퓨팅 풀의 용량을 설정하려면 ALTER COMPUTE POOL 명령을 사용하십시오.
ALTER COMPUTE POOL <compute_pool_name>
SET MAX_NODES = <total_capacity>;
멀티 노드 클러스터에서 워크로드 실행하기¶
노트북에 대해 멀티 노드 컴퓨팅 풀을 선택하는 것은 컴퓨팅 풀의 여러 노드를 사용하여 ML 워크로드를 실행하는 데 필요한 유일한 작업입니다.
노트북에서 snowflake.ml.runtime_cluster.scale_cluster
Python API 를 사용하여 활성 노드 수를 설정합니다. 컴퓨팅 풀의 활성 노드 수는 풀의 MAX_NODES 까지 워크로드를 실행할 수 있는 가용성 노드 수입니다. 이 메서드는 헤드 노드와 모든 워커 노드를 포함하여 요구되는 총 활성 노드 수를 기본 매개 변수로 사용합니다.
참고
이 함수는 기본적으로 차단되어 있으며(즉, 스케일링 작업이 완료될 때까지 기다림) 12분의 시간 제한이 있습니다. 작업 시간이 초과되면 자동으로 이전 상태로 롤백됩니다.
스케일링 작업은 세션 간에 지속되지 않습니다. 즉, 노트북이 0이 아닌 수의 워커 노드로 끝나는 경우, 다음에 노트북을 시작할 때 자동으로 확장되지 않습니다. 스케일링 API 를 다시 호출하여 워커 노드 수를 설정해야 합니다.
구문¶
snowflake.ml.runtime_cluster.scale_cluster(
expected_cluster_size: int,
*,
notebook_name: Optional[str] = None,
is_async: bool = False,
options: Optional[Dict[str, Any]] = None
) -> bool
인자¶
expected_cluster_size
(정수): 컴퓨팅 풀의 활성 노드 수로, 최대 풀의 MAX_NODES 까지입니다. 여기에는 헤드 노드와 모든 워커 노드가 포함됩니다.notebook_name
(선택적 [str]): 워크로드가 실행되는 노트북의 이름입니다. 확장할 컴퓨팅 풀은 지정된 노트북이 실행 중인 풀입니다. 제공되지 않으면 현재 컨텍스트에서 자동으로 결정됩니다. 잘못된 노트북 이름을 사용하면 예외가 발생합니다.is_async
(부울): 함수 블록이 스케일링을 대기할지 여부를 제어합니다.False(기본값)인 경우: 이 함수는 클러스터가 완전히 준비되거나 작업이 시간 초과될 때까지 차단합니다.
True인 경우: 스케일링 요청이 수락되었는지 확인한 후 함수가 즉시 반환됩니다.
options
(선택적 [Dict [str, Any]]): 고급 구성 옵션:rollback_after_seconds
(정수): 스케일링이 완료되지 않은 경우 자동 롤백까지의 최대 시간입니다. 기본값은 720초입니다.block_until_min_cluster_size
(정수): 함수가 반환되기 전에 준비해야 하는 최소 노드 수입니다.
반환¶
컴퓨팅 풀이 지정된 수의 활성 노드로 성공적으로 확장된 경우 True
. 그렇지 않으면 예외가 발생합니다.
예¶
from snowflake.ml.runtime_cluster import scale_cluster
# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)
# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers
# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)
# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
사용 가능한 노드 수 가져오기¶
get_nodes
API 를 사용하여 클러스터의 활성 노드에 대한 정보를 가져옵니다. 이 함수에는 인자가 필요하지 않습니다.
구문¶
get_nodes() -> list
반환¶
클러스터의 활성 노드에 대한 세부 정보가 포함된 목록입니다. 목록의 각 요소는 다음 키가 있는 사전입니다.
name
(문자열): 노드의 이름입니다.cpus
(정수): 노드에 있는 CPUs 의 수입니다.gpus
(정수): 노드에 있는 GPUs 의 수입니다.
예¶
from snowflake.ml.runtime_cluster import get_nodes
# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
예제 코드의 출력은 다음과 같습니다.
2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]
멀티 노드 클러스터에 대한 분산 학습¶
Container Runtime for ML 은 LightGBM, XGBoost 및 PyTorch 모델의 분산 학습을 지원합니다. LightGBMEstimator, XGBEstimator 및 PyTorch 의 분산 학습 API는 API 참조 에 자세히 설명되어 있습니다.
스케일링 구성¶
모든 모델은 학습 작업에 대한 리소스를 지정할 수 있는 선택적 확장 구성 매개 변수를 제공합니다. 스케일링 구성은 모델 유형에 따라 다른 모델별 클래스의 인스턴스 LightGBMScalingConfig
, XGBScalingConfig
또는 PyTorchScalingConfig
입니다.
LightGBM 및 XGBoost 스케일링 구성 오브젝트에는 다음과 같은 특성이 있습니다.
num_workers
: 학습에 사용할 워커 프로세스 수입니다. 기본값은 -1로, 워커 프로세스 수가 자동으로 설정됩니다.num_cpu_per_worker
: 워커 프로세스당 할당된 CPU 개수입니다. 기본값은 -1로, 워커 프로세스당 CPU 수가 자동으로 설정됩니다.use_gpu
: 학습에 GPU 를 사용할지 여부입니다. 기본값은 없음으로, 예측 도구가 환경에 따라 선택할 수 있습니다. GPU 를 사용하는 경우 GPU 를 사용하도록 모델 매개 변수도 구성해야 합니다.
참고
일반적으로 num_workers
및 num_cpu_per_worker
를 기본값으로 두면 Container Services for ML 이 이러한 리소스를 배포하는 가장 좋은 방법을 결정합니다. 런타임은 컴퓨팅 풀의 각 노드에 대해 워커를 할당하고, 각 워커가 작업을 완료하는 데 필요한 CPU 또는 GPU를 할당합니다.
PyTorch 스케일링 구성 오브젝트에는 다음과 같은 특성이 있습니다.
num_cpus
: 각 워커에 대해 예약할 CPU 코어 수입니다.num_gpus
: 각 워커에 대해 예약할 GPU 수입니다. 기본값은 0으로, GPU 예약이 없음을 나타냅니다.
LightGBM/XGBoost 모델의 분산 학습¶
- 메모리 사용량
일반적으로 RAM 의 n GB 를 가진 노드는 메모리 부족 없이 n/4 ~ n/3 의 데이터로 모델을 학습시킬 수 있습니다. 최대 데이터 세트 크기는 워커 프로세스 수와 사용된 학습 알고리즘에 따라 달라집니다.
- 컴퓨팅 성능
멀티 노드 학습의 성능은 트리 깊이, 트리 수, 최대 빈 수와 같은 모델 매개 변수에 따라 달라집니다. 이러한 매개 변수 값을 늘리면 데이터 세트의 총 학습 시간이 늘어날 수 있습니다.
예¶
다음 예는 멀티 노드 클러스터에서 XGBoost 모델을 학습시키는 방법을 보여줍니다. LightGBM 모델의 학습도 비슷합니다.
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"
# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
sql_script += f"select \n"
for i in range(1, 10):
sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
sql_script += f"FT_1 + FT_2 AS TARGET, \n"
sql_script += f"from TABLE(generator(rowcount=>({10000})));"
return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""
sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)
params = {
"eta": 0.1,
"max_depth": 8,
"min_child_weight": 100,
"tree_method": "hist",
}
scaling_config = XGBScalingConfig(
use_gpu=False
)
estimator = XGBEstimator(
n_estimators=50,
objective="reg:squarederror",
params=params,
scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
sample_train_df, ingestor_class=RayDataIngester
)
xgb_model = estimator.fit(
data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
PyTorch 모델의 분산 학습¶
PyTorch 모델은 각 워커 프로세스에서 호출되는 학습 함수(train_func
)를 사용하여 학습됩니다.
컨텍스트 API 사용하기¶
학습 함수를 실행하는 동안 컨텍스트 API를 사용하여 학습 환경에 대한 필수 메타데이터에 액세스하고 호출자에서 학습 함수로 매개 변수를 전달할 수 있습니다. PyTorch 컨텍스트 클래스에 대한 설명서는 관련 클래스 섹션을 참조하십시오.
컨텍스트 오브젝트는 학습 함수의 동작을 사용자 지정하는 데 사용할 수 있는 런타임 메타데이터를 노출합니다. 제공된 메서드 get_node_rank
, get_local_rank
, get_world_size
등을 사용하여 검색할 수 있습니다.
다음 코드는 컨텍스트 오브젝트에서 test
및 train
값을 검색하는 예제로, dataset_map
이라는 키로 전달됩니다(이 주제의 뒷부분에 있는 학습 함수 예제에서 확인할 수 있음). 이러한 값은 PyTorch 데이터 세트 오브젝트를 생성하는 데 사용되며, 이 데이터 세트는 모델에 전달됩니다.
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
메트릭 보고¶
컨텍스트 오브젝트의
metrics_reporter
메서드를 사용하여 학습 함수에서 제어 코드로 메트릭을 전송합니다. 이를 통해 다음 예시와 같이 학습 프로세스를 실시간으로 모니터링하고 디버깅할 수 있습니다.context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
예¶
다음 예는 PyTorch 모델에 대한 학습 함수입니다.
def train_func():
import io
import base64
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision import transforms
from torch.utils.data import IterableDataset
from torch.optim.lr_scheduler import StepLR
from PIL import Image
from snowflake.ml.modeling.distributors.pytorch import get_context
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
class DecodedDataset(IterableDataset):
def __init__(self, source_dataset):
self.source_dataset = source_dataset
self.transforms = transforms.ToTensor() # Ensure we apply ToTensor transform
def __iter__(self):
for row in self.source_dataset:
base64_image = row['IMAGE']
image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
# Convert the image to a tensor
image = self.transforms(image) # Converts PIL image to tensor
labels = row['LABEL']
yield image, int(labels)
def train(model, device, train_loader, optimizer, epoch):
model.train()
batch_idx = 1
for data, target in train_loader:
# print(f"data : {data} \n target: {target}")
# raise RuntimeError("test")
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0:
print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
batch_idx += 1
context = get_context()
rank = context.get_local_rank()
device = f"cuda:{rank}"
is_distributed = context.get_world_size() > 1
if is_distributed:
dist.init_process_group(backend="nccl")
print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
batch_size = 64
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
model = Net().to(device)
if is_distributed:
model = DDP(model)
optimizer = optim.Adadelta(model.parameters())
scheduler = StepLR(optimizer, step_size=1)
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
start_time = time.time()
for epoch in range(num_epochs):
train(model, device, train_loader, optimizer, epoch+1)
scheduler.step()
now = time.time()
context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
test(model, device, test_loader, context)
다음 코드는 앞의 학습 함수가 주어졌을 때 분산 학습을 시작하는 방법을 설명합니다. 이 예제에서는 여러 노드에서 학습을 실행하기 위해 PyTorch 배포자 오브젝트를 생성하고, 컨텍스트 오브젝트를 통해 학습 및 테스트 데이터를 학습 함수에 연결하고, 트레이너를 실행하기 전에 스케일링 구성을 설정합니다.
# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector
df = session.table("MNIST_60K")
train_df, test_df = df.random_split([0.99, 0.01], 0)
# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
scaling_config=PyTorchScalingConfig( # scaling configuration
num_nodes=2,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
)
)
# Run the trainer.
results = pytorch_trainer.run( # accepts context values as parameters
dataset_map={"train": train_data, "test": test_data},
hyper_params={"num_epochs": "1"}
)
알려진 제한 사항 및 일반적인 문제¶
이러한 제한 사항과 문제는 ML 에 대한 Container Runtime의 멀티 노드 학습이 일반적으로 제공되기 전에 해결될 것으로 보입니다.
스케일링 작업 시간 제한¶
12분 시간 제한 내에 새 노드가 준비되지 않아 확장 작업이 실패할 수 있습니다. 가능한 원인은 다음과 같습니다.
풀 용량 부족. 풀의 MAX_NODES 보다 더 많은 노드를 요청했습니다. 풀의 MAX_NODES 를 늘리십시오.
리소스 경합. 12분은 추가된 노드를 워밍업하기에 충분한 시간이 아닐 수 있습니다. 풀의 MIN_NODES 를 더 큰 숫자로 설정하여 일부 노드를 워밍업된 상태로 유지하거나
scale_cluster
를 두 번 이상 호출하여 활성 노드 수를 더 작은 단위로 늘리십시오. 또 다른 옵션은 비동기 모드를 사용하여 모든 노드가 준비될 때까지 기다리는 것을 건너뛰는 것입니다.비차단 작업에는 비동기 모드를 사용합니다.
scale_cluster(3, is_async=True)
시간 제한 임계값을 늘립니다.
scale_cluster(3, options={"rollback_after_seconds": 1200})
노트북 이름 오류¶
“Notebook <name> does not exist or not authorized”와 같은 오류 메시지가 표시된다면 자동으로 감지된 노트북 이름이 현재 노트북과 일치하지 않는다는 뜻입니다. 다음과 같은 경우에 이런 일이 발생할 수 있습니다.
노트북 이름에 점과 공백과 같은 특수 문자가 포함된 경우
노트북 이름 자동 감지가 제대로 작동하지 않는 경우
해결 방법: 노트북 이름 매개 변수를 명시적으로 제공하십시오. 노트북 이름에 큰따옴표가 있어야 식별자 로 취급됩니다.
# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
scale_cluster(2)
except Exception as e:
print(e) # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
SPCS 스케일링 작업 실패 후 서비스가 정리되지 않음¶
스케일링 작업이 실패하면 시스템은 작업에서 생성된 모든 리소스를 정리해야 합니다. 그러나 실패할 경우 하나 이상의 SPCS 서비스가 PENDING 또는 FAILED 상태로 남아있을 수 있습니다. PENDING 상태의 서비스는 나중에 ACTIVE 상태가 되거나 컴퓨팅 풀에 용량이 없는 경우 영원히 PENDING 상태로 유지될 수 있습니다.
PENDING 또는 FAILED 상태의 서비스를 제거하려면 클러스터를 하나의 노드(워커 노드 0개)로 확장합니다. 실행된 모든 서비스를 정리하려면 노트북 인터페이스에서 “세션 종료”를 클릭해 현재 노트북 세션을 종료하십시오.