Snowflake Multi-Node ML Jobs

Snowflake Multi-Node ML Jobs를 사용하면 여러 컴퓨팅 노드에 걸친 Snowflake ML 컨테이너 내에서 분산 머신 러닝(ML) 워크플로를 실행할 수 있습니다. 여러 노드에 작업을 분산하여 대규모 데이터 세트와 복잡한 모델을 개선된 성능으로 처리합니다. Snowflake ML Jobs에 대한 자세한 내용은 Snowflake ML 작업 섹션을 참조하세요.

Snowflake Multi-Node ML Jobs는 여러 노드에 걸쳐 분산 실행을 활성화함으로써 Snowflake ML Job 기능을 확장합니다. 이를 통해 얻을 수 있는 이점은 다음과 같습니다.

노드가 여러 개 있는 Snowflake ML Job을 실행하면 다음과 같은 일이 발생합니다.

  • 한 노드가 헤드 노드(조정자) 역할을 함

  • 추가 노드가 워커 노드(컴퓨팅 리소스) 역할을 함

  • 노드가 함께 Snowflake에서 단일 논리 ML 작업 엔터티를 형성함

단일 노드 ML 작업에는 헤드 노드만 있습니다. 활성 노드가 세 개 있는 멀티 노드 작업에는 헤드 노드 한 개와 워커 노드 두 개가 있습니다. 세 노드 모두 워크로드를 실행하는 데 참여합니다.

전제 조건

Snowflake Multi-Node ML Jobs를 사용하려면 다음과 같은 전제 조건이 필요합니다.

멀티 노드 작업을 설정하려면 다음을 수행하세요.

  1. Install the Snowflake ML Python package.

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. 멀티 노드 작업을 지원할 만큼 노드가 충분히 있는 컴퓨팅 풀을 만듭니다.

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = 1
      MAX_NODES = <NUM_INSTANCES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy

    중요

    MAX_NODES는 학습 작업을 실행하는 데 사용할 대상 인스턴스 수보다 크거나 같게 설정해야 합니다. 학습 작업에 사용하려는 노드 수보다 많은 노드를 요청하면 실패하거나 예기치 않게 동작할 수 있습니다. 학습 작업을 실행하는 방법에 대한 자세한 내용은 멀티 노드 ML 작업 실행하기 섹션을 참조하세요.

멀티 노드 작업용 코드 작성하기

멀티 노드 작업 시 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_ 또는 `Ray<https://www.ray.io/>`_를 사용하여 분산 처리용 코드를 설계해야 합니다.

분산 모델링 클래스 또는 Ray를 사용할 때의 주요 패턴 및 고려 사항은 다음과 같습니다.

노드 초기화 및 가용성 이해하기

멀티 노드 작업에서 워커 노드는 비동기적으로 다른 시간에 초기화할 수 있습니다.

  • 특히 컴퓨팅 풀 리소스가 제한된 경우 노드가 모두 동시에 시작되지 않을 수 있습니다.

  • 일부 노드는 몇 초 또는 몇 분 후에도 시작될 수 있습니다.

  • ML Jobs는 지정된 target_instances`를 사용할 있을 때까지 자동으로 기다렸다가 페이로드를 실행합니다. 예상 노드를 제한 시간 내에 사용할 없는 경우 오류가 발생하며 작업에 실패합니다. 동작을 사용자 지정하는 방법에 대한 자세한 내용은 :ref:`label-snowflake_ml_jobs_advanced_config 섹션을 참조하세요.

Ray를 통해 작업에서 사용 가능한 노드를 확인할 수 있습니다.

import ray
ray.init(address="auto", ignore_reinit_error=True)  # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
Copy

분산 처리 패턴

분산 처리 시 멀티 노드 작업의 페이로드 본문에 여러 패턴을 적용할 수 있습니다. 이러한 패턴은 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_와 `Ray<https://www.ray.io/>`_를 활용합니다.

Snowflake의 분산 학습 API 사용하기

Snowflake는 공통 ML 프레임워크에 최적화된 트레이너를 제공합니다.

# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()

# Create distributed estimator
estimator = XGBEstimator(
    n_estimators=100,
    params={"objective": "reg:squarederror"},
    scaling_config=scaling_config
)

# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
Copy

사용 가능한 APIs에 대한 자세한 내용은 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_를 참조하세요.

네이티브 Ray 태스크 사용하기

또 다른 접근 방식은 Ray의 태스크 기반 프로그래밍 모델을 사용하는 것입니다.

# Inside the ML Job payload body
import ray

@ray.remote
def process_chunk(data_chunk):
    # Process a chunk of data
    return processed_result

# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
Copy

자세한 내용은 `Ray의 태스크 프로그래밍 설명서<https://docs.ray.io/en/latest/ray-core/tasks.html>`_를 참조하세요.

멀티 노드 ML 작업 실행하기

target_instances 매개 변수를 사용하여 단일 노드 작업과 동일한 메서드로 멀티 노드 ML 작업을 실행할 수 있습니다.

원격 데코레이터 사용하기

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=3  # Specify the number of nodes
)
def distributed_training(data_table: str):

    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    # Configure scaling for distributed execution
    scaling_config = XGBScalingConfig()

    # Create distributed estimator
    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using distributed resources
    # NOTE: data_connector and feature_cols excluded for brevity
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")


job = distributed_training("<my_training_data>")
Copy

Python 파일 실행하기

from snowflake.ml.jobs import submit_file

job = submit_file(
    "<script_path>",
    "MY_COMPUTE_POOL",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

디렉터리 실행하기

from snowflake.ml.jobs import submit_directory

job = submit_directory(
    "<script_directory>",
    "MY_COMPUTE_POOL",
    entrypoint="<script_name>",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

고급 구성: min_instances 사용하기

더욱 유연하게 리소스를 관리하기 위해 min_instances 매개 변수(선택 사항)를 사용하여 작업을 진행하는 데 필요한 최소 인스턴스 수를 지정할 수 있습니다. :code:`min_instances`가 설정된 경우, 최소 노드 수를 사용할 수 있게 되면 그 수가 :code:`target_instances`보다 적더라도 작업 페이로드가 즉시 실행됩니다.

이는 다음을 수행하려는 경우에 유용합니다.

  • 전체 대상을 즉시 사용할 수 없어서 더 적은 수의 노드로 학습을 시작하려는 경우

  • 컴퓨팅 풀 리소스가 제한될 때 대기 시간을 줄이려는 경우

  • 다양한 리소스 가용성에 적응할 수 있는 내결함성 워크플로를 구현하려는 경우

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=5,  # Prefer 5 nodes
    min_instances=3      # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
    import ray

    # Check how many nodes we actually got
    available_nodes = len(ray.nodes())
    print(f"Training with {available_nodes} nodes")

    # Adapt your training logic based on available resources
    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    scaling_config = XGBScalingConfig(
        num_workers=available_nodes
    )

    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using available distributed resources
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")

job = flexible_distributed_training("<my_training_data>")
Copy

멀티 노드 작업 관리하기

작업 상태 모니터링하기

작업 상태 모니터링은 단일 노드 작업과 동일합니다.

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}")  # PENDING, RUNNING, FAILED, DONE

# Wait for completion
job.wait()
Copy

노드별로 로그에 액세스하기

멀티 노드 작업에서는 특정 인스턴스의 로그에 액세스할 수 있습니다.

# Get logs from the default (head) instance
logs_default = job.get_logs()

# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)

# Display logs in the notebook/console
job.show_logs()  # Default (head) instance logs
job.show_logs(instance_id=0)  # Instance 0 logs (not necessarily the head node)
Copy

일반적인 문제 및 제한 사항

다음 정보를 참고하여 발생할 수 있는 일반적인 문제를 해결하세요.

  • 노드 연결 실패: 워커 노드를 헤드 노드에 연결하지 못한 경우, 헤드 노드가 작업을 완료한 후 워커가 작업을 완료하기 전에 자체적으로 종료될 수 있습니다. 연결 실패를 방지하려면 작업에서 결과 컬렉션 논리를 구현하세요.

  • 메모리 소진: 메모리 문제로 인해 작업에 실패하는 경우 노드 크기를 늘리거나, 노드당 데이터가 적은 노드를 더 많이 사용하세요.

  • 노드 가용성 시간 제한: 필요한 인스턴스의 수(target_instances 또는 min_instances)를 미리 정의된 시간 제한 내에 사용할 수 없는 경우 작업에 실패합니다. 컴퓨팅 풀의 용량이 충분한지 확인하거나 인스턴스 요구 사항을 조정하세요.