Snowflake Multi-Node ML Jobs

Snowflake Multi-Node ML Jobs를 사용하면 여러 컴퓨팅 노드에 걸친 Snowflake ML 컨테이너 내에서 분산 머신 러닝(ML) 워크플로를 실행할 수 있습니다. 여러 노드에 작업을 분산하여 대규모 데이터 세트와 복잡한 모델을 개선된 성능으로 처리합니다. Snowflake ML Jobs에 대한 자세한 내용은 Snowflake ML 작업 섹션을 참조하세요.

Snowflake Multi-Node ML Jobs는 여러 노드에 걸쳐 분산 실행을 활성화함으로써 Snowflake ML Job 기능을 확장합니다. 이를 통해 얻을 수 있는 이점은 다음과 같습니다.

노드가 여러 개 있는 Snowflake ML Job을 실행하면 다음과 같은 일이 발생합니다.

  • 한 노드가 헤드 노드(조정자) 역할을 함

  • 추가 노드가 워커 노드(컴퓨팅 리소스) 역할을 함

  • 노드가 함께 Snowflake에서 단일 논리 ML 작업 엔터티를 형성함

단일 노드 ML 작업에는 헤드 노드만 있습니다. 활성 노드가 세 개 있는 멀티 노드 작업에는 헤드 노드 한 개와 워커 노드 두 개가 있습니다. 세 노드 모두 워크로드를 실행하는 데 참여합니다.

전제 조건

Snowflake Multi-Node ML Jobs를 사용하려면 다음과 같은 전제 조건이 필요합니다.

중요

Snowflake Multi-Node ML Jobs는 현재 Python 3.10 클라이언트만 지원합니다. 다른 Python 버전에 대한 지원이 필요한 경우 Snowflake 계정 팀에 문의하세요.

멀티 노드 작업을 설정하려면 다음을 수행하세요.

  1. Python 3.10 환경에 Snowflake ML Python 패키지를 설치합니다.

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. 멀티 노드 작업을 지원할 만큼 노드가 충분히 있는 컴퓨팅 풀을 만듭니다.

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = 1
      MAX_NODES = <NUM_INSTANCES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy

    중요

    MAX_NODES는 학습 작업을 실행하는 데 사용할 대상 인스턴스 수보다 크거나 같게 설정해야 합니다. 학습 작업에 사용하려는 노드 수보다 많은 노드를 요청하면 실패하거나 예기치 않게 동작할 수 있습니다. 학습 작업을 실행하는 방법에 대한 자세한 내용은 멀티 노드 ML 작업 실행하기 섹션을 참조하세요.

멀티 노드 작업용 코드 작성하기

멀티 노드 작업 시 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_ 또는 `Ray<https://www.ray.io/>`_를 사용하여 분산 처리용 코드를 설계해야 합니다.

분산 모델링 클래스 또는 Ray를 사용할 때의 주요 패턴 및 고려 사항은 다음과 같습니다.

노드 초기화 및 가용성 이해하기

멀티 노드 작업에서 워커 노드는 비동기적으로 다른 시간에 초기화할 수 있습니다.

  • 특히 컴퓨팅 풀 리소스가 제한된 경우 노드가 모두 동시에 시작되지 않을 수 있습니다.

  • 일부 노드는 몇 초 또는 몇 분 후에도 시작될 수 있습니다.

  • ML Jobs는 지정된 target_instances`를 사용할 있을 때까지 자동으로 기다렸다가 페이로드를 실행합니다. 예상 노드를 제한 시간 내에 사용할 없는 경우 오류가 발생하며 작업에 실패합니다. 동작을 사용자 지정하는 방법에 대한 자세한 내용은 :ref:`label-snowflake_ml_jobs_advanced_config 섹션을 참조하세요.

Ray를 통해 작업에서 사용 가능한 노드를 확인할 수 있습니다.

import ray
ray.init(address="auto", ignore_reinit_error=True)  # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
Copy

분산 처리 패턴

분산 처리 시 멀티 노드 작업의 페이로드 본문에 여러 패턴을 적용할 수 있습니다. 이러한 패턴은 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_와 `Ray<https://www.ray.io/>`_를 활용합니다.

Snowflake의 분산 학습 API 사용하기

Snowflake는 공통 ML 프레임워크에 최적화된 트레이너를 제공합니다.

# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()

# Create distributed estimator
estimator = XGBEstimator(
    n_estimators=100,
    params={"objective": "reg:squarederror"},
    scaling_config=scaling_config
)

# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
Copy

사용 가능한 APIs에 대한 자세한 내용은 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_를 참조하세요.

네이티브 Ray 태스크 사용하기

또 다른 접근 방식은 Ray의 태스크 기반 프로그래밍 모델을 사용하는 것입니다.

# Inside the ML Job payload body
import ray

@ray.remote
def process_chunk(data_chunk):
    # Process a chunk of data
    return processed_result

# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
Copy

자세한 내용은 `Ray의 태스크 프로그래밍 설명서<https://docs.ray.io/en/latest/ray-core/tasks.html>`_를 참조하세요.

멀티 노드 ML 작업 실행하기

target_instances 매개 변수를 사용하여 단일 노드 작업과 동일한 메서드로 멀티 노드 ML 작업을 실행할 수 있습니다.

원격 데코레이터 사용하기

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=3  # Specify the number of nodes
)
def distributed_training(data_table: str):

    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    # Configure scaling for distributed execution
    scaling_config = XGBScalingConfig()

    # Create distributed estimator
    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using distributed resources
    # NOTE: data_connector and feature_cols excluded for brevity
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")


job = distributed_training("<my_training_data>")
Copy

Python 파일 실행하기

from snowflake.ml.jobs import submit_file

job = submit_file(
    "<script_path>",
    "MY_COMPUTE_POOL",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

디렉터리 실행하기

from snowflake.ml.jobs import submit_directory

job = submit_directory(
    "<script_directory>",
    "MY_COMPUTE_POOL",
    entrypoint="<script_name>",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

고급 구성: min_instances 사용하기

더욱 유연하게 리소스를 관리하기 위해 min_instances 매개 변수(선택 사항)를 사용하여 작업을 진행하는 데 필요한 최소 인스턴스 수를 지정할 수 있습니다. :code:`min_instances`가 설정된 경우, 최소 노드 수를 사용할 수 있게 되면 그 수가 :code:`target_instances`보다 적더라도 작업 페이로드가 즉시 실행됩니다.

이는 다음을 수행하려는 경우에 유용합니다.

  • 전체 대상을 즉시 사용할 수 없어서 더 적은 수의 노드로 학습을 시작하려는 경우

  • 컴퓨팅 풀 리소스가 제한될 때 대기 시간을 줄이려는 경우

  • 다양한 리소스 가용성에 적응할 수 있는 내결함성 워크플로를 구현하려는 경우

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=5,  # Prefer 5 nodes
    min_instances=3      # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
    import ray

    # Check how many nodes we actually got
    available_nodes = len(ray.nodes())
    print(f"Training with {available_nodes} nodes")

    # Adapt your training logic based on available resources
    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    scaling_config = XGBScalingConfig(
        num_workers=available_nodes
    )

    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using available distributed resources
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")

job = flexible_distributed_training("<my_training_data>")
Copy

멀티 노드 작업 관리하기

작업 상태 모니터링하기

작업 상태 모니터링은 단일 노드 작업과 동일합니다.

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}")  # PENDING, RUNNING, FAILED, DONE

# Wait for completion
job.wait()
Copy

노드별로 로그에 액세스하기

멀티 노드 작업에서는 특정 인스턴스의 로그에 액세스할 수 있습니다.

# Get logs from the default (head) instance
logs_default = job.get_logs()

# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)

# Display logs in the notebook/console
job.show_logs()  # Default (head) instance logs
job.show_logs(instance_id=0)  # Instance 0 logs (not necessarily the head node)
Copy

일반적인 문제 및 제한 사항

다음 정보를 참고하여 발생할 수 있는 일반적인 문제를 해결하세요.

  • 노드 연결 실패: 워커 노드를 헤드 노드에 연결하지 못한 경우, 헤드 노드가 작업을 완료한 후 워커가 작업을 완료하기 전에 자체적으로 종료될 수 있습니다. 연결 실패를 방지하려면 작업에서 결과 컬렉션 논리를 구현하세요.

  • 메모리 소진: 메모리 문제로 인해 작업에 실패하는 경우 노드 크기를 늘리거나, 노드당 데이터가 적은 노드를 더 많이 사용하세요.

  • 노드 가용성 시간 제한: 필요한 인스턴스의 수(target_instances 또는 min_instances)를 미리 정의된 시간 제한 내에 사용할 수 없는 경우 작업에 실패합니다. 컴퓨팅 풀의 용량이 충분한지 확인하거나 인스턴스 요구 사항을 조정하세요.