Scale an application using Ray¶
The Snowflake container runtime integrates with Ray, an open-source unified framework for scaling AI and Python applications. This integration allows you to use Ray’s distributed computing capabilities on Snowflake for your machine learning workloads.
Ray is pre-installed and runs as a background process within the Snowflake ML container runtime. You can access Ray from the Container Runtime for ML in the following ways:
Snowflake Notebooks: An interactive environment where you can connect to Ray, define tasks, and scale your cluster dynamically for development and experimentation.
Snowflake ML Jobs: Submit your Ray applications as structured, repeatable jobs. You can specify the cluster size as part of the job configuration for production workloads.
When you run the container runtime within a Snowflake Notebook or ML Job, the Ray process is automatically initiated as part of that container.
Use the following Python code to connect to the cluster:
import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Important
Make sure you always use the "auto"
address when you’re connecting to the Ray cluster.
Initializing with the "auto"
address directs your application to the head node of the Ray cluster that Snowflake has provisioned for your session.
Scaling your Ray cluster¶
After you connect to the Ray cluster, you can adjust its size to meet the computational demands of your workload.
Use the following approaches to scale your Ray cluster:
Within a notebook, you can dynamically scale your cluster up or down using the scale_cluster
function. This is ideal for interactive workflows where resource needs might change.
When you specify expected_cluster_size=5
, you get 1 head node and 4 worker nodes.
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")
# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
For ML Jobs, you define the cluster size declaratively within your job definition. Specifying the cluster size in the job definition ensures that the required number of nodes is provisioned when the job starts.
For example, your job decorator might include:
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=5 # Specify the number of nodes
)
def distributed_ray():
import ray
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
job = distributed_ray()
After you’ve finished using your cluster you can scale it down. For more information, see Cleaning up.
Monitoring with the Ray Dashboard¶
If you’re running a job from a Snowflake Notebook, you can use the Ray Dashboard to monitor your cluster. The dashboard is a web interface that allows you to view the cluster’s resources, jobs, tasks, and performance. Use the following code to get the dashboard URL:
from snowflake.ml.runtime_cluster import get_ray_dashboard_url
# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Open the URL in a new browser tab, log in with your Snowflake credentials.
Advanced use cases¶
This section covers advanced Ray features for complex workloads and for migrating existing applications.
Creating and operating distributed workloads with Ray¶
Ray provides components that enable you to create and operate distributed workloads. These include foundational components via Ray Core with essential primitives for building and scaling these workloads.
It also includes the following libraries that enable you build your own workflows for data preprocessing, ML training, hyperparameter tuning, and model inference:
Ray Data: Scalable data processing and transformation
Ray Train: Distributed training and fine-tuning of ML models
Ray Tune: Hyperparameter optimization with advanced search algorithms
Ray Serve: Model serving and inference
The following sections describe how you can use these libraries directly, while native Snowflake interfaces built over Ray provide additional tools to build, deploy, and operationalize Ray-based applications.
Ray Core: Tasks and Actors¶
Ray provides the following distributed computing primitives:
Tasks: Stateless functions that run remotely and return values
Actors: Stateful classes that can be instantiated remotely and called multiple times
Objects: Immutable values stored in Ray’s distributed object store
Resources: CPU, GPU, and custom resource requirements for tasks and actors
The following example demonstrates how to use a basic Ray Task and Actors to do linear regression:
import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)
# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])
# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
"""CPU-intensive computation example"""
# Simulate heavy computation (matrix operations)
result = np.dot(data, data.T)
return np.mean(result)
# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
def __init__(self):
# Load a simple model
self.model = LinearRegression()
# Train on dummy data
X_dummy = np.random.randn(100, 5)
y_dummy = np.random.randn(100)
self.model.fit(X_dummy, y_dummy)
def process_batch(self, batch):
# Convert to numpy if it's a DataFrame
if isinstance(batch, pd.DataFrame):
batch_array = batch.values
else:
batch_array = batch
return self.model.predict(batch_array)
# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future) # Blocks until task completes
print(f"Task result: {result}")
# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")
Ray Train: Distributed Training¶
Ray Train is a library that enables distributed training and fine-tuning of models. You can run your training code on a single machine or an entire cluster. For Ray on Snowflake, you can use Ray Train for single-node execution, but not multi-node execution.
For distributed multi-node training, use the Optimized Training functions in the container runtime. These functions provide integrated XGBoost, LightGBM, and PyTorch distributed training with automatic storage handling that internally uses the same Ray cluster.
Ray Data: Scalable Data Processing¶
Ray Data provides scalable, distributed data processing for ML workloads. It can handle datasets larger than cluster memory through streaming execution and lazy evaluation.
Note
Snowflake offers a native integration to transform any Snowflake data source to Ray Data. For more information, see the Data Connector and Ray Data Ingestion pages.
Use Ray Data for:
Processing large datasets that don’t fit in single-node memory
Distributed data preprocessing and feature engineering
Building data pipelines that integrate with other Ray libraries
import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster
# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)
# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)
# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15
# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
base_features[:, 0] * base_features[:, 1], # interaction
np.sin(base_features[:, 2]), # non-linear
base_features[:, 3] ** 2, # polynomial
np.random.randn(n_samples, n_features - 8) # additional random features
])
X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)
sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y
print(f"Created dataset with {n_samples} samples and {n_features} features")
# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)
# Transform data with Ray Data operations
def preprocess_batch(batch):
"""Preprocess a batch of data"""
# Get all feature columns
feature_cols = [col for col in batch.columns if col.startswith('feature_')]
# Normalize numerical features (first 3 for demo)
for col in feature_cols[:3]:
if col in batch.columns:
batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()
# Add derived features using actual column names
if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
batch['feature_0_squared'] = batch['feature_0'] ** 2
batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']
return batch
# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
preprocess_batch,
batch_format="pandas"
)
# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)
# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas() # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")
# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
batch_count += 1
print(f"Batch {batch_count}: {batch.shape}")
if batch_count >= 3: # Just show first 3 batches
break
print(f"Total batches processed: {batch_count}")
Ray Tune: Distributed Hyperparameter Tuning¶
Ray Tune provides distributed hyperparameter optimization with advanced search algorithms and early stopping capabilities. For a more integrated and optimized experience when reading from Snowflake data sources, use the native Hyperparameter Optimization (HPO) API. For more information about using HPO optimization, see Optimize a model’s hyperparameters.
If you’re looking for a more customizable approach to a distributed HPO implementation, use Ray Tune.
You can use Ray Tune for the following use cases:
Hyperparameter optimization across multiple trials in parallel
Advanced search algorithms (Bayesian optimization, population-based training)
Large-scale hyperparameter sweeps requiring distributed execution
import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster
# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)
# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)
# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10
X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)
# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
def train_function(config):
"""Training function that gets hyperparameters from Ray Tune"""
# Train model with current hyperparameters
model = RandomForestClassifier(
n_estimators=config["n_estimators"],
max_depth=config["max_depth"],
min_samples_split=config["min_samples_split"],
random_state=42,
n_jobs=-1
)
model.fit(X_train, y_train)
# Evaluate and report results
val_predictions = model.predict(X_val)
accuracy = accuracy_score(y_val, val_predictions)
# Report metrics back to Ray Tune
return {"accuracy": accuracy}
# Define search space
search_space = {
"n_estimators": tune.randint(50, 200),
"max_depth": tune.randint(3, 15),
"min_samples_split": tune.randint(2, 10)
}
# Configure and run hyperparameter optimization
tuner = tune.Tuner(
tune.with_resources(
train_function,
resources={"CPU": 2}
),
param_space=search_space,
tune_config=tune.TuneConfig(
metric="accuracy",
mode="max",
num_samples=20, # Number of trials
max_concurrent_trials=4
)
)
print("Starting hyperparameter optimization...")
results = tuner.fit()
# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f" Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f" Best parameters: {best_result.config}")
# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
print(f" {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")
Model Serving¶
For model serving, you can use Snowflake’s native capabilities. For more information, see Model Serving in Snowpark Container Services.
Submit and manage distributed applications on Ray clusters¶
Use Ray Jobs to submit and manage distributed applications on Ray clusters with better resource isolation and lifecycle management. For all job-based executions that require access to a Ray Cluster, Snowflake recommends using an ML Job, where you can define the Ray application logic. For instances where you require direct access to the Ray Job interface, such as migrating an existing implementation, you could use the Ray Job primitive as is described in the Ray documentation.
Use Ray jobs for:
Production ML pipelines and scheduled workflows
Long-running workloads requiring fault tolerance
Batch processing and large-scale data processing
import ray
from ray.job_submission import JobSubmissionClient
import os
# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)
# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"
client = JobSubmissionClient(dashboard_address)
# Simple job script
job_script = '''
import ray
@ray.remote
def compute_task(x):
return x * x
# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''
# Submit job
job_id = client.submit_job(
entrypoint=f"python -c '{job_script}'",
runtime_env={"pip": ["numpy"]},
submission_id="my-ray-job"
)
print(f"Submitted job: {job_id}")
# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
Scaling Ray Clusters with Options¶
From a Snowflake Notebook, you can scale your Ray clusters to precisely match computational demands. A cluster consists of a head node (coordinator) and worker nodes (for task execution).
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
# Asynchronous scaling - returns immediately
scale_cluster(
expected_cluster_size=2,
is_async=True # Don't wait for all nodes to be ready
)
# Scaling with custom options
scale_cluster(
expected_cluster_size=3,
options={
"rollback_after_seconds": 300, # Auto-rollback after 5 minutes
"block_until_min_cluster_size": 2 # Return when at least 2 nodes ready
}
)
# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
Resource monitoring¶
import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
get_available_cpu, get_available_gpu, get_num_cpus_per_node
)
# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()
print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")
# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")
# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")
Cleaning up¶
After you’re finished with the cluster, you can scale it down to avoid additional charges. Use the following code to scale it down:
# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")