Skalieren einer Anwendung mit Ray¶
Die Snowflake Container-Laufzeit lässt sich in Ray integrieren, ein vereinheitlichtes Open-Source-Framework für die Skalierung von AI- und Python-Anwendungen. Mit dieser Integration können Sie die verteilten Verarbeitungsfunktionen von Ray auf Snowflake für Ihre Machine-Learning-Workloads nutzen.
Ray ist vorinstalliert und wird als Hintergrundprozess innerhalb der Snowflake ML-Container-Laufzeit ausgeführt. Sie können auf folgende Weise über die Container-Laufzeit für ML auf Ray zugreifen:
Snowflake Notebooks: Eine interaktive Umgebung, in der Sie eine Verbindung zu Ray herstellen, Aufgaben definieren und Ihren Cluster für Entwicklung und Experimente dynamisch skalieren können.
SnowflakeML-Jobs: Übermitteln Sie Ihre Ray-Anwendungen als strukturierte, wiederholbare Jobs. Sie können die Cluster-Größe als Teil der Jobkonfiguration für Produktions-Workloads angeben.
Wenn Sie die Container-Laufzeit innerhalb eines Snowflake Notebooks oder ML-Jobs ausführen, wird der Ray-Prozess automatisch als Teil dieses Containers initiiert.
Verwenden Sie den folgenden Python-Code, um eine Verbindung zum Cluster herzustellen:
import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Wichtig
Stellen Sie sicher, dass Sie immer die "auto"-Adresse verwenden, wenn Sie eine Verbindung zum Ray-Cluster herstellen. Wenn Sie zur Initialisierung die "auto"-Adresse verwenden, wird Ihre Anwendung an den Hauptknoten des Ray-Clusters weitergeleitet, den Snowflake für Ihre Sitzung bereitgestellt hat.
Skalieren Ihres Ray-Clusters¶
Nachdem Sie eine Verbindung zum Ray-Cluster hergestellt haben, können Sie dessen Größe an die Verarbeitungsanforderungen Ihres Workloads anpassen.
Verwenden Sie die folgenden Ansätze, um Ihren Ray-Cluster zu skalieren:
Innerhalb eines Notebooks können Sie Ihren Cluster mit der scale_cluster-Funktion dynamisch nach oben oder unten skalieren. Dies ist ideal für interaktive Workflows, bei denen sich die Ressourcenanforderungen ändern können.
Wenn Sie expected_cluster_size=5 angeben, erhalten Sie 1 Hauptknoten und 4 Worker-Knoten.
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")
# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
Für ML-Jobs definieren Sie die Cluster-Größe deklarativ innerhalb Ihrer Jobdefinition. Durch die Angabe der Cluster-Größe in der Jobdefinition wird sichergestellt, dass beim Start des Jobs die erforderliche Anzahl von Knoten bereitgestellt wird.
Ihr Job-Decorator könnte z. B. Folgendes enthalten:
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=5 # Specify the number of nodes
)
def distributed_ray():
import ray
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
job = distributed_ray()
Wenn Sie Ihren Cluster nicht mehr verwenden möchten, können Sie ihn nach unten skalieren. Weitere Informationen dazu finden Sie unter Bereinigen.
Überwachen mit dem Ray-Dashboard¶
Wenn Sie einen Job von einem Snowflake Notebook aus ausführen, können Sie Ihren Cluster mit dem Ray-Dashboard überwachen. Das Dashboard ist eine Weboberfläche, auf der Sie die Ressourcen, Jobs, Aufgaben und die Leistung des Clusters anzeigen können. Verwenden Sie den folgenden Code zum Abrufen der Dashboard-URL:
from snowflake.ml.runtime_cluster import get_ray_dashboard_url
# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Öffnen Sie die URL in einer neuen Browser-Registerkarte und melden Sie sich mit Ihren Snowflake-Anmeldeinformationen an.
Erweiterte Anwendungsfälle¶
In diesem Abschnitt werden erweiterte Ray-Features für komplexe Workloads und für die Migration bestehender Anwendungen behandelt.
Erstellen und Betreiben verteilter Workloads mit Ray¶
Ray bietet Komponenten, mit denen Sie verteilte Workloads erstellen und betreiben können. Dazu gehören grundlegende Komponenten über Ray Core mit wichtigen Primitiven für die Erstellung und die Skalierung dieser Workloads.
Außerdem sind die folgenden Bibliotheken enthalten, mit denen Sie Ihre eigenen Workflows für die Datenvorverarbeitung erstellen können: ML-Training, Hyperparameter-Abstimmung und Modellableitung:
Ray Data: Skalierbare Datenverarbeitung und -transformation
Ray Train: Verteiltes Training und Feinabstimmung von ML-Modellen
Ray Tune: Hyperparameter-Optimierung mit erweiterten Suchalgorithmen
Ray Serve: Modellbereitstellung und -ableitung
In den folgenden Abschnitten wird beschrieben, wie Sie diese Bibliotheken direkt verwenden können, während auf Ray basierende native Snowflake-Schnittstellen zusätzliche Tools zum Erstellen, Bereitstellen und Operationalisieren von Ray-basierten Anwendungen bieten.
Ray Core: Aufgaben und Akteure¶
Ray bietet die folgenden Primitive für verteiltes Computing:
Aufgaben: Zustandslose Funktionen, die extern ausgeführt werden und Werte zurückgeben
Akteure: Zustandsabhängige Klassen, die extern instanziiert und mehrfach aufgerufen werden können
Objekte: Unveränderliche Werte, die im verteilten Objektspeicher von Ray abgelegt sind
Ressourcen: CPU, GPUund kundenspezifische Ressourcenanforderungen für Aufgaben und Akteure
Das folgende Beispiel zeigt, wie Sie mit einfachen Ray-Aufgaben und -Akteuren eine lineare Regression durchführen:
import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)
# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])
# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
"""CPU-intensive computation example"""
# Simulate heavy computation (matrix operations)
result = np.dot(data, data.T)
return np.mean(result)
# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
def __init__(self):
# Load a simple model
self.model = LinearRegression()
# Train on dummy data
X_dummy = np.random.randn(100, 5)
y_dummy = np.random.randn(100)
self.model.fit(X_dummy, y_dummy)
def process_batch(self, batch):
# Convert to numpy if it's a DataFrame
if isinstance(batch, pd.DataFrame):
batch_array = batch.values
else:
batch_array = batch
return self.model.predict(batch_array)
# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future) # Blocks until task completes
print(f"Task result: {result}")
# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")
Ray Train: Verteiltes Training¶
Ray Train ist eine Bibliothek, die verteiltes Training und die Feinabstimmung von Modellen ermöglicht. Sie können Ihren Trainingscode auf einem einzelnen Rechner oder auf einem gesamten Cluster ausführen. Für Ray auf Snowflake können Sie Ray Train zur Ausführung auf einem einzelnen Knoten verwenden, aber nicht zur Ausführung auf mehreren Knoten.
Für verteiltes Training mit mehreren Knoten verwenden Sie die Funktionen für optimiertes Training in der Laufzeit des Containers. Diese Funktionen bieten integrierten XGBoost, LightGBM und verteiltes PyTorch-Training mit automatischer Speicherbehandlung, die intern denselben Ray-Cluster verwendet.
Ray Data: Skalierbare Datenverarbeitung¶
Ray Data bietet eine skalierbare, verteilte Datenverarbeitung für ML-Workloads. Durch Streaming-Ausführung und Auswertung im Lazy-Modus können Datensets, die größer als der Clusterspeicher sind, verarbeitet werden.
Bemerkung
Snowflake bietet eine native Integration, um jede Snowflake-Datenquelle in Ray Data umzuwandeln. Weitere Informationen dazu finden Sie auf den Seiten zum Datenkonnektor und zur Ray-Datenerfassung.
Verwenden Sie Ray Data für Folgendes:
Verarbeiten großer Datensets, die nicht in einen Einzelknoten-Speicher passen
Verteilte Vorverarbeitung von Daten und Feature-Engineering
Erstellen von Datenpipelines, die sich in andere Ray-Bibliotheken integrieren lassen
import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster
# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)
# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)
# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15
# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
base_features[:, 0] * base_features[:, 1], # interaction
np.sin(base_features[:, 2]), # non-linear
base_features[:, 3] ** 2, # polynomial
np.random.randn(n_samples, n_features - 8) # additional random features
])
X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)
sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y
print(f"Created dataset with {n_samples} samples and {n_features} features")
# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)
# Transform data with Ray Data operations
def preprocess_batch(batch):
"""Preprocess a batch of data"""
# Get all feature columns
feature_cols = [col for col in batch.columns if col.startswith('feature_')]
# Normalize numerical features (first 3 for demo)
for col in feature_cols[:3]:
if col in batch.columns:
batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()
# Add derived features using actual column names
if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
batch['feature_0_squared'] = batch['feature_0'] ** 2
batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']
return batch
# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
preprocess_batch,
batch_format="pandas"
)
# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)
# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas() # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")
# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
batch_count += 1
print(f"Batch {batch_count}: {batch.shape}")
if batch_count >= 3: # Just show first 3 batches
break
print(f"Total batches processed: {batch_count}")
Ray Tune: Verteilte Hyperparameter-Abstimmung¶
Ray Tune bietet eine verteilte Hyperparameter-Optimierung mit erweiterten Suchalgorithmen und Funktionen zum frühzeitigen Stoppen. Für eine besser integrierte und optimierte Erfahrung beim Lesen von Snowflake-Datenquellen verwenden Sie die native Hyperparameter-Optimierung (HPO)-API. Weitere Informationen zur Verwendung der HPO-Optimierung finden Sie unter Optimieren Sie die Hyperparameter eines Modells.
Wenn Sie nach einem anpassbaren Ansatz für eine verteilte HPO-Implementierung suchen, verwenden Sie Ray Tune.
Sie können Ray Tune für die folgenden Anwendungsfälle verwenden:
Parallele Hyperparameter-Optimierung über mehrere Testversionen hinweg
Erweiterte Suchalgorithmen (Bayes’sche Optimierung, populationsbasiertes Training)
Umfassende Hyperparameter-Sweeps, die eine verteilte Ausführung erfordern
import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster
# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)
# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)
# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10
X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)
# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
def train_function(config):
"""Training function that gets hyperparameters from Ray Tune"""
# Train model with current hyperparameters
model = RandomForestClassifier(
n_estimators=config["n_estimators"],
max_depth=config["max_depth"],
min_samples_split=config["min_samples_split"],
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
# Evaluate and report results
val_predictions = model.predict(X_val)
accuracy = accuracy_score(y_val, val_predictions)
# Report metrics back to Ray Tune
return {"accuracy": accuracy}
# Define search space
search_space = {
"n_estimators": tune.randint(50, 200),
"max_depth": tune.randint(3, 15),
"min_samples_split": tune.randint(2, 10)
}
# Configure and run hyperparameter optimization
tuner = tune.Tuner(
tune.with_resources(
train_function,
resources={"CPU": 2}
),
param_space=search_space,
tune_config=tune.TuneConfig(
metric="accuracy",
mode="max",
num_samples=20, # Number of trials
max_concurrent_trials=4
)
)
print("Starting hyperparameter optimization...")
results = tuner.fit()
# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f" Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f" Best parameters: {best_result.config}")
# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
print(f" {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")
Model Serving¶
Für die Bereitstellung von Modellen können Sie die nativen Funktionen von Snowflake nutzen. Weitere Informationen dazu finden Sie unter Model Serving in Snowpark Container Services.
Senden und Verwalten verteilter Anwendungen auf Ray-Clustern¶
Verwenden Sie Ray Jobs, um verteilte Anwendungen auf Ray-Clustern mit besserer Ressourcenisolierung und Lebenszyklusverwaltung zu übermitteln und zu verwalten. Für alle jobbasierten Ausführungen, die Zugriff auf einen Ray-Cluster erfordern, empfiehlt Snowflake die Verwendung eines ML-Jobs, bei dem Sie die Ray-Anwendungslogik definieren können. Für Fälle, in denen Sie direkten Zugriff auf die Ray Job-Schnittstelle benötigen, wie z. B. für die Migration einer bestehenden Implementierung, können Sie das Ray Job-Primitiv verwenden, wie in der Ray-Dokumentation beschrieben.
Verwenden Sie Ray-Jobs für:
ML-Produktions-Pipelines und geplante Workflows
Zeitintensive Workloads, die Fehlertoleranz erfordern
Batchverarbeitung und Verarbeitung großer Datenmengen
import ray
from ray.job_submission import JobSubmissionClient
import os
# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)
# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"
client = JobSubmissionClient(dashboard_address)
# Simple job script
job_script = '''
import ray
@ray.remote
def compute_task(x):
return x * x
# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''
# Submit job
job_id = client.submit_job(
entrypoint=f"python -c '{job_script}'",
runtime_env={"pip": ["numpy"]},
submission_id="my-ray-job"
)
print(f"Submitted job: {job_id}")
# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
Skalieren von Ray-Clustern mit Optionen¶
Über ein Snowflake Notebook können Sie Ihre Ray-Cluster so skalieren, dass sie den Verarbeitungsanforderungen genau entsprechen. Ein Cluster besteht aus einem Hauptknoten (Koordinator) und Worker-Knoten (für die Aufgabeausführung).
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
# Asynchronous scaling - returns immediately
scale_cluster(
expected_cluster_size=2,
is_async=True # Don't wait for all nodes to be ready
)
# Scaling with custom options
scale_cluster(
expected_cluster_size=3,
options={
"rollback_after_seconds": 300, # Auto-rollback after 5 minutes
"block_until_min_cluster_size": 2 # Return when at least 2 nodes ready
}
)
# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
Ressourcenüberwachung¶
import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
get_available_cpu, get_available_gpu, get_num_cpus_per_node
)
# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()
print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")
# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")
# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")
Bereinigen¶
Wenn Sie mit dem Cluster fertig sind, können Sie ihn nach unten skalieren, um zusätzliche Kosten zu vermeiden. Verwenden Sie zum Herunterskalieren den folgenden Code:
# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")