Trabalhos de ML de vários nós do Snowflake

Use trabalhos de ML de vários nós do Snowflake para executar fluxos de trabalho de aprendizado de máquina (ML) distribuídos dentro de tempos de execução de contêiner de ML do Snowflake em vários nós de computação. Distribua o trabalho em vários nós para processar grandes conjuntos de dados e modelos complexos com melhor desempenho. Para obter mais informações sobre trabalhos de ML do Snowflake, consulte Snowflake ML Jobs.

Trabalhos de ML de vários nós do Snowflake estendem os recursos de ML do Snowflake permitindo a execução distribuída em vários nós. Isso lhe proporciona:

  • Desempenho escalável: dimensionamento horizontal para processar conjuntos de dados grandes demais para caber em um único nó

  • Tempo de treinamento reduzido: aceleração do treinamento de modelos complexos por meio de paralelização

  • Eficiência de recursos: otimização da utilização de recursos para cargas de trabalho com uso intensivo de dados

  • Integração da estrutura: use estruturas distribuídas sem interrupções como classes de modelagem distribuída e Ray.

Quando você executa um trabalho de ML do Snowflake com vários nós, ocorre o seguinte:

  • Um nó atua como nó principal (coordenador)

  • Nós adicionais atuam como nós de trabalho (recursos de computação)

  • Juntos, os nós formam uma única entidade de trabalho de ML lógica no Snowflake

Um trabalho de ML de nó único tem apenas um nó principal. Um trabalho de vários nós com três nós ativos tem um nó principal e dois nós de trabalho. Todos os três nós participam da execução de sua carga de trabalho.

Pré-requisitos

Os seguintes pré-requisitos são necessários para usar trabalhos de ML de vários nós do Snowflake.

Importante

Os trabalhos de ML de vários nós do Snowflake atualmente oferecem suporte apenas a clientes Python 3.10. Entre em contato com sua equipe de conta Snowflake se precisar de suporte para outras versões do Python.

Para configurar trabalhos de vários nós, faça o seguinte:

  1. Instale o pacote Snowflake ML Python em seu ambiente Python 3.10.

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. Crie um pool de computação com nós suficientes para oferecer suporte ao seu trabalho de vários nós:

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = 1
      MAX_NODES = <NUM_INSTANCES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy

    Importante

    Você deve definir MAX_NODES para ser maior ou igual ao número de instâncias de destino que você está usando para executar seu trabalho de treinamento. Se você solicitar mais nós do que pretende usar para o trabalho de treinamento, ele poderá falhar ou comportar-se de forma imprevisível. Para obter informações sobre como executar um trabalho de treinamento, consulte Execução de trabalhos de ML de vários nós.

Como escrever código para trabalhos de vários nós

Para trabalhos de vários nós, seu código precisa ser projetado para processamento distribuído usando classes de modelagem distribuída ou Ray.

A seguir estão os principais padrões e considerações quando você usa classes de modelagem distribuída ou Ray:

Compreensão de inicialização e disponibilidade de nós

Em trabalhos de vários nós, os nós de trabalho podem ser inicializados de forma assíncrona e em momentos diferentes:

  • Os nós podem não ser todos iniciados simultaneamente, especialmente se os recursos do pool de computação forem limitados

  • Alguns nós podem começar segundos ou até minutos após outros

  • Os trabalhos de ML esperam automaticamente que o target_instances especificado fique disponível antes de executar sua carga útil. O trabalho falhará com um erro se os nós esperados não estiverem disponíveis dentro do período de tempo limite. Para obter mais informações sobre como personalizar esse comportamento, consulte Configuração avançada: Uso de min_instances.

É possível verificar os nós disponíveis em seu trabalho por meio do Ray:

import ray
ray.init(address="auto", ignore_reinit_error=True)  # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
Copy

Padrões de processamento distribuído

Existem vários padrões que você pode aplicar no corpo de carga útil do trabalho de vários nós para processamento distribuído. Esses padrões aproveitam as classes de modelagem distribuída e o Ray:

Uso de API de treinamento distribuído do Snowflake

O Snowflake fornece treinadores otimizados para estruturas de ML comuns:

# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()

# Create distributed estimator
estimator = XGBEstimator(
    n_estimators=100,
    params={"objective": "reg:squarederror"},
    scaling_config=scaling_config
)

# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
Copy

Para obter mais informações sobre as APIs disponíveis, consulte Classes de modelagem distribuída .

Uso de tarefas nativas do Ray

Outra abordagem é usar o modelo de programação baseado em tarefas do Ray:

# Inside the ML Job payload body
import ray

@ray.remote
def process_chunk(data_chunk):
    # Process a chunk of data
    return processed_result

# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
Copy

Para obter mais informações, consulte a documentação de programação de tarefas do Ray.

Execução de trabalhos de ML de vários nós

Você pode executar trabalhos de ML de vários nós da mesma forma que trabalhos de nó único, usando o parâmetro target_instances:

Uso do decorator remoto

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=3  # Specify the number of nodes
)
def distributed_training(data_table: str):

    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    # Configure scaling for distributed execution
    scaling_config = XGBScalingConfig()

    # Create distributed estimator
    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using distributed resources
    # NOTE: data_connector and feature_cols excluded for brevity
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")


job = distributed_training("<my_training_data>")
Copy

Execução de um arquivo Python

from snowflake.ml.jobs import submit_file

job = submit_file(
    "<script_path>",
    "MY_COMPUTE_POOL",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

Execução de um diretório

from snowflake.ml.jobs import submit_directory

job = submit_directory(
    "<script_directory>",
    "MY_COMPUTE_POOL",
    entrypoint="<script_name>",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

Configuração avançada: Uso de min_instances

Para um gerenciamento de recursos mais flexível, você pode usar o parâmetro min_instances opcional para especificar um número mínimo de instâncias necessárias para que o trabalho prossiga. Se min_instances for definido, a carga útil do trabalho será executada assim que o número mínimo de nós estiver disponível, mesmo que esse número seja menor que target_instances.

Isso é útil quando você deseja:

  • Começar o treinamento com menos nós se o destino completo não estiver disponível imediatamente

  • Reduzir os tempos de espera quando os recursos do pool de computação são limitados

  • Implementar fluxos de trabalho tolerantes a falhas que podem se adaptar a disponibilidade variável de recursos

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=5,  # Prefer 5 nodes
    min_instances=3      # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
    import ray

    # Check how many nodes we actually got
    available_nodes = len(ray.nodes())
    print(f"Training with {available_nodes} nodes")

    # Adapt your training logic based on available resources
    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    scaling_config = XGBScalingConfig(
        num_workers=available_nodes
    )

    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using available distributed resources
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")

job = flexible_distributed_training("<my_training_data>")
Copy

Gerenciamento de trabalhos de vários nós

Monitoramento do status do trabalho

O monitoramento do status do trabalho permanece inalterado em relação aos trabalhos de nó único:

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}")  # PENDING, RUNNING, FAILED, DONE

# Wait for completion
job.wait()
Copy

Acesso aos logs por nó

Em trabalhos de vários nós, você pode acessar logs de instâncias específicas:

# Get logs from the default (head) instance
logs_default = job.get_logs()

# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)

# Display logs in the notebook/console
job.show_logs()  # Default (head) instance logs
job.show_logs(instance_id=0)  # Instance 0 logs (not necessarily the head node)
Copy

Problemas e limitações comuns

Use as informações a seguir para resolver problemas comuns que você possa encontrar.

  • Falhas na conexão do nó: se os nós de trabalho não se conectarem ao nó principal, é possível que o nó principal conclua sua tarefa e depois se desligue antes que o trabalhador termine seu trabalho. Para evitar falhas de conexão, implemente a lógica de coleta de resultados no trabalho.

  • Exaustão de memória: se os trabalhos falharem devido a problemas de memória, aumente o tamanho do nó ou use mais nós com menos dados por nó.

  • Tempo limite de disponibilidade do nó: se o número necessário de instâncias (target_instances ou min_instances) não estiver disponível dentro do tempo limite predefinido, o trabalho falhará. Certifique-se de que seu pool de computação tenha capacidade suficiente ou ajuste os requisitos de sua instância.