Ray를 사용하여 애플리케이션 확장하기

Snowflake 컨테이너 런타임은 AI 및 Python 애플리케이션을 확장하기 위한 오픈 소스 통합 프레임워크인 `Ray<https://docs.ray.io/>`_와 통합됩니다. 이 통합을 통해 Snowflake에서 Ray의 분산 컴퓨팅 기능을 머신 러닝 워크로드에 사용할 수 있습니다.

Ray는 사전 설치되어 있으며 Snowflake ML 컨테이너 런타임 내에서 백그라운드 프로세스로 실행됩니다. Container Runtime for ML에서 다음과 같은 방법으로 Ray에 액세스할 수 있습니다.

Snowflake Notebooks: 개발 및 실험을 위해 Ray에 연결하고, 태스크를 정의하고, 클러스터 크기를 동적으로 조정할 수 있는 대화형 환경입니다.

Snowflake ML Jobs: Ray 애플리케이션을 반복 가능한 정형 작업으로 제출합니다. 프로덕션 워크로드에 대한 작업 구성의 일부로 클러스터 크기를 지정할 수 있습니다.

Snowflake Notebook 또는 ML Job 내에서 컨테이너 런타임을 실행하는 경우 Ray 프로세스는 해당 컨테이너의 일부로 자동으로 시작됩니다.

다음 Python 코드를 사용하여 클러스터에 연결합니다.

import ray
# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)
print(f"Ray cluster resources: {ray.cluster_resources()}")
Copy

중요

Ray 클러스터에 연결할 때 항상 "auto" 주소를 사용하세요. "auto" 주소로 초기화하면 Snowflake가 세션을 위해 프로비저닝한 Ray 클러스터의 헤드 노드로 애플리케이션이 연결됩니다.

Ray 클러스터 확장

Ray 클러스터에 연결한 후에는 워크로드의 컴퓨팅 요구 사항에 맞게 크기를 조정할 수 있습니다.

다음과 같은 접근 방식을 사용하여 Ray 클러스터를 확장합니다.

노트북 내에서 scale_cluster 함수를 사용하여 클러스터를 동적으로 확장하거나 축소할 수 있습니다. 이 방법은 리소스 요구 사항이 변경될 수 있는 대화형 워크플로에 이상적입니다.

:code:`expected_cluster_size=5`를 지정하면 1개의 헤드 노드와 4개의 워커 노드가 지정됩니다.

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")

# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")
Copy

클러스터 사용을 마친 후에는 클러스터를 축소할 수 있습니다. 자세한 내용은 정리 섹션을 참조하십시오.

Ray 대시보드를 통한 모니터링

Snowflake Notebook에서 작업을 실행하는 경우 Ray 대시보드를 사용하여 클러스터를 모니터링할 수 있습니다. 대시보드는 클러스터의 리소스, 작업, 태스크 및 성능을 볼 수 있는 웹 인터페이스입니다. 다음 코드를 사용하여 대시보드 URL을 가져옵니다.

from snowflake.ml.runtime_cluster import get_ray_dashboard_url

# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")
Copy

새 브라우저 탭에서 URL을 열고 Snowflake 자격 증명으로 로그인합니다.

고급 사용 사례

이 섹션에서는 복잡한 워크로드와 기존 애플리케이션의 마이그레이션을 위한 고급 Ray 기능을 다룹니다.

Ray를 사용한 분산 워크로드 생성 및 운영

Ray는 분산 워크로드를 생성하고 운영할 수 있는 구성 요소를 제공합니다. 여기에는 이러한 워크로드를 구축하고 확장하기 위한 필수 기본 요소와 함께 Ray Core를 통한 기본 구성 요소가 포함됩니다.

또한 데이터 전처리, ML 학습, 하이퍼 매개 변수 튜닝, 모델 추론을 위한 자체 워크플로를 구축할 수 있는 다음과 같은 라이브러리도 포함되어 있습니다.

  • Ray Data: 확장 가능한 데이터 처리 및 변환

  • Ray Train: ML 모델의 분산 학습 및 미세 조정

  • Ray Tune: 고급 검색 알고리즘을 사용한 하이퍼 매개 변수 최적화

  • Ray Serve: 모델 제공 및 추론

Ray를 통해 구축된 네이티브 Snowflake 인터페이스는 Ray 기반 애플리케이션을 구축, 배포 및 운영하기 위한 추가 도구를 제공하지만, 다음 섹션에서는 이러한 라이브러리를 직접 사용하는 방법을 설명합니다.

Ray Core: 태스크 및 액터

Ray는 다음과 같은 분산 컴퓨팅 기본 요소를 제공합니다.

  • 태스크: 원격으로 실행되고 값을 반환하는 상태 비저장 함수

  • 액터: 원격으로 인스턴스화하고 여러 번 호출할 수 있는 상태 저장 클래스

  • 오브젝트: Ray의 분산 오브젝트 저장소에 저장된 변경 불가능한 값

  • 리소스: CPU, GPU와 태스크 및 액터에 대한 사용자 지정 리소스 요구 사항

다음 예제에서는 기본 Ray 태스크 및 액터를 사용하여 선형 회귀를 수행하는 방법을 보여줍니다.

import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)

# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])

# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
    """CPU-intensive computation example"""
    # Simulate heavy computation (matrix operations)
    result = np.dot(data, data.T)
    return np.mean(result)

# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
    def __init__(self):
        # Load a simple model
        self.model = LinearRegression()
        # Train on dummy data
        X_dummy = np.random.randn(100, 5)
        y_dummy = np.random.randn(100)
        self.model.fit(X_dummy, y_dummy)

    def process_batch(self, batch):
        # Convert to numpy if it's a DataFrame
        if isinstance(batch, pd.DataFrame):
            batch_array = batch.values
        else:
            batch_array = batch
        return self.model.predict(batch_array)

# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future)  # Blocks until task completes
print(f"Task result: {result}")

# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")
Copy

Ray Train: 분산 학습

Ray Train은 모델의 분산 학습 및 미세 조정을 지원하는 라이브러리입니다. 단일 머신 또는 전체 클러스터에서 학습 코드를 실행할 수 있습니다. Ray on Snowflake의 경우 단일 노드 실행에는 Ray Train을 사용할 수 있지만 다중 노드 실행에는 사용할 수 없습니다.

분산형 다중 노드 학습의 경우 컨테이너 런타임에서 최적화된 학습 함수를 사용합니다. 이러한 함수는 동일한 Ray 클러스터를 내부적으로 사용하는 자동 저장소 처리와 더불어 통합된 XGBoost, LightGBM 및 PyTorch 분산 학습을 제공합니다.

Ray Data: 확장 가능한 데이터 처리

Ray Data는 ML 워크로드를 위한 확장 가능한 분산형 데이터 처리를 제공합니다 스트리밍 실행 및 지연 평가를 통해 클러스터 메모리보다 큰 데이터 세트를 처리할 수 있습니다.

참고

Snowflake는 모든 Snowflake 데이터 소스를 Ray Data로 변환하는 네이티브 통합을 제공합니다. 자세한 내용은 데이터 커넥터 및 Ray Data 수집 페이지를 참조하세요.

Ray Data는 다음과 같은 용도로 사용하세요.

  • 단일 노드 메모리에 맞지 않는 대규모 데이터 세트 처리

  • 분산형 데이터 전처리 및 기능 엔지니어링

  • 다른 Ray 라이브러리와 통합되는 데이터 파이프라인 구축

import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)

# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15

# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
    base_features[:, 0] * base_features[:, 1],  # interaction
    np.sin(base_features[:, 2]),  # non-linear
    base_features[:, 3] ** 2,  # polynomial
    np.random.randn(n_samples, n_features - 8)  # additional random features
])

X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)

sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y

print(f"Created dataset with {n_samples} samples and {n_features} features")

# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)

# Transform data with Ray Data operations
def preprocess_batch(batch):
    """Preprocess a batch of data"""
    # Get all feature columns
    feature_cols = [col for col in batch.columns if col.startswith('feature_')]

    # Normalize numerical features (first 3 for demo)
    for col in feature_cols[:3]:
        if col in batch.columns:
            batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()

    # Add derived features using actual column names
    if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
        batch['feature_0_squared'] = batch['feature_0'] ** 2
        batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']

    return batch

# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
    preprocess_batch,
    batch_format="pandas"
)

# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)

# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas()  # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")

# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
    batch_count += 1
    print(f"Batch {batch_count}: {batch.shape}")
    if batch_count >= 3:  # Just show first 3 batches
        break

print(f"Total batches processed: {batch_count}")
Copy

Ray Tune: 분산 하이퍼 매개 변수 튜닝

Ray Tune은 고급 검색 알고리즘 및 조기 중지 기능을 통해 분산 하이퍼 매개 변수 최적화를 제공합니다. Snowflake 데이터 소스에서 읽을 때 보다 잘 통합되고 최적화된 환경을 제공하기 위해 네이티브 HPO(하이퍼 매개 변수 최적화) API를 사용합니다. HPO 최적화 사용에 대한 자세한 내용은 :ref:`label-optimize-model-hyperparameters`를 참조하세요.

보다 쉽게 사용자 지정할 수 있는 분산 HPO 구현 접근 방식을 원하는 경우 Ray Tune을 사용하세요.

다음과 같은 사용 사례에 Ray Tune을 사용할 수 있습니다.

  • 여러 번의 시도에서 병렬로 하이퍼 매개 변수 최적화

  • 고급 검색 알고리즘(베이지안 최적화, 모집단 기반 학습)

  • 분산 실행이 필요한 대규모 하이퍼 매개 변수 스윕

import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)

# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10

X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

def train_function(config):
    """Training function that gets hyperparameters from Ray Tune"""
    # Train model with current hyperparameters
    model = RandomForestClassifier(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"],
        random_state=42,
        n_jobs=-1
    )

    model.fit(X_train, y_train)

    # Evaluate and report results
    val_predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_predictions)

    # Report metrics back to Ray Tune
    return {"accuracy": accuracy}

# Define search space
search_space = {
    "n_estimators": tune.randint(50, 200),
    "max_depth": tune.randint(3, 15),
    "min_samples_split": tune.randint(2, 10)
}

# Configure and run hyperparameter optimization
tuner = tune.Tuner(
    tune.with_resources(
        train_function,
        resources={"CPU": 2}
    ),
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,  # Number of trials
        max_concurrent_trials=4
    )
)

print("Starting hyperparameter optimization...")
results = tuner.fit()

# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f"   Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f"   Best parameters: {best_result.config}")

# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
    print(f"  {i}. Accuracy: {row['accuracy']:.4f}, n_estimators: {row['config/n_estimators']}, max_depth: {row['config/max_depth']}")
Copy

모델 제공

모델 제공의 경우 Snowflake의 네이티브 기능을 사용할 수 있습니다. 자세한 내용은 Snowpark Container Services에서 모델 제공 섹션을 참조하십시오.

Ray 클러스터에서 분산 애플리케이션 제출 및 관리

Ray Job을 사용하여 더 나은 리소스 격리 및 수명 주기 관리를 통해 Ray 클러스터에서 분산 애플리케이션을 제출하고 관리합니다. Ray 클러스터에 액세스해야 하는 모든 작업 기반 실행의 경우 Snowflake는 Ray 애플리케이션 논리를 정의할 수 있는 ML Job을 사용할 것을 권장합니다. 기존 구현 마이그레이션과 같이 Ray Job 인터페이스에 직접 액세스해야 하는 경우 `Ray 설명서<https://docs.ray.io/en/latest/cluster/running-applications/job-submission/sdk.html>`_에 설명된 대로 Ray Job 기본 유형을 사용할 수 있습니다.

Ray 작업은 다음과 같은 용도로 사용하세요.

  • 프로덕션 ML 파이프라인 및 예약된 워크플로

  • 내결함성이 필요한 장기 실행 워크로드

  • 일괄 처리 및 대규모 데이터 처리

import ray
from ray.job_submission import JobSubmissionClient
import os

# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)

# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"

client = JobSubmissionClient(dashboard_address)

# Simple job script
job_script = '''
import ray

@ray.remote
def compute_task(x):
    return x * x

# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''

# Submit job
job_id = client.submit_job(
    entrypoint=f"python -c '{job_script}'",
    runtime_env={"pip": ["numpy"]},
    submission_id="my-ray-job"
)

print(f"Submitted job: {job_id}")

# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")
Copy

옵션을 사용하여 Ray 클러스터 크기 조정

Snowflake Notebook에서 계산 요구 사항에 정확하게 일치하도록 Ray 클러스터의 크기를 조정할 수 있습니다. 클러스터는 헤드 노드(조정자)와 워커 노드(태스크 실행용)로 구성됩니다.

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Asynchronous scaling - returns immediately
scale_cluster(
    expected_cluster_size=2,
    is_async=True  # Don't wait for all nodes to be ready
)

# Scaling with custom options
scale_cluster(
    expected_cluster_size=3,
    options={
        "rollback_after_seconds": 300,  # Auto-rollback after 5 minutes
        "block_until_min_cluster_size": 2  # Return when at least 2 nodes ready
    }
)

# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)
Copy

리소스 모니터링

import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
    get_available_cpu, get_available_gpu, get_num_cpus_per_node
)

# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()

print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")

# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")

# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")
Copy

정리

클러스터 사용을 마친 후에는 클러스터를 축소하여 추가 요금이 부과되지 않도록 할 수 있습니다. 다음 코드를 사용하여 크기를 줄이세요.

# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")
Copy