Container Runtime para ML em clusters de vários nós

Nessa versão preliminar, o Container Runtime para ML permite que você execute cargas de trabalho de ML em clusters de vários nós no Snowflake Notebooks. A biblioteca snowflake-ml-python inclui APIs para definir o número de nós no pool de computação disponível para cargas de trabalho de ML, permitindo que os recursos disponíveis para uma carga de trabalho sejam dimensionados sem redimensionar o pool de computação. Outra API obtém uma lista de nós ativos.

Um cluster com vários nós atribui um nó para ser o nó principal. Os nós adicionais são chamados de nós de trabalho. O nó principal orquestra operações paralelas no cluster e também contribui com seus recursos de computação para executar a carga de trabalho. Um cluster de vários nós com um nó ativo tem apenas um nó principal. Um cluster de vários nós com três nós ativos tem um nó principal e dois nós de trabalho, e todos os três nós participam da execução da sua carga de trabalho.

Pré-requisitos

Para usar clusters de vários nós para executar as cargas de trabalho de ML, é necessário:

Configurar um pool de computação

Para usar uma configuração de vários nós, você precisa de um pool de computação com pelo menos dois nós. Você pode criar um novo pool de computação ou alterar um pool existente. Em qualquer um dos comandos, passe um argumento MAX_NODES para definir a capacidade máxima do pool. É uma boa prática provisionar um ou mais nós extras para que você possa aumentar ou diminuir facilmente para cargas de trabalho maiores ou menores.

Para ver a capacidade de um pool de computação, use o comando DESCRIBE COMPUTE POOL. A capacidade está na coluna MAX_NODES da tabela retornada.

DESCRIBE COMPUTE POOL my_pool;
Copy

Para definir a capacidade de um pool de computação, use o comando ALTER COMPUTE POOL.

ALTER COMPUTE POOL <compute_pool_name>
    SET MAX_NODES = <total_capacity>;
Copy

Executar uma carga de trabalho em um cluster de vários nós

A escolha de um pool de computação de vários nós para o notebook é a única ação necessária para usar vários nós no pool de computação para executar uma carga de trabalho de ML.

No notebook, defina o número de nós ativos usando a snowflake.ml.runtime_cluster.scale_cluster Python API. O número de nós ativos em um pool de computação é o número de nós disponíveis para executar uma carga de trabalho, até MAX_NODES do pool. O método usa o número total de nós ativos necessários, incluindo o nó principal e todos os nós de trabalho, como seu parâmetro principal.

Nota

Essa função é bloqueadora por padrão (ou seja, aguarda até que a operação de dimensionamento termine) e tem um tempo limite de 12 minutos. Se a operação atingir o tempo limite, ela voltará automaticamente ao estado anterior.

As operações de dimensionamento não persistem entre as sessões. Ou seja, se um notebook terminar com um número diferente de zero nós de trabalho, ele não será automaticamente ampliado na próxima vez que for iniciado. Você deve chamar a API de dimensionamento novamente para definir o número de nós de trabalho.

Sintaxe

snowflake.ml.runtime_cluster.scale_cluster(
    expected_cluster_size: int,
    *,
    notebook_name: Optional[str] = None,
    is_async: bool = False,
    options: Optional[Dict[str, Any]] = None
) -> bool
Copy

Argumentos

  • expected_cluster_size (int): o número de nós ativos no pool de computação, até MAX_NODES do pool. Isso inclui o nó principal e todos os nós de trabalho.

  • notebook_name (Opcional [str]): o nome do notebook em que a carga de trabalho é executada. O pool de computação a ser dimensionado é o pool no qual o notebook especificado está sendo executado. Se não for fornecido, ele será determinado automaticamente a partir do contexto atual. Uma exceção será aberta se o nome do notebook errado for usado.

  • is_async (bool): controla se a função bloqueia a espera por dimensionamento:

    • Se False (falso) (padrão): a função bloqueia até que o cluster esteja totalmente pronto ou o tempo da operação expire.

    • Se True (verdadeiro): a função retorna imediatamente após confirmar que a solicitação de dimensionamento foi aceita.

  • options (Opcional [Dict[str, Any]]): opções avançadas de configuração:

    • rollback_after_seconds (int): tempo máximo antes da reversão automática se o dimensionamento não for concluído. O padrão é 720 segundos.

    • block_until_min_cluster_size (int): número mínimo de nós que devem estar prontos antes do retorno da função.

Retornos

True se o pool de computação for dimensionado com sucesso para o número especificado de nós ativos. Caso contrário, é gerada uma exceção.

Exemplo

from snowflake.ml.runtime_cluster import scale_cluster

# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)

# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers

# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)

# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
Copy

Obter o número disponível de nós

Use a get_nodes API para obter informações sobre os nós ativos no cluster. A função não recebe argumentos.

Sintaxe

get_nodes() -> list
Copy

Retornos

Uma lista contendo detalhes dos nós ativos no cluster. Cada elemento da lista é um dicionário com as seguintes chaves:

  • name (str): o nome do nó.

  • cpus (int): o número de CPUs no nó.

  • gpus (int): o número de GPUs no nó.

Exemplo

from snowflake.ml.runtime_cluster import get_nodes

# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
Copy

A saída do código de exemplo é a seguinte:

2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]

Treinamento distribuído em clusters de vários nós

O Container Runtime para ML oferece suporte ao treinamento distribuído dos modelos LightGBM, XGBoost e PyTorch. As APIs de treinamento distribuído para LightGBMEstimator, XGBEstimator e PyTorch estão documentadas em detalhes na Referência de API.

Configuração de dimensionamento

Todos os modelos oferecem um parâmetro opcional de configuração de dimensionamento que permite que você especifique o recurso para o trabalho de treinamento. A configuração de dimensionamento é uma instância de uma classe específica do modelo: LightGBMScalingConfig, XGBScalingConfig ou PyTorchScalingConfig, dependendo do tipo de modelo.

Objetos de configuração de dimensionamento do LightGBM e XGBoost têm os seguintes atributos:

  • num_workers: o número de processos de trabalho a serem usados para treinamento. O padrão é -1, que define automaticamente o número de processos de trabalho.

  • num_cpu_per_worker: número de CPUs alocadas por processo de trabalho. O padrão é -1, que define automaticamente o número de CPUs por processo de trabalho.

  • use_gpu: se você deve usar a GPU para treinamento. O padrão é None (Nenhum), permitindo que o estimador escolha com base no ambiente. Ao usar a GPU, certifique-se de configurar também os parâmetros do modelo para usar a GPU.

Nota

Em geral, deixe num_workers e num_cpu_per_worker nos valores padrão, para que o Container Services para ML determine a melhor maneira de distribuir esses recursos. O tempo de execução atribui um trabalhador para cada nó no pool de computação e as CPUs ou GPUs necessárias para que cada trabalhador conclua a tarefa.

Os objetos de configuração de dimensionamento do PyTorch têm os seguintes atributos:

  • num_cpus: o número de núcleos de CPU a serem reservados para cada trabalhador.

  • num_gpus: o número de GPUs a serem reservadas para cada trabalhador. O padrão é 0, indicando que nenhuma GPUs está reservada.

Treinamento distribuído dos modelos LightGBM/XGBoost

Uso da memória

Normalmente, um nó com n GB de RAM pode treinar um modelo com n/4 a n/3 de dados sem ficar sem memória. O tamanho máximo do conjunto de dados depende do número de processos de trabalho e do algoritmo de treinamento usado.

Desempenho de computação

O desempenho do treinamento com vários nós depende dos parâmetros do modelo, como a profundidade da árvore, o número de árvores e o número máximo de compartimentos. Aumentar os valores desses parâmetros pode aumentar o tempo total de treinamento em um conjunto de dados.

Exemplo

O exemplo a seguir mostra como treinar um modelo do XGBoost em um cluster de vários nós. O treinamento dos modelos do LightGBM é semelhante.

from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"

# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
    sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
    sql_script += f"select \n"
    for i in range(1, 10):
        sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
    sql_script += f"FT_1 + FT_2 AS TARGET, \n"
    sql_script += f"from TABLE(generator(rowcount=>({10000})));"
    return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""

sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)

params = {
    "eta": 0.1,
    "max_depth": 8,
    "min_child_weight": 100,
    "tree_method": "hist",
}

scaling_config = XGBScalingConfig(
    use_gpu=False
)

estimator = XGBEstimator(
    n_estimators=50,
    objective="reg:squarederror",
    params=params,
    scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
    sample_train_df, ingestor_class=RayDataIngester
)

xgb_model = estimator.fit(
    data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
Copy

Treinamento distribuído de modelos do PyTorch

Os modelos do PyTorch são treinados usando uma função de treinamento (train_func) que é chamada em cada processo de trabalho.

Uso de APIs de contexto

Durante a execução da função de treinamento, você pode usar APIs de contexto para acessar metadados essenciais sobre o ambiente de treinamento e para o encaminhamento de parâmetros do chamador para as funções de treinamento. Consulte Classes relacionadas para obter a documentação da classe de contexto do PyTorch.

O objeto de contexto expõe metadados de tempo de execução que você pode usar para personalizar o comportamento da função de treinamento. Você pode recuperá-los usando os métodos fornecidos get_node_rank, get_local_rank, get_world_size, entre outros.

O código a seguir é um exemplo de recuperação dos valores test e train do objeto de contexto; eles são passados em uma chave chamada dataset_map (que você pode ver no exemplo da função de treinamento mais adiante neste tópico). Esses valores são usados para criar objetos de conjunto de dados do PyTorch que, em seguida, são passados para o modelo.

dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
Copy

Relatórios de métricas

Use o método metrics_reporter do objeto de contexto para enviar métricas da função de treinamento para o código de controle. Isso permite o monitoramento e a depuração em tempo real do processo de treinamento, conforme mostrado no exemplo a seguir.

context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
Copy

Exemplo

O exemplo a seguir é uma função de treinamento para um modelo do PyTorch.

def train_func():
    import io
    import base64
    import time
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torchvision import transforms
    from torch.utils.data import IterableDataset
    from torch.optim.lr_scheduler import StepLR
    from PIL import Image
    from snowflake.ml.modeling.distributors.pytorch import get_context

    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)

        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output

    class DecodedDataset(IterableDataset):
        def __init__(self, source_dataset):
            self.source_dataset = source_dataset
            self.transforms = transforms.ToTensor()  # Ensure we apply ToTensor transform

        def __iter__(self):
            for row in self.source_dataset:
                base64_image = row['IMAGE']
                image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
                # Convert the image to a tensor
                image = self.transforms(image)  # Converts PIL image to tensor

                labels = row['LABEL']
                yield image, int(labels)

    def train(model, device, train_loader, optimizer, epoch):
        model.train()
        batch_idx = 1
        for data, target in train_loader:
            # print(f"data : {data} \n target: {target}")
            # raise RuntimeError("test")
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
            batch_idx += 1

    context = get_context()
    rank = context.get_local_rank()
    device = f"cuda:{rank}"
    is_distributed = context.get_world_size() > 1
    if is_distributed:
        dist.init_process_group(backend="nccl")
    print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")

    dataset_map = context.get_dataset_map()
    train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
    test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

    batch_size = 64
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )

    model = Net().to(device)
    if is_distributed:
        model = DDP(model)
    optimizer = optim.Adadelta(model.parameters())
    scheduler = StepLR(optimizer, step_size=1)

    hyper_parms = context.get_hyper_params()
    num_epochs = int(hyper_parms['num_epochs'])
    start_time = time.time()
    for epoch in range(num_epochs):
        train(model, device, train_loader, optimizer, epoch+1)
        scheduler.step()
    now = time.time()
    context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
    test(model, device, test_loader, context)
Copy

O código a seguir ilustra como iniciar o treinamento distribuído com a função de treinamento anterior. O exemplo cria um objeto distribuidor do PyTorch para executar o treinamento em vários nós, conecta os dados de treinamento e teste à função de treinamento por meio de um objeto de contexto e estabelece a configuração de dimensionamento antes de executar o treinador.

# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector

df = session.table("MNIST_60K")

train_df, test_df = df.random_split([0.99, 0.01], 0)

# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)

pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    scaling_config=PyTorchScalingConfig(  # scaling configuration
        num_nodes=2,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
    )
)

# Run the trainer.
results = pytorch_trainer.run(  # accepts context values as parameters
    dataset_map={"train": train_data, "test": test_data},
    hyper_params={"num_epochs": "1"}
)
Copy

Limitações conhecidas e problemas comuns

É provável que essas limitações e problemas sejam resolvidos antes que o treinamento de vários nós no Container Runtime para ML esteja disponível de forma geral.

Tempo limite da operação de dimensionamento expirado

A operação de dimensionamento pode falhar porque os novos nós não estão prontos dentro do tempo limite de 12 minutos. As possíveis causas incluem:

  • Capacidade insuficiente do pool. Você solicitou mais nós do que o MAX_NODES do pool. Aumente o MAX_NODES do pool.

  • Contenção de recursos. 12 minutos pode não ser tempo suficiente para aquecer os nós adicionados. Defina o MIN_NODES do pool como um número maior para manter alguns dos nós aquecidos ou aumente o número de nós ativos usando mais de uma chamada para scale_cluster com um incremento menor. Outra opção é usar o modo assíncrono para não esperar que todos os nós estejam prontos:

    • Use o modo assíncrono para operações sem bloqueio:

    scale_cluster(3, is_async=True)
    
    Copy
    • Aumentar o limite de tempo:

    scale_cluster(3, options={"rollback_after_seconds": 1200})
    
    Copy

Erros no nome do notebook

Se for exibida uma mensagem de erro como «Notebook <name> does not exist or not authorized» (Notebook <nome> não existe ou não autorizado), isso significa que o nome do notebook detectado automaticamente não corresponde ao notebook atual. Isso pode ocorrer quando:

  • O nome do notebook contém caracteres especiais, como pontos e espaços

  • A detecção automática do nome do notebook não está funcionando corretamente

Solução: forneça explicitamente o parâmetro de nome do notebook. Observe que o nome do notebook precisa de aspas duplas para ser tratado como um identificador:

# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
    scale_cluster(2)
except Exception as e:
    print(e)  # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
    scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
Copy

Os serviços SPCS não são limpos após uma operação de dimensionamento com falha

Quando as operações de dimensionamento falham, o sistema deve limpar todos os recursos criados na operação. No entanto, se isso falhar, um ou mais serviços SPCS poderão ser deixados no estado PENDING ou FAILED. Os serviços no estado PENDING podem se tornar ACTIVE posteriomente ou, se não houver capacidade no pool de computação, permanecer PENDING para sempre.

Para remover serviços nos estados PENDING ou FAILED, dimensione o cluster para ter um nó (zero nós de trabalho). Para limpar todos os serviços iniciados, encerre a sessão atual do notebook clicando em «End Session» (Encerrar sessão) na interface do notebook.