The Snowflake container runtime integrates with Ray, an open-source unified framework for scaling AI and Python applications. This integration allows you to use Ray’s distributed computing capabilities on Snowflake for your machine learning workloads.
Ray is pre-installed and runs as a background process within the Snowflake ML container runtime. You can access Ray from the Container Runtime in the following ways:
Snowflake Notebooks: An interactive environment where you can connect to Ray, define tasks, and scale your cluster dynamically for development and experimentation.
Snowflake ML Jobs: Submit your Ray applications as structured, repeatable jobs. You can specify the cluster size as part of the job configuration for production workloads.
When you run the container runtime within a Snowflake Notebook or ML Job, the Ray process is automatically initiated as part of that container.
Use the following Python code to connect to the cluster:
import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Important
Make sure you always use the "auto" address when you’re connecting to the Ray cluster.
Initializing with the "auto" address directs your application to the head node of the Ray cluster that Snowflake has provisioned for your session.
After you connect to the Ray cluster, you can adjust its size to meet the computational demands of your workload.
Use the following approaches to scale your Ray cluster:
Within a notebook, you can dynamically scale your cluster up or down using the scale_cluster function. This is ideal for interactive workflows where resource needs might change.
When you specify expected_cluster_size=5, you get 1 head node and 4 worker nodes.
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
# Check current cluster sizeprint(f"Current cluster size: {len(get_nodes())} nodes")
# Scale up to 4 nodes (1 head + 3 workers)print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
For ML Jobs, you define the cluster size declaratively within your job definition. Specifying the cluster size in the job definition ensures that the required number of nodes is provisioned when the job starts.
For example, your job decorator might include:
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=5# Specify the number of nodes)defdistributed_ray():
import ray
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
job = distributed_ray()
After you’ve finished using your cluster you can scale it down. For more information, see Cleaning up.
If you’re running a job from a Snowflake Notebook, you can use the Ray Dashboard to monitor your cluster. The dashboard is a web interface that allows you to view the cluster’s resources, jobs, tasks, and performance.
Use the following code to get the dashboard URL:
from snowflake.ml.runtime_cluster import get_ray_dashboard_url
# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Open the URL in a new browser tab, log in with your Snowflake credentials.
This section covers advanced Ray features for complex workloads and for migrating existing applications.
Creating and operating distributed workloads with Ray¶
Ray provides components that enable you to create and operate distributed workloads. These include foundational components via Ray Core with essential primitives for building and scaling these workloads.
It also includes the following libraries that enable you build your own workflows for data preprocessing, ML training, hyperparameter tuning, and model inference:
Ray Data: Scalable data processing and transformation
Ray Train: Distributed training and fine-tuning of ML models
Ray Tune: Hyperparameter optimization with advanced search algorithms
Ray Serve: Model serving and inference
The following sections describe how you can use these libraries directly, while native Snowflake interfaces built over Ray provide additional tools to build, deploy, and operationalize Ray-based applications.
Ray Train is a library that enables distributed training and fine-tuning of models. You can run your training code on a single machine or an entire cluster.
You can use Ray Train for both single-node and multi-node execution.
For multi-node training, you must handle the following:
Distributed storage for checkpoints (no shared filesystem across nodes)
Custom data loading
Manual resource configuration to coordinate between data ingestion and training resource usage
For a streamlined experience, use the Optimized Training functions for XGBoost, LightGBM, and PyTorch. On the same Ray cluster, these functions handle:
Snowflake stage-based checkpointing
Native Snowflake data ingestion
Built-in resource allocation for data ingestion and training
Ray Data provides scalable, distributed data processing for ML workloads. It can handle datasets larger than cluster memory through streaming execution and lazy evaluation.
Note
Snowflake offers a native integration to transform any Snowflake data source to Ray Data. For more information, see the Data Connector and Ray Data Ingestion pages.
Use Ray Data for:
Processing large datasets that don’t fit in single-node memory
Distributed data preprocessing and feature engineering
Building data pipelines that integrate with other Ray libraries
import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster
# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)
# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations# Scaling benefits Ray Data when:# - Processing datasets larger than single-node memory (>10GB)# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)
# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
base_features[:, 0] * base_features[:, 1], # interaction
np.sin(base_features[:, 2]), # non-linear
base_features[:, 3] ** 2, # polynomial
np.random.randn(n_samples, n_features - 8) # additional random features
])
X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)
sample_data = pd.DataFrame(X, columns=[f'feature_{i}'for i inrange(n_features)])
sample_data['target'] = y
print(f"Created dataset with {n_samples} samples and {n_features} features")
# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)
# Transform data with Ray Data operationsdefpreprocess_batch(batch):
"""Preprocess a batch of data"""# Get all feature columns
feature_cols = [col for col in batch.columns if col.startswith('feature_')]
# Normalize numerical features (first 3 for demo)for col in feature_cols[:3]:
if col in batch.columns:
batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()
# Add derived features using actual column namesif'feature_0'in batch.columns and'feature_1'in batch.columns:
batch['feature_0_squared'] = batch['feature_0'] ** 2
batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']
return batch
# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
preprocess_batch,
batch_format="pandas"
)
# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)
# Convert to different formats for downstream useprint("Converting to pandas...")
pandas_df = processed_dataset.to_pandas() # Collect to pandasprint(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")
# Iterate through batches for memory efficiencyprint("Processing batches...")
batch_count = 0for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
batch_count += 1print(f"Batch {batch_count}: {batch.shape}")
if batch_count >= 3: # Just show first 3 batchesbreakprint(f"Total batches processed: {batch_count}")
Ray Tune provides distributed hyperparameter optimization with advanced search algorithms and early stopping capabilities. For a more integrated and optimized experience when reading from Snowflake data sources, use the native Hyperparameter Optimization (HPO) API. For more information about using HPO optimization, see Optimize a model’s hyperparameters.
If you’re looking for a more customizable approach to a distributed HPO implementation, use Ray Tune.
You can use Ray Tune for the following use cases:
Hyperparameter optimization across multiple trials in parallel
Submit and manage distributed applications on Ray clusters¶
Use Ray Jobs to submit and manage distributed applications on Ray clusters with better resource isolation and lifecycle management. For all job-based executions that require access to a Ray Cluster, Snowflake recommends using an ML Job, where you can define the Ray application logic. For instances where you require direct access to the Ray Job interface, such as migrating an existing implementation, you could use the Ray Job primitive as is described in the Ray documentation.
Use Ray jobs for:
Production ML pipelines and scheduled workflows
Long-running workloads requiring fault tolerance
Batch processing and large-scale data processing
import ray
from ray.job_submission import JobSubmissionClient
import os
# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)
# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"
client = JobSubmissionClient(dashboard_address)
# Simple job script
job_script = '''
import ray
@ray.remote
def compute_task(x):
return x * x
# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''# Submit job
job_id = client.submit_job(
entrypoint=f"python -c '{job_script}'",
runtime_env={"pip": ["numpy"]},
submission_id="my-ray-job"
)
print(f"Submitted job: {job_id}")
# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
From a Snowflake Notebook, you can scale your Ray clusters to precisely match computational demands. A cluster consists of a head node (coordinator) and worker nodes (for task execution).
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes
# Asynchronous scaling - returns immediately
scale_cluster(
expected_cluster_size=2,
is_async=True# Don't wait for all nodes to be ready
)
# Scaling with custom options
scale_cluster(
expected_cluster_size=3,
options={
"rollback_after_seconds": 300, # Auto-rollback after 5 minutes"block_until_min_cluster_size": 2# Return when at least 2 nodes ready
}
)
# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
After you’re finished with the cluster, you can scale it down to avoid additional charges. Use the following code to scale it down:
# Scale down when finished to conserve resourcesprint("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")