Dimensionar um aplicativo usando Ray

O tempo de execução do contêiner Snowflake se integra ao Ray, uma estrutura unificada de código aberto para escalonamento de aplicativos de AI e Python. Essa integração permite usar os recursos de computação distribuída do Ray no Snowflake para suas cargas de trabalho de aprendizado de máquina.

O Ray é pré-instalado e executado como um processo em segundo plano no tempo de execução do contêiner de ML do Snowflake. É possível acessar o Ray no tempo de execução para ML das seguintes maneiras:

Notebooks Snowflake: um ambiente interativo em que é possível se conectar ao Ray, definir tarefas e dimensionar o cluster dinamicamente para desenvolvimento e experimentação.

Trabalhos de ML do Snowflake: envie seus aplicativos Ray como trabalhos estruturados e repetíveis. É possível especificar o tamanho do cluster como parte da configuração do trabalho para cargas de trabalho de produção.

Quando você executa o tempo de execução do contêiner em um notebook Snowflake ou trabalho de ML, o processo Ray é iniciado automaticamente como parte desse contêiner.

Use o seguinte código Python para estabelecer conexão com o cluster:

import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Copy

Importante

Certifique-se de usar sempre o endereço "auto" quando estiver se conectando ao cluster do Ray. A inicialização com o endereço "auto" direciona seu aplicativo para o nó principal do cluster do Ray que o Snowflake provisionou para sua sessão.

Escalonamento do cluster do Ray

Após estabelecer conexão com o cluster do Ray, você pode ajustar o tamanho dele para atender às demandas computacionais de sua carga de trabalho.

Use estas abordagens para dimensionar seu cluster do Ray:

Em um notebook, é possível aumentar ou reduzir a escala vertical do cluster usando a função scale_cluster. Isso é ideal para fluxos de trabalho interativos em que as necessidades de recursos podem mudar.

Ao especificar expected_cluster_size=5, você obtém 1 nó principal e 4 nós de trabalho.

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")

# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
Copy

Depois que terminar de usar o cluster, você poderá reduzir sua escala vertical. Para obter mais informações, consulte Limpeza.

Monitoramento com o painel do Ray

Se você está executando um trabalho de um notebook Snowflake, pode usar o painel do Ray para monitorar seu cluster. O painel é uma interface da Web que permite visualizar os recursos, os trabalhos, as tarefas e o desempenho do cluster. Use o código a seguir para obter o URL do painel:

from snowflake.ml.runtime_cluster import get_ray_dashboard_url

# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Copy

Abra o URL em uma nova guia do navegador, faça login com suas credenciais do Snowflake.

Casos de uso avançados

Esta seção abrange os recursos avançados do Ray para cargas de trabalho complexas e para a migração de aplicativos existentes.

Criação e operação de cargas de trabalho distribuídas com Ray

O Ray fornece componentes que permitem criar e operar cargas de trabalho distribuídas. Isso inclui componentes básicos via Ray Core com primitivos essenciais para criar e dimensionar essas cargas de trabalho.

Também inclui as seguintes bibliotecas que permitem criar fluxos de trabalho próprios para pré-processamento de dados, treinamento de ML, ajuste de hiperparâmetros e inferência de modelo:

  • Ray Data: processamento e transformação de dados escaláveis

  • Ray Train: treinamento distribuído e ajuste fino de modelos de ML

  • Ray Tune: otimização de hiperparâmetros com algoritmos de pesquisa avançados

  • Ray Serve: serviço de modelo e inferência

As seções a seguir descrevem como usar essas bibliotecas diretamente, enquanto as interfaces nativas do Snowflake desenvolvidas com base no Ray fornecem ferramentas adicionais para criar, implantar e operacionalizar aplicativos baseados no Ray.

Ray Core: tarefas e atores

O Ray fornece os seguintes primitivos de computação distribuída:

  • Tarefas: funções sem estado que são executadas remotamente e retornam valores

  • Atores: classes com estado que podem ser instanciadas remotamente e chamadas várias vezes

  • Objetos: valores imutáveis armazenados no armazenamento de objetos distribuídos do Ray

  • Recursos: CPU, GPU e requisitos de recursos personalizados para tarefas e atores

O exemplo a seguir demonstra como usar uma tarefa básica e atores do Ray para fazer regressão linear:

import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)

# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])

# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
    """CPU-intensive computation example"""
    # Simulate heavy computation (matrix operations)
    result = np.dot(data, data.T)
    return np.mean(result)

# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
    def __init__(self):
        # Load a simple model
        self.model = LinearRegression()
        # Train on dummy data
        X_dummy = np.random.randn(100, 5)
        y_dummy = np.random.randn(100)
        self.model.fit(X_dummy, y_dummy)

    def process_batch(self, batch):
        # Convert to numpy if it's a DataFrame
        if isinstance(batch, pd.DataFrame):
            batch_array = batch.values
        else:
            batch_array = batch
        return self.model.predict(batch_array)

# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future)  # Blocks until task completes
print(f"Task result: {result}")

# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")
Copy

Ray Train: treinamento distribuído

O Ray Train é uma biblioteca que permite treinamento distribuído e ajuste fino de modelos. Você pode executar seu código de treinamento em uma única máquina ou em um cluster inteiro. Para o Ray no Snowflake, você pode usar o Ray Train para execução de nó único, mas não para execução de vários nós.

Para treinar vários nós distribuídos, use as funções de treinamento otimizado no tempo de execução do contêiner. Essas funções fornecem integração de treinamento distribuído do XGBoost, LightGBM PyTorch com manuseio automático de armazenamento que usa internamente o mesmo cluster do Ray.

Ray Data: processamento de dados escalável

O Ray Data fornece processamento de dados escalável e distribuído para cargas de trabalho de ML. Ele pode lidar com conjuntos de dados maiores que a memória do cluster por meio de execução de streaming e avaliação lenta.

Nota

O Snowflake oferece integração nativa para transformar qualquer fonte de dados do Snowflake em Ray Data. Para mais informações, consulte as páginas Conector de dados e Ingestão de Ray Data.

Use Ray Data para:

  • Processamento de grandes conjuntos de dados que não cabem na memória de nó único

  • Pré-processamento de dados distribuídos e engenharia de recursos

  • Criação de pipelines de dados que se integram a outras bibliotecas Ray

import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)

# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15

# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
    base_features[:, 0] * base_features[:, 1],  # interaction
    np.sin(base_features[:, 2]),  # non-linear
    base_features[:, 3] ** 2,  # polynomial
    np.random.randn(n_samples, n_features - 8)  # additional random features
])

X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)

sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y

print(f"Created dataset with {n_samples} samples and {n_features} features")

# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)

# Transform data with Ray Data operations
def preprocess_batch(batch):
    """Preprocess a batch of data"""
    # Get all feature columns
    feature_cols = [col for col in batch.columns if col.startswith('feature_')]

    # Normalize numerical features (first 3 for demo)
    for col in feature_cols[:3]:
        if col in batch.columns:
            batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()

    # Add derived features using actual column names
    if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
        batch['feature_0_squared'] = batch['feature_0'] ** 2
        batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']

    return batch

# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
    preprocess_batch,
    batch_format="pandas"
)

# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)

# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas()  # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")

# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
    batch_count += 1
    print(f"Batch {batch_count}: {batch.shape}")
    if batch_count >= 3:  # Just show first 3 batches
        break

print(f"Total batches processed: {batch_count}")
Copy

Ray Tune: Ajuste de hiperparâmetros distribuídos

O Ray Tune fornece otimização de hiperparâmetros distribuídos com algoritmos de pesquisa avançados e recursos de interrupção antecipada. Para uma experiência mais integrada e otimizada ao ler de fontes de dados do Snowflake, use a API Hyperparameter Optimization (HPO) nativa. Para obter mais informações sobre o uso da otimização de HPO, consulte Otimizar os hiperparâmetros de um modelo.

Se você está procurando uma abordagem mais personalizável para uma implementação distribuída da HPO, use o Ray Tune.

Você pode usar o Ray Tune para os seguintes casos de uso:

  • Otimização de hiperparâmetros em vários testes em paralelo

  • Algoritmos avançados de pesquisa (otimização bayesiana, treinamento baseado em população)

  • Varreduras de hiperparâmetros em grande escala que exigem execução distribuída

import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)

# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10

X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

def train_function(config):
    """Training function that gets hyperparameters from Ray Tune"""
    # Train model with current hyperparameters
    model = RandomForestClassifier(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"],
        random_state=42,
        n_jobs=-1
    )

    model.fit(X_train, y_train)

    # Evaluate and report results
    val_predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_predictions)

    # Report metrics back to Ray Tune
    return {"accuracy": accuracy}

# Define search space
search_space = {
    "n_estimators": tune.randint(50, 200),
    "max_depth": tune.randint(3, 15),
    "min_samples_split": tune.randint(2, 10)
}

# Configure and run hyperparameter optimization
tuner = tune.Tuner(
    tune.with_resources(
        train_function,
        resources={"CPU": 2}
    ),
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,  # Number of trials
        max_concurrent_trials=4
    )
)

print("Starting hyperparameter optimization...")
results = tuner.fit()

# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f"   Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f"   Best parameters: {best_result.config}")

# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
    print(f"  {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")
Copy

Model Serving

Para distribuição de modelos, você pode usar os recursos nativos do Snowflake. Para obter mais informações, consulte Model Serving no Snowpark Container Services.

Envio e gerenciamento de aplicativos distribuídos em clusters do Ray

Use os trabalhos do Ray para enviar e gerenciar aplicativos distribuídos em clusters do Ray com melhor isolamento de recursos e gerenciamento do ciclo de vida. Para todas as execuções baseadas em trabalhos que requerem acesso a um cluster do Ray, a Snowflake recomenda o uso de um trabalho de ML em que seja possível definir a lógica do aplicativo Ray. Para instâncias em que você precisa de acesso direto à interface do trabalho do Ray, como a migração de uma implementação existente, você pode usar o primitivo do trabalho do Ray como descrito na Documentação do Ray.

Use os trabalhos do Ray para:

  • Pipelines de ML de produção e fluxos de trabalho programados

  • Cargas de trabalho de longa duração que exigem tolerância a falhas

  • Processamento em lote e processamento de dados em grande escala

import ray
from ray.job_submission import JobSubmissionClient
import os

# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)

# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"

client = JobSubmissionClient(dashboard_address)

# Simple job script
job_script = '''
import ray

@ray.remote
def compute_task(x):
    return x * x

# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''

# Submit job
job_id = client.submit_job(
    entrypoint=f"python -c '{job_script}'",
    runtime_env={"pip": ["numpy"]},
    submission_id="my-ray-job"
)

print(f"Submitted job: {job_id}")

# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
Copy

Dimensionamento de clusters do Ray com opções

A partir de um notebook Snowflake, é possível dimensionar seus clusters do Ray para atender às demandas computacionais com precisão. Um cluster consiste em um nó principal (coordenador) e nós de trabalho (para execução de tarefas).

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Asynchronous scaling - returns immediately
scale_cluster(
    expected_cluster_size=2,
    is_async=True  # Don't wait for all nodes to be ready
)

# Scaling with custom options
scale_cluster(
    expected_cluster_size=3,
    options={
        "rollback_after_seconds": 300,  # Auto-rollback after 5 minutes
        "block_until_min_cluster_size": 2  # Return when at least 2 nodes ready
    }
)

# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
Copy

Monitoramento de recursos

import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
    get_available_cpu, get_available_gpu, get_num_cpus_per_node
)

# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()

print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")

# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")

# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")
Copy

Limpeza

Ao terminar de usar o cluster, você poderá reduzi-lo para evitar cobranças adicionais. Use o código a seguir para reduzir a escala vertical:

# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")
Copy