Snowflake Multi-Node ML Jobs¶
Snowflake Multi-Node ML Jobs를 사용하면 여러 컴퓨팅 노드에 걸친 Snowflake ML 컨테이너 내에서 분산 머신 러닝(ML) 워크플로를 실행할 수 있습니다. 여러 노드에 작업을 분산하여 대규모 데이터 세트와 복잡한 모델을 개선된 성능으로 처리합니다. Snowflake ML Jobs에 대한 자세한 내용은 Snowflake ML 작업 섹션을 참조하세요.
Snowflake Multi-Node ML Jobs는 여러 노드에 걸쳐 분산 실행을 활성화함으로써 Snowflake ML Job 기능을 확장합니다. 이를 통해 얻을 수 있는 이점은 다음과 같습니다.
확장 가능한 성능: 너무 커서 단일 노드에 맞지 않는 데이터 세트를 처리할 수 있도록 수평으로 확장
학습 시간 단축: 병렬화를 통해 복잡한 모델 학습 속도 향상
리소스 효율성: 데이터 집약적인 워크로드에 맞게 리소스 사용률 최적화
프레임워크 통합: `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_와 `Ray <https://www.ray.io/>`_같은 분산 프레임워크를 원활하게 사용
노드가 여러 개 있는 Snowflake ML Job을 실행하면 다음과 같은 일이 발생합니다.
한 노드가 헤드 노드(조정자) 역할을 함
추가 노드가 워커 노드(컴퓨팅 리소스) 역할을 함
노드가 함께 Snowflake에서 단일 논리 ML 작업 엔터티를 형성함
단일 노드 ML 작업에는 헤드 노드만 있습니다. 활성 노드가 세 개 있는 멀티 노드 작업에는 헤드 노드 한 개와 워커 노드 두 개가 있습니다. 세 노드 모두 워크로드를 실행하는 데 참여합니다.
전제 조건¶
Snowflake Multi-Node ML Jobs를 사용하려면 다음과 같은 전제 조건이 필요합니다.
멀티 노드 작업을 설정하려면 다음을 수행하세요.
Install the Snowflake ML Python package.
pip install snowflake-ml-python>=1.9.2
멀티 노드 작업을 지원할 만큼 노드가 충분히 있는 컴퓨팅 풀을 만듭니다.
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = 1 MAX_NODES = <NUM_INSTANCES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
중요
MAX_NODES는 학습 작업을 실행하는 데 사용할 대상 인스턴스 수보다 크거나 같게 설정해야 합니다. 학습 작업에 사용하려는 노드 수보다 많은 노드를 요청하면 실패하거나 예기치 않게 동작할 수 있습니다. 학습 작업을 실행하는 방법에 대한 자세한 내용은 멀티 노드 ML 작업 실행하기 섹션을 참조하세요.
멀티 노드 작업용 코드 작성하기¶
멀티 노드 작업 시 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_ 또는 `Ray<https://www.ray.io/>`_를 사용하여 분산 처리용 코드를 설계해야 합니다.
분산 모델링 클래스 또는 Ray를 사용할 때의 주요 패턴 및 고려 사항은 다음과 같습니다.
노드 초기화 및 가용성 이해하기¶
멀티 노드 작업에서 워커 노드는 비동기적으로 다른 시간에 초기화할 수 있습니다.
특히 컴퓨팅 풀 리소스가 제한된 경우 노드가 모두 동시에 시작되지 않을 수 있습니다.
일부 노드는 몇 초 또는 몇 분 후에도 시작될 수 있습니다.
ML Jobs는 지정된
target_instances`를 사용할 수 있을 때까지 자동으로 기다렸다가 페이로드를 실행합니다. 예상 노드를 제한 시간 내에 사용할 수 없는 경우 오류가 발생하며 작업에 실패합니다. 이 동작을 사용자 지정하는 방법에 대한 자세한 내용은 :ref:`label-snowflake_ml_jobs_advanced_config섹션을 참조하세요.
Ray를 통해 작업에서 사용 가능한 노드를 확인할 수 있습니다.
import ray
ray.init(address="auto", ignore_reinit_error=True) # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
분산 처리 패턴¶
분산 처리 시 멀티 노드 작업의 페이로드 본문에 여러 패턴을 적용할 수 있습니다. 이러한 패턴은 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_와 `Ray<https://www.ray.io/>`_를 활용합니다.
Snowflake의 분산 학습 API 사용하기¶
Snowflake는 공통 ML 프레임워크에 최적화된 트레이너를 제공합니다.
# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()
# Create distributed estimator
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
사용 가능한 APIs에 대한 자세한 내용은 `분산 모델링 클래스<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors>`_를 참조하세요.
네이티브 Ray 태스크 사용하기¶
또 다른 접근 방식은 Ray의 태스크 기반 프로그래밍 모델을 사용하는 것입니다.
# Inside the ML Job payload body
import ray
@ray.remote
def process_chunk(data_chunk):
# Process a chunk of data
return processed_result
# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
자세한 내용은 `Ray의 태스크 프로그래밍 설명서<https://docs.ray.io/en/latest/ray-core/tasks.html>`_를 참조하세요.
멀티 노드 ML 작업 실행하기¶
target_instances 매개 변수를 사용하여 단일 노드 작업과 동일한 메서드로 멀티 노드 ML 작업을 실행할 수 있습니다.
원격 데코레이터 사용하기¶
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=3 # Specify the number of nodes
)
def distributed_training(data_table: str):
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()
# Create distributed estimator
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
job = distributed_training("<my_training_data>")
Python 파일 실행하기¶
from snowflake.ml.jobs import submit_file
job = submit_file(
"<script_path>",
"MY_COMPUTE_POOL",
stage_name="<payload_stage>",
session=session,
target_instances=<num_training_nodes> # Specify the number of nodes
)
디렉터리 실행하기¶
from snowflake.ml.jobs import submit_directory
job = submit_directory(
"<script_directory>",
"MY_COMPUTE_POOL",
entrypoint="<script_name>",
stage_name="<payload_stage>",
session=session,
target_instances=<num_training_nodes> # Specify the number of nodes
)
고급 구성: min_instances 사용하기¶
더욱 유연하게 리소스를 관리하기 위해 min_instances 매개 변수(선택 사항)를 사용하여 작업을 진행하는 데 필요한 최소 인스턴스 수를 지정할 수 있습니다. :code:`min_instances`가 설정된 경우, 최소 노드 수를 사용할 수 있게 되면 그 수가 :code:`target_instances`보다 적더라도 작업 페이로드가 즉시 실행됩니다.
이는 다음을 수행하려는 경우에 유용합니다.
전체 대상을 즉시 사용할 수 없어서 더 적은 수의 노드로 학습을 시작하려는 경우
컴퓨팅 풀 리소스가 제한될 때 대기 시간을 줄이려는 경우
다양한 리소스 가용성에 적응할 수 있는 내결함성 워크플로를 구현하려는 경우
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=5, # Prefer 5 nodes
min_instances=3 # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
import ray
# Check how many nodes we actually got
available_nodes = len(ray.nodes())
print(f"Training with {available_nodes} nodes")
# Adapt your training logic based on available resources
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
scaling_config = XGBScalingConfig(
num_workers=available_nodes
)
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using available distributed resources
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
job = flexible_distributed_training("<my_training_data>")
멀티 노드 작업 관리하기¶
작업 상태 모니터링하기¶
작업 상태 모니터링은 단일 노드 작업과 동일합니다.
from snowflake.ml.jobs import MLJob, get_job, list_jobs
# List all jobs
jobs = list_jobs()
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}") # PENDING, RUNNING, FAILED, DONE
# Wait for completion
job.wait()
노드별로 로그에 액세스하기¶
멀티 노드 작업에서는 특정 인스턴스의 로그에 액세스할 수 있습니다.
# Get logs from the default (head) instance
logs_default = job.get_logs()
# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)
# Display logs in the notebook/console
job.show_logs() # Default (head) instance logs
job.show_logs(instance_id=0) # Instance 0 logs (not necessarily the head node)
일반적인 문제 및 제한 사항¶
다음 정보를 참고하여 발생할 수 있는 일반적인 문제를 해결하세요.
노드 연결 실패: 워커 노드를 헤드 노드에 연결하지 못한 경우, 헤드 노드가 작업을 완료한 후 워커가 작업을 완료하기 전에 자체적으로 종료될 수 있습니다. 연결 실패를 방지하려면 작업에서 결과 컬렉션 논리를 구현하세요.
메모리 소진: 메모리 문제로 인해 작업에 실패하는 경우 노드 크기를 늘리거나, 노드당 데이터가 적은 노드를 더 많이 사용하세요.
노드 가용성 시간 제한: 필요한 인스턴스의 수(
target_instances또는min_instances)를 미리 정의된 시간 제한 내에 사용할 수 없는 경우 작업에 실패합니다. 컴퓨팅 풀의 용량이 충분한지 확인하거나 인스턴스 요구 사항을 조정하세요.