Rayを使用してアプリケーションを拡張する

Snowflakeコンテナランタイムは、AIおよびPythonアプリケーションをスケーリングするためのオープンソースの統一フレームワークである Ray と統合しています。 この統合により、機械学習ワークロードに関してSnowflake上でRayの分散コンピューティング機能を使用することができます。

Rayはプリインストールされており、Snowflake MLコンテナランタイム内のバックグラウンドプロセスとして実行されます。Container Runtime for MLのRayには、以下の方法でアクセスできます。

Snowflake Notebooks :Rayに接続し、タスクを定義し、クラスターを動的にスケーリングできるインタラクティブな環境で、開発や実験を行うことができます。

Snowflake ML Jobs :構造化された反復可能なジョブとして、Rayのアプリケーションを送信します。実稼働ワークロードのジョブ構成の一部として、クラスターサイズを指定できます。

Snowflake NotebookまたはML Job内でコンテナランタイムを実行すると、そのコンテナの一部としてRayプロセスが自動的に開始されます。

以下のPythonコードを使用してクラスターに接続します。

import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Copy

重要

Rayクラスターに接続する際は、必ず "auto" アドレスを使用してください。"auto" アドレスで初期化すると、Snowflakeがセッション用にプロビジョニングしたRayクラスターのヘッドノードにアプリケーションが誘導されます。

Rayクラスターのスケーリング

Rayクラスターに接続した後、ワークロードの計算需要に合わせてクラスターのサイズを調整することができます。

Rayクラスターをスケールするには、以下のアプローチを使用します。

ノートブック内では、 scale_cluster 関数を使用してクラスターを動的にスケールアップまたはスケールダウンできます。これは、リソースのニーズが変化する可能性のあるインタラクティブなワークフローに最適です。

expected_cluster_size=5 を指定すると、1つのヘッドノードと4つのワーカーノードになります。

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")

# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
Copy

クラスターを使い終わったら、スケールダウンすることができます。詳細については、 クリーンアップ をご参照ください。

Ray Dashboardによるモニタリング

Snowflake Notebookからジョブを実行している場合、Ray Dashboardを使用してクラスターをモニタリングできます。ダッシュボードは、クラスターのリソース、ジョブ、タスク、およびパフォーマンスを表示できるウェブインターフェースです。ダッシュボードURLを取得するには、次のコードを使用してください。

from snowflake.ml.runtime_cluster import get_ray_dashboard_url

# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Copy

新しいブラウザのタブでURLを開き、Snowflakeの認証情報でログインします。

高度な使用例

このセクションでは、複雑なワークロードや既存のアプリケーションを移行するための高度なRayの機能について説明します。

Rayによる分散ワークロードの作成と運用

Rayは、分散ワークロードの作成と運用を可能にするコンポーネントを提供します。これらには、ワークロードの構築とスケーリングに不可欠なプリミティブを備えたRay Coreによる基礎コンポーネントが含まれます。

また、データ前処理、MLトレーニング、ハイパーパラメーターチューニング、モデル推論のための独自のワークフローを構築できる以下のライブラリも含まれています。

  • Ray Data :スケーラブルなデータ処理と変換

  • Ray Train :MLモデルの分散学習と微調整

  • Ray Tune :高度な探索アルゴリズムによるハイパーパラメーター最適化

  • Ray Serve :モデルサービングと推論

以下のセクションでは、これらのライブラリを直接使用する方法について説明します。一方、Ray上に構築されたSnowflakeネイティブインターフェースは、Rayベースのアプリケーションを構築、デプロイ、運用するための追加ツールを提供します。

Ray Core:タスクとアクター

Rayは以下の分散コンピューティングプリミティブを提供します。

  • タスク :リモートで実行され値を返すステートレス関数

  • アクター :リモートでインスタンス化し、複数回呼び出すことができるステートフルなクラス

  • オブジェクト :Rayの分散オブジェクトストアに保存された不変の値

  • リソース :タスクおよびアクター用のCPU、GPU、カスタムリソース要件

次の例は、基本的なRayタスクとアクターを使って線形回帰を行う方法を示しています。

import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)

# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])

# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
    """CPU-intensive computation example"""
    # Simulate heavy computation (matrix operations)
    result = np.dot(data, data.T)
    return np.mean(result)

# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
    def __init__(self):
        # Load a simple model
        self.model = LinearRegression()
        # Train on dummy data
        X_dummy = np.random.randn(100, 5)
        y_dummy = np.random.randn(100)
        self.model.fit(X_dummy, y_dummy)

    def process_batch(self, batch):
        # Convert to numpy if it's a DataFrame
        if isinstance(batch, pd.DataFrame):
            batch_array = batch.values
        else:
            batch_array = batch
        return self.model.predict(batch_array)

# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future)  # Blocks until task completes
print(f"Task result: {result}")

# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")
Copy

Ray Train:分散トレーニング

Ray Trainはモデルの分散トレーニングと微調整を可能にするライブラリです。トレーニングコードは、1台のマシンでもクラスター全体でも実行できます。Snowflake上のRayの場合、シングルノード実行にはRay Trainを使用できますが、マルチノード実行には使用できません。

分散された複数ノードのトレーニングには、コンテナランタイムの最適化トレーニング機能を使用します。これらの機能は、内部的に同じRayクラスターを使用する自動ストレージ処理を備えた、統合されたXGBoost、LightGBM、およびPyTorch分散トレーニングを提供します。

Ray Data:スケーラブルなデータ処理

Ray Dataは、MLワークロードのためのスケーラブルな分散データ処理を提供します。ストリーミング実行と遅延評価によって、クラスターメモリより大きなデータセットを扱うことができます。

注釈

Snowflakeは、あらゆるSnowflakeデータソースをRay Dataに変換するネイティブ統合を提供します。詳細については、Data ConnectorとRay Data Ingestionのページを参照してください。

Ray Dataは以下に使用します。

  • シングルノードメモリに収まらない大規模データセットの処理

  • 分散データ前処理と特徴量エンジニアリング

  • 他のRayライブラリと統合するデータパイプラインの構築

import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)

# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15

# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
    base_features[:, 0] * base_features[:, 1],  # interaction
    np.sin(base_features[:, 2]),  # non-linear
    base_features[:, 3] ** 2,  # polynomial
    np.random.randn(n_samples, n_features - 8)  # additional random features
])

X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)

sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y

print(f"Created dataset with {n_samples} samples and {n_features} features")

# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)

# Transform data with Ray Data operations
def preprocess_batch(batch):
    """Preprocess a batch of data"""
    # Get all feature columns
    feature_cols = [col for col in batch.columns if col.startswith('feature_')]

    # Normalize numerical features (first 3 for demo)
    for col in feature_cols[:3]:
        if col in batch.columns:
            batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()

    # Add derived features using actual column names
    if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
        batch['feature_0_squared'] = batch['feature_0'] ** 2
        batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']

    return batch

# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
    preprocess_batch,
    batch_format="pandas"
)

# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)

# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas()  # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")

# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
    batch_count += 1
    print(f"Batch {batch_count}: {batch.shape}")
    if batch_count >= 3:  # Just show first 3 batches
        break

print(f"Total batches processed: {batch_count}")
Copy

Ray Tune:分散ハイパーパラメーターチューニング

Ray Tuneは、高度な探索アルゴリズムと早期停止機能を備えた分散ハイパーパラメーター最適化を提供します。Snowflakeデータソースからの読み取りをより統合的かつ最適化するには、ネイティブのハイパーパラメーター最適化(HPO)APIを使用します。HPO最適化の詳細については、 モデルのハイパーパラメーターの最適化 をご参照ください。

分散 HPO の実装にもっとカスタマイズ可能なアプローチをお探しなら、Ray Tuneをお使いください。

Ray Tuneは以下のような用途に使用できます。

  • 複数のトライアルを並行して行うハイパーパラメーターの最適化

  • 高度な探索アルゴリズム(ベイズ最適化、集団ベースのトレーニング)

  • 分散実行を必要とする大規模ハイパーパラメーターのスイープ

import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)

# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10

X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

def train_function(config):
    """Training function that gets hyperparameters from Ray Tune"""
    # Train model with current hyperparameters
    model = RandomForestClassifier(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"],
        random_state=42,
        n_jobs=-1
    )

    model.fit(X_train, y_train)

    # Evaluate and report results
    val_predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_predictions)

    # Report metrics back to Ray Tune
    return {"accuracy": accuracy}

# Define search space
search_space = {
    "n_estimators": tune.randint(50, 200),
    "max_depth": tune.randint(3, 15),
    "min_samples_split": tune.randint(2, 10)
}

# Configure and run hyperparameter optimization
tuner = tune.Tuner(
    tune.with_resources(
        train_function,
        resources={"CPU": 2}
    ),
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,  # Number of trials
        max_concurrent_trials=4
    )
)

print("Starting hyperparameter optimization...")
results = tuner.fit()

# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f"   Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f"   Best parameters: {best_result.config}")

# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
    print(f"  {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")
Copy

使用中のモデル

モデルサービングには、Snowflakeのネイティブ機能を使用できます。詳細については、 Snowpark Container Servicesのモデルサービス をご参照ください。

Rayクラスター上の分散アプリケーションの送信と管理

Rayジョブを使って、Rayクラスター上に分散アプリケーションを投入・管理し、リソースの分離とライフサイクル管理を改善します。Rayクラスターへのアクセスが必要なジョブベースの実行には、SnowflakeではMLジョブの使用を推奨しています。それにより、Rayのアプリケーションロジックを定義できます。既存の実装を移行する場合など、Rayジョブインターフェースへの直接アクセスが必要な場合には、 Rayドキュメント に記載されているRayジョブプリミティブを使用することができます。

以下の目的でRayジョブを使用します。

  • 実稼働MLパイプラインとスケジュールされたワークフロー

  • フォールトトレランスを必要とする長時間稼働のワークロード

  • バッチ処理と大規模データ処理

import ray
from ray.job_submission import JobSubmissionClient
import os

# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)

# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"

client = JobSubmissionClient(dashboard_address)

# Simple job script
job_script = '''
import ray

@ray.remote
def compute_task(x):
    return x * x

# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''

# Submit job
job_id = client.submit_job(
    entrypoint=f"python -c '{job_script}'",
    runtime_env={"pip": ["numpy"]},
    submission_id="my-ray-job"
)

print(f"Submitted job: {job_id}")

# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
Copy

オプションによるRayクラスターのスケーリング

Snowflakeノートブックから、計算需要に合わせてRayクラスターを正確にスケールすることができます。クラスターは、ヘッドノード(コーディネーター)とワーカーノード(タスク実行用)から構成されます。

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Asynchronous scaling - returns immediately
scale_cluster(
    expected_cluster_size=2,
    is_async=True  # Don't wait for all nodes to be ready
)

# Scaling with custom options
scale_cluster(
    expected_cluster_size=3,
    options={
        "rollback_after_seconds": 300,  # Auto-rollback after 5 minutes
        "block_until_min_cluster_size": 2  # Return when at least 2 nodes ready
    }
)

# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
Copy

リソースモニター

import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
    get_available_cpu, get_available_gpu, get_num_cpus_per_node
)

# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()

print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")

# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")

# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")
Copy

クリーンアップ

クラスターを使い終わった後は、追加料金が発生しないようにスケールダウンすることができます。スケールダウンするには以下のコードを使用します。

# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")
Copy