Snowflake mit ML-Jobs mit mehreren Knoten¶
Verwenden Sie Snowflake ML-Jobs mit mehreren Knoten zur Ausführung von verteilte Machine Learning (ML) Workflows innerhalb von Snowflake ML-Container-Laufzeiten auf mehreren Serverknoten. Verteilen Sie die Arbeit auf mehrere Knoten, um große Datensets und komplexe Modelle mit verbesserter Leistung zu verarbeiten. Weitere Informationen zu Snowflake ML-Jobs finden Sie unter Snowflake ML Jobs.
Snowflake ML-Jobs mit mehreren Knoten erweitern Snowflake ML-Aufgabenfunktionen durch verteilte Ausführung über mehrere Knoten. Dadurch profitieren Sie von folgenden Vorteilen:
Skalierbare Leistung: Horizontales Skalieren, um Datensets zu verarbeiten, die zu groß für einen einzelnen Knoten sind
Weniger Trainingszeit: Schnelleres Trainieren komplexer Modelle durch Parallelisierung
Ressourceneffizienz: Optimieren der Ressourcennutzung für datenintensive Workloads
Framework-Integration: Nutzen Sie verteilte Frameworks, wie Distributed Modeling Classes und Ray nahtlos.
Wenn Sie einen Snowflake ML-Job mit mehreren Knoten ausführen, geschieht Folgendes:
Ein Knoten dient als Hauptknoten (Koordinater)
Zusätzliche Knoten dienen als Worker-Knoten (Computeressourcen).
Zusammen bilden die Knoten eine einzige, logische ML Jobentität in Snowflake
Ein ML-Job mit einem Knoten hat nur einen Hauptknoten. Ein Job mit mehreren Knoten und drei aktiven Knoten hat einen Hauptknoten und zwei Worker-Knoten. Alle drei Knoten sind an der Ausführung Ihres Workloads beteiligt.
Voraussetzungen¶
Die folgenden Voraussetzungen sind erforderlich, um SnowflakeML-Jobs mit mehreren Knoten zu verwenden.
Wichtig
Snowflake ML-Jobs mit mehreren Knoten unterstützen derzeit nur Python 3.10-Clients. Wenden Sie sich an Ihren Snowflake-Kundenbetreuer, wenn Sie Unterstützung für andere Python-Versionen benötigen.
Um Jobs mit mehreren Knoten einzurichten, gehen Sie wie folgt vor:
Installieren Sie das Snowflake ML Python-Paket in Ihrer Python 3.10 Umgebung.
pip install snowflake-ml-python>=1.9.2
Erstellen Sie einen Computepool mit genügend Knoten, um Ihren Job mit mehreren Knoten zu unterstützen:
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = 1 MAX_NODES = <NUM_INSTANCES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
Wichtig
Sie müssen einstellen, dass MAX_NODES größer oder gleich der Anzahl der Zielinstanzen ist, die Sie zur Ausführung Ihres Trainingsjobs verwenden. Wenn Sie mehr Knoten anfordern, als Sie für Ihren Trainingsjob verwenden möchten, kann dieser fehlschlagen oder sich unvorhersehbar verhalten. Informationen zum Ausführen eines Trainingsjobs finden Sie unter Ausführen von ML-Jobs mit mehreren Knoten.
Schreiben von Code für Jobs mit mehreren Knoten¶
Für Jobs mit mehreren Knoten muss Ihr Code für die verteilte Verarbeitung mit konzipiert sein Distributed Modeling Classes oder Ray.
Im Folgenden finden Sie die wichtigsten Muster und Hinweise zur Verwendung von Distributed Modeling Classes oder Ray:
Erläuterungen zur Initialisierung und Verfügbarkeit von Knoten¶
In Jobs mit mehreren Knoten können Worker-Knoten asynchron und zu unterschiedlichen Zeiten initialisiert werden:
Möglicherweise starten nicht alle Knoten gleichzeitig, insbesondere wenn die Ressourcen des Computepools begrenzt sind
Einige Knoten starten möglicherweise Sekunden oder sogar Minuten nach anderen
ML-Jobs warten automatisch darauf, dass die angegebenen
target_instances
vor der Ausführung Ihrer Nutzlast verfügbar sind. Der Job schlägt mit einem Fehler fehl, wenn die erwarteten Knoten nicht innerhalb des Timeout-Zeitraums verfügbar sind. Weitere Informationen zum Anpassen dieser Verhaltensweise finden Sie unter Erweiterte Konfiguration: Verwenden von min_instances.
Sie können die verfügbaren Knoten in Ihrem Job über Ray überprüfen:
import ray
ray.init(address="auto", ignore_reinit_error=True) # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
Verteilte Verarbeitungsmuster¶
Es gibt mehrere Muster, die Sie im Nutzlasttext des Jobs mit mehreren Knoten für die verteilte Verarbeitung anwenden können. Diese Muster nutzen Distributed Modeling Classes und Ray:
Verwenden der verteilten Trainings-API von Snowflake¶
Snowflake bietet optimierte Trainingsläufe für gängige ML-Frameworks:
# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()
# Create distributed estimator
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
Weitere Informationen zu den verfügbaren APIs finden Sie unter Distributed Modeling Classes.
Verwenden von nativen Ray-Aufgaben¶
Ein anderer Ansatz ist die Verwendung des aufgabenbasierten Programmiermodells von Ray:
# Inside the ML Job payload body
import ray
@ray.remote
def process_chunk(data_chunk):
# Process a chunk of data
return processed_result
# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
Weitere Informationen dazu finden Sie unter Ray-Dokumentation zur Aufgabenprogrammierung.
Ausführen von ML-Jobs mit mehreren Knoten¶
Sie könnenML-Jobs mit mehreren Knoten mit denselben Methoden ausführen wie Jobs mit einem Knoten, indem Sie den Parameter target_instances
verwenden:
Verwenden des Remote-Decorator-Elements¶
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=3 # Specify the number of nodes
)
def distributed_training(data_table: str):
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()
# Create distributed estimator
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
job = distributed_training("<my_training_data>")
Ausführen einer Python-Datei¶
from snowflake.ml.jobs import submit_file
job = submit_file(
"<script_path>",
"MY_COMPUTE_POOL",
stage_name="<payload_stage>",
session=session,
target_instances=<num_training_nodes> # Specify the number of nodes
)
Ausführen eines Verzeichnisses¶
from snowflake.ml.jobs import submit_directory
job = submit_directory(
"<script_directory>",
"MY_COMPUTE_POOL",
entrypoint="<script_name>",
stage_name="<payload_stage>",
session=session,
target_instances=<num_training_nodes> # Specify the number of nodes
)
Erweiterte Konfiguration: Verwenden von min_instances¶
Für ein flexibleres Ressourcenmanagement können Sie optional den Parameter min_instances
verwenden, um eine Mindestanzahl von Instanzen anzugeben, die für die Fortsetzung des Jobs erforderlich sind. Wenn min_instances
festgelegt ist, wird die Nutzlast des Jobs ausgeführt, sobald die Mindestanzahl von Knoten verfügbar ist, auch wenn diese Anzahl kleiner als target_instances
ist.
Dies ist nützlich, wenn Sie Folgendes möchten:
Beginnen des Trainings mit weniger Knoten, wenn das vollständige Ziel nicht sofort verfügbar ist
Verkürzen von Wartezeiten, wenn die Ressourcen des Computepools begrenzt sind
Implementieren von fehlertoleranten Workflows, die sich an die unterschiedliche Verfügbarkeit von Ressourcen anpassen können
from snowflake.ml.jobs import remote
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
target_instances=5, # Prefer 5 nodes
min_instances=3 # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
import ray
# Check how many nodes we actually got
available_nodes = len(ray.nodes())
print(f"Training with {available_nodes} nodes")
# Adapt your training logic based on available resources
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
scaling_config = XGBScalingConfig(
num_workers=available_nodes
)
estimator = XGBEstimator(
n_estimators=100,
params={"objective": "reg:squarederror"},
scaling_config=scaling_config
)
# Train using available distributed resources
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
job = flexible_distributed_training("<my_training_data>")
Verwalten von Jobs mit mehreren Knoten¶
Überwachen des Jobstatus¶
Die Überwachung des Jobstatus entsprecht der für Jobs mit einem Knoten:
from snowflake.ml.jobs import MLJob, get_job, list_jobs
# List all jobs
jobs = list_jobs()
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}") # PENDING, RUNNING, FAILED, DONE
# Wait for completion
job.wait()
Zugriff auf Protokolle nach Knoten¶
In Jobs mit mehreren Knoten können Sie auf Protokolle bestimmter Instanzen zugreifen:
# Get logs from the default (head) instance
logs_default = job.get_logs()
# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)
# Display logs in the notebook/console
job.show_logs() # Default (head) instance logs
job.show_logs(instance_id=0) # Instance 0 logs (not necessarily the head node)
Bekannte Probleme und Beschränkungen¶
Verwenden Sie die folgenden Informationen, um häufige Probleme zu beheben, die auftreten können.
Knotenverbindungsfehler: Wenn Worker-Knoten keine Verbindung zum Hauptknoten herstellen, ist es möglich, dass der Hauptknoten seine Aufgabe beendet und sich dann selbst ausschaltet, bevor der Worker seine Arbeit beendet hat. Um Verbindungsfehler zu vermeiden, implementieren Sie eine Logik zur Ergebnissammlung im Job.
Arbeitsspeichererschöpfung: Wenn Jobs aufgrund von Arbeitsspeicherproblemen fehlschlagen, erhöhen Sie die Knotengröße oder verwenden Sie mehr Knoten mit weniger Daten pro Knoten.
Timeout der Knotenverfügbarkeit: Wenn die erforderliche Anzahl von Instanzen (entweder
target_instances
odermin_instances
) innerhalb des vordefinierten Timeouts nicht verfügbar ist, schlägt der Job fehl. Stellen Sie sicher, dass Ihr Computepool über ausreichende Kapazität verfügt, oder passen Sie die Anforderungen Ihrer Instanz an.