Dimensionner une application à l’aide de Ray

Le Container Runtime Snowflake s’intègre à Ray, un framework unifié open source permettant de faire évoluer les applications Python et d’AI. Cette intégration vous permet d’utiliser les capacités de calcul distribué de Ray sur Snowflake pour vos charges de travail de machine learning.

Ray est préinstallé et s’exécute en arrière-plan dans le Container Runtime ML Snowflake. Vous pouvez accéder à Ray à partir du Container Runtime pour ML des manières suivantes :

Snowflake Notebooks : Un environnement interactif où vous pouvez vous connecter à Ray, définir des tâches et faire évoluer votre cluster de manière dynamique pour le développement et l’expérimentation.

Tâches ML Snowflake : Soumettez vos applications Ray sous forme de tâches structurées et reproductibles. Vous pouvez spécifier la taille du cluster dans le cadre de la configuration des tâches pour les charges de travail de production.

Lorsque vous exécutez le Container Runtime dans un notebook Snowflake ou une tâche ML, le processus Ray est automatiquement lancé dans le cadre de ce conteneur.

Utilisez le code Python suivant pour vous connecter au cluster :

import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Copy

Important

Veillez à toujours utiliser l’adresse "auto" lorsque vous vous connectez au cluster Ray. L’initialisation avec l’adresse "auto" dirige votre application vers le nœud principal du cluster Ray que Snowflake a provisionné pour votre session.

Évolution de votre cluster Ray

Après avoir établi la connexion au cluster Ray, vous pouvez ajuster sa taille pour répondre aux exigences de calcul de votre charge de travail.

Utilisez les approches suivantes pour faire évoluer votre cluster Ray :

Dans un notebook, vous pouvez augmenter ou réduire dynamiquement votre cluster à l’aide de la fonction scale_cluster. Cette fonction est idéale pour les workflows interactifs où les besoins en ressources peuvent évoluer.

Lorsque vous spécifiez expected_cluster_size=5, vous obtenez 1 nœud principal et 4 nœuds de travail.

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")

# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
Copy

Lorsque vous avez fini d’utiliser votre cluster, vous pouvez le réduire. Pour plus d’informations, voir Nettoyage.

Surveillance avec le tableau de bord Ray

Si vous exécutez une tâche à partir d’un notebook Snowflake, vous pouvez utiliser le tableau de bord Ray pour surveiller votre cluster. Le tableau de bord est une interface Web qui vous permet de visualiser les ressources, les tâches et les performances du cluster. Utilisez le code suivant pour obtenir l’URL du tableau de bord :

from snowflake.ml.runtime_cluster import get_ray_dashboard_url

# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Copy

Ouvrez l’URL dans un nouvel onglet du navigateur, puis connectez-vous avec vos identifiants Snowflake.

Cas d’utilisation avancés

Cette section couvre les fonctions avancées de Ray pour les charges de travail complexes et la migration d’applications existantes.

Création et opération de charges de travail distribuées avec Ray

Ray fournit des composants qui vous permettent de créer et de gérer des charges de travail distribuées. Il s’agit notamment de composants fondamentaux via Ray Core avec des primitives essentielles pour créer et faire évoluer ces charges de travail.

Il comprend également les bibliothèques suivantes qui vous permettent de créer vos propres workflows pour le prétraitement des données, l’entraînement ML, le réglage des hyperparamètres et l’inférence des modèles :

  • Ray Data : Traitement et transformation de données évolutifs

  • Ray Train : Entraînement et mise au point distribués des modèles ML

  • Ray Tune : Optimisation des hyperparamètres avec des algorithmes de recherche avancés

  • Ray Serve : Service et inférence du modèle

Les sections suivantes décrivent comment vous pouvez utiliser ces bibliothèques directement, tandis que les interfaces Snowflake natives construites sur Ray fournissent des outils supplémentaires pour créer, déployer et opérationnaliser des applications basées sur Ray.

Ray Core : Tâches et acteurs

Ray fournit les primitives de calcul distribué suivantes :

  • Tâches : Fonctions sans état qui s’exécutent à distance et renvoient des valeurs

  • Acteurs : Classes avec état pouvant être instanciées à distance et appelées plusieurs fois

  • Objets : Valeurs immuables stockées dans le magasin d’objets distribué de Ray

  • Ressources : CPU, GPU et besoins en ressources personnalisées pour les tâches et les acteurs

L’exemple suivant montre comment utiliser une tâche et des acteurs Ray de base pour effectuer une régression linéaire :

import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)

# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])

# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
    """CPU-intensive computation example"""
    # Simulate heavy computation (matrix operations)
    result = np.dot(data, data.T)
    return np.mean(result)

# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
    def __init__(self):
        # Load a simple model
        self.model = LinearRegression()
        # Train on dummy data
        X_dummy = np.random.randn(100, 5)
        y_dummy = np.random.randn(100)
        self.model.fit(X_dummy, y_dummy)

    def process_batch(self, batch):
        # Convert to numpy if it's a DataFrame
        if isinstance(batch, pd.DataFrame):
            batch_array = batch.values
        else:
            batch_array = batch
        return self.model.predict(batch_array)

# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future)  # Blocks until task completes
print(f"Task result: {result}")

# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")
Copy

Ray Train : Entraînement distribué

Ray Train est une bibliothèque qui permet un entraînement distribué et un affinage des modèles. Vous pouvez exécuter votre code d’entraînement sur une seule machine ou sur un cluster entier. Pour Ray sur Snowflake, vous pouvez utiliser Ray Train pour une exécution à un seul nœud, mais pas pour une exécution à plusieurs nœuds.

Pour l’entraînement distribué à plusieurs nœuds, utilisez les fonctions d’entraînement optimisé dans le Container Runtime. Ces fonctions fournissent un entraînement distribué XGBoost, LightGBM et PyTorch intégré avec une gestion automatique du stockage qui utilise en interne le même cluster Ray.

Ray Data : Traitement des données évolutif

Ray Data fournit un traitement des données distribué et évolutif pour les charges de travail ML. Il peut traiter des ensembles de données d’une taille supérieure à la mémoire du cluster grâce à l’exécution en continu et à l’évaluation différée.

Note

Snowflake propose une intégration native pour transformer n’importe quelle source de données Snowflake en données Ray. Pour plus d’informations, consultez les pages Connecteur de données et Ingestion de données Ray.

Utilisez Ray Data pour :

  • Le traitement de grands ensembles de données qui ne tiennent pas dans la mémoire d’un seul nœud

  • Le prétraitement des données distribuées et ingénierie des fonctionnalités

  • La création de pipelines de données qui s’intègrent à d’autres bibliothèques Ray

import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)

# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15

# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
    base_features[:, 0] * base_features[:, 1],  # interaction
    np.sin(base_features[:, 2]),  # non-linear
    base_features[:, 3] ** 2,  # polynomial
    np.random.randn(n_samples, n_features - 8)  # additional random features
])

X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)

sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y

print(f"Created dataset with {n_samples} samples and {n_features} features")

# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)

# Transform data with Ray Data operations
def preprocess_batch(batch):
    """Preprocess a batch of data"""
    # Get all feature columns
    feature_cols = [col for col in batch.columns if col.startswith('feature_')]

    # Normalize numerical features (first 3 for demo)
    for col in feature_cols[:3]:
        if col in batch.columns:
            batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()

    # Add derived features using actual column names
    if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
        batch['feature_0_squared'] = batch['feature_0'] ** 2
        batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']

    return batch

# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
    preprocess_batch,
    batch_format="pandas"
)

# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)

# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas()  # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")

# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
    batch_count += 1
    print(f"Batch {batch_count}: {batch.shape}")
    if batch_count >= 3:  # Just show first 3 batches
        break

print(f"Total batches processed: {batch_count}")
Copy

Ray Tune : Réglage distribué des hyperparamètres

Ray Tune fournit une optimisation distribuée des hyperparamètres avec des algorithmes de recherche avancés et des capacités d’arrêt précoce. Pour une expérience parfaitement intégrée et optimisée lors de la lecture de sources de données Snowflake, utilisez l’API native d’optimisation des hyperparamètres (HPO). Pour plus d’informations sur l’utilisation de l’optimisation HPO, voir Optimiser les hyperparamètres d’un modèle.

Si vous recherchez une approche plus personnalisable pour une mise en œuvre HPO distribuée, utilisez Ray Tune.

Vous pouvez utiliser Ray Tune pour les cas d’utilisation suivants :

  • Optimisation des hyperparamètres sur plusieurs essais en parallèle

  • Algorithmes de recherche avancés (optimisation bayésienne, entraînement basé sur la population)

  • Analyses à grande échelle d’hyperparamètres nécessitant une exécution distribuée

import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)

# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10

X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

def train_function(config):
    """Training function that gets hyperparameters from Ray Tune"""
    # Train model with current hyperparameters
    model = RandomForestClassifier(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"],
        random_state=42,
        n_jobs=-1
    )

    model.fit(X_train, y_train)

    # Evaluate and report results
    val_predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_predictions)

    # Report metrics back to Ray Tune
    return {"accuracy": accuracy}

# Define search space
search_space = {
    "n_estimators": tune.randint(50, 200),
    "max_depth": tune.randint(3, 15),
    "min_samples_split": tune.randint(2, 10)
}

# Configure and run hyperparameter optimization
tuner = tune.Tuner(
    tune.with_resources(
        train_function,
        resources={"CPU": 2}
    ),
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,  # Number of trials
        max_concurrent_trials=4
    )
)

print("Starting hyperparameter optimization...")
results = tuner.fit()

# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f"   Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f"   Best parameters: {best_result.config}")

# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
    print(f"  {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")
Copy

Service du modèle

Pour le service du modèle, vous pouvez utiliser les capacités natives de Snowflake. Pour plus d’informations, voir Model Serving dans Snowpark Container Services.

Soumettre et gérer des applications distribuées sur des clusters Ray

Utilisez les tâches Ray pour soumettre et gérer des applications distribuées sur des clusters Ray en bénéficiant d’une meilleure isolation des ressources et d’une meilleure gestion du cycle de vie. Pour toutes les exécutions basées sur des tâches qui nécessitent un accès à un cluster Ray, Snowflake recommande d’utiliser une tâche ML, où vous pouvez définir la logique d’application Ray. Pour les instances où vous avez besoin d’un accès direct à l’interface Ray Job, par exemple pour migrer une implémentation existante, vous pouvez utiliser la primitive Ray Job comme décrit dans la documentation Ray.

Utilisez les tâches Ray pour :

  • la production de pipelines ML et de workflows planifiés

  • les charges de travail de longue durée nécessitant une tolérance aux pannes

  • le traitement par lots et traitement de données à grande échelle

import ray
from ray.job_submission import JobSubmissionClient
import os

# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)

# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"

client = JobSubmissionClient(dashboard_address)

# Simple job script
job_script = '''
import ray

@ray.remote
def compute_task(x):
    return x * x

# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''

# Submit job
job_id = client.submit_job(
    entrypoint=f"python -c '{job_script}'",
    runtime_env={"pip": ["numpy"]},
    submission_id="my-ray-job"
)

print(f"Submitted job: {job_id}")

# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
Copy

Mise à l’échelle des clusters Ray avec options

À partir d’un notebook Snowflake, vous pouvez mettre à l’échelle vos clusters Ray pour répondre précisément aux exigences de calcul. Un cluster se compose d’un nœud principal (coordinateur) et de nœuds de travail (pour l’exécution des tâches).

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Asynchronous scaling - returns immediately
scale_cluster(
    expected_cluster_size=2,
    is_async=True  # Don't wait for all nodes to be ready
)

# Scaling with custom options
scale_cluster(
    expected_cluster_size=3,
    options={
        "rollback_after_seconds": 300,  # Auto-rollback after 5 minutes
        "block_until_min_cluster_size": 2  # Return when at least 2 nodes ready
    }
)

# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
Copy

Moniteur de ressources

import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
    get_available_cpu, get_available_gpu, get_num_cpus_per_node
)

# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()

print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")

# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")

# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")
Copy

Nettoyage

Une fois que vous avez terminé avec le cluster, vous pouvez le réduire pour éviter des frais supplémentaires. Utilisez le code suivant pour le réduire :

# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")
Copy