Tâches multi-nœuds Snowflake ML

Utilisez les tâches multi-nœuds Snowflake ML pour exécuter le flux de travail de machine learning (ML) distribué dans Snowflake ML Container Runtimes sur plusieurs nœuds de calcul. Répartissez le travail entre plusieurs nœuds pour traiter de grands ensembles de données et des modèles complexes avec de meilleures performances. Pour des informations sur les tâches SnowflakeML, consultez Tâches Snowflake ML.

Les tâches multi-nœuds Snowflake ML étendent les capacités de Snowflake ML en permettant une exécution distribuée sur plusieurs nœuds. Vous bénéficiez ainsi de :

  • Performances évolutives : évoluez horizontalement pour traiter des ensembles de données trop volumineux pour tenir sur un seul nœud

  • Temps d’entraînement réduit : accélérez l’entraînement de modèles complexes grâce à la parallélisation

  • Efficacité des ressources : optimisez l’utilisation des ressources pour les charges de travail à forte intensité de données

  • Intégration du cadre : utilisez de manière transparente des cadres distribués comme les classes de modélisation distribuées et Ray.

Lorsque vous exécutez une tâche Snowflake ML multi- nœuds, ce qui suit se produit :

  • Un nœud sert de nœud principal (coordinateur).

  • Les nœuds supplémentaires servent de nœuds de travail (ressources de calcul).

  • Ensemble, les nœuds forment une seule entité de tâche ML logique dans Snowflake

Une tâche ML d’un seul nœud n’a qu’un nœud principal. Une tâche de plusieurs nœuds avec trois nœuds actifs a un nœud principal et deux nœuds de travail. Les trois nœuds participent à l’exécution de votre charge de travail.

Conditions préalables

Les conditions préalables suivantes sont nécessaires pour utiliser les tâches multi-nœuds Snowflake ML

Important

Les tâches multi-nœuds Snowflake ML ne prennent actuellement en charge que les clients Python 3.10. Contactez votre équipe de compte Snowflake si vous avez besoin d’assistance pour d’autres versions de Python.

Pour configurer des tâches multi-nœuds, procédez comme suit :

  1. Installez le paquet Python Snowflake ML dans votre environnement Python 3.10.

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. Créez un pool de calcul avec suffisamment de nœuds pour prendre en charge votre tâche multi-nœuds :

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = 1
      MAX_NODES = <NUM_INSTANCES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy

    Important

    Vous devez définir MAX_NODESsur un nombre supérieur ou égal au nombre d’instances cibles que vous utilisez pour exécuter votre tâche d’entraînement. Si vous demandez plus de nœuds que ce que vous avez l’intention d’utiliser pour votre tâche de formation, il existera un risque d’échec ou de comportement imprévisible. Pour plus d’informations sur l’exécution d’une tâche d’entraînement, consultez Exécution des tâches multi-nœuds ML.

Écriture de code pour les tâches multi-nœuds

Pour les tâches multi-nœuds, votre code doit être conçu pour un traitement distribué à l’aide de classes de modélisation distribuées ou Ray.

Vous trouverez ci-dessous des modèles clés et des considérations à prendre en compte lorsque vous utilisez des classes de modélisation distribuées ou Ray :

Compréhension de l’initialisation et de la disponibilité des nœuds

Dans les tâches multi-nœuds, les nœuds de travail peuvent s’initialiser de manière asynchrone et à différents moments :

  • Les nœuds peuvent ne pas tous démarrer simultanément, en particulier si les ressources de pool de calcul sont limitées.

  • Certains nœuds peuvent démarrer plusieurs secondes, voire minutes, après d’autres.

  • Les tâches ML attendent automatiquement la disponibilité des:code:target_instances spécifiées avant d’exécuter votre charge utile. La tâche échoue et renvoie une erreur si les nœuds prévus ne sont pas disponibles dans le délai d’inactivité imparti. Pour plus d’informations sur la personnalisation de ce comportement, consultez Configuration avancée : Utilisation du paramètre min_instances.

Vous pouvez vérifier les nœuds disponibles dans votre tâche via Ray :

import ray
ray.init(address="auto", ignore_reinit_error=True)  # Ray is automatically initialized in multi-node jobs
nodes_info = ray.nodes()
print(f"Available nodes: {len(nodes_info)}")
Copy

Modèles de traitement distribué

Il existe plusieurs modèles que vous pouvez appliquer dans le corps de la charge utile de la tâche multi-nœuds pour le traitement distribué. Ces modèles tirent parti des classes de modélisation distribuées et de Ray:

Utilisation de l’API d’entraînement distribué de Snowflake

Snowflake fournit des entraînements optimisés pour les cadres ML courants :

# Inside the ML Job payload body
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

# Configure scaling for distributed execution
scaling_config = XGBScalingConfig()

# Create distributed estimator
estimator = XGBEstimator(
    n_estimators=100,
    params={"objective": "reg:squarederror"},
    scaling_config=scaling_config
)

# Train using distributed resources
# NOTE: data_connector and feature_cols excluded for brevity
model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")
Copy

Pour plus d’informations sur les APIs disponibles, consultez les classes de modélisation distribuées .

Utilisation de tâches natives de Ray

Une autre approche consiste à utiliser le modèle de programmation basé sur les tâches de Ray :

# Inside the ML Job payload body
import ray

@ray.remote
def process_chunk(data_chunk):
    # Process a chunk of data
    return processed_result

# Distribute work across available workers
data_chunks = split_data(large_dataset)
futures = [process_chunk.remote(chunk) for chunk in data_chunks]
results = ray.get(futures)
Copy

Pour plus d’informations, consultez la documentation sur la programmation des tâches de Ray.

Exécution des tâches multi-nœuds ML

Vous pouvez exécuter des multi-nœuds. ML qui utilisent les mêmes méthodes que les tâches à un seul nœud, en utilisant le paramètre target_instances :

Utilisation du décorateur distant

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=3  # Specify the number of nodes
)
def distributed_training(data_table: str):

    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    # Configure scaling for distributed execution
    scaling_config = XGBScalingConfig()

    # Create distributed estimator
    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using distributed resources
    # NOTE: data_connector and feature_cols excluded for brevity
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")


job = distributed_training("<my_training_data>")
Copy

Exécution d’un fichier Python

from snowflake.ml.jobs import submit_file

job = submit_file(
    "<script_path>",
    "MY_COMPUTE_POOL",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

Exécution d’un répertoire

from snowflake.ml.jobs import submit_directory

job = submit_directory(
    "<script_directory>",
    "MY_COMPUTE_POOL",
    entrypoint="<script_name>",
    stage_name="<payload_stage>",
    session=session,
    target_instances=<num_training_nodes>  # Specify the number of nodes
)
Copy

Configuration avancée : Utilisation du paramètre min_instances

Pour une gestion des ressources plus flexible, vous pouvez utiliser le paramètre facultatif min_instances pour spécifier un nombre minimal d’instances requises pour que la tâche puisse continuer. Si min_instances est défini, la charge utile de la tâche est exécutée dès que le nombre minimal de nœuds est disponible, même si ce nombre est inférieur à target_instances.

Ceci est utile lorsque vous souhaitez :

  • Commencer l’entraînement avec moins de nœuds si la cible complète n’est pas immédiatement disponible

  • Réduire les temps d’attente lorsque les ressources du pool de calcul sont limitées

  • Mettre en œuvre des flux de travail tolérants aux pannes qui peuvent s’adapter à la disponibilité variable des ressources

from snowflake.ml.jobs import remote

@remote(
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
    target_instances=5,  # Prefer 5 nodes
    min_instances=3      # But start with at least 3 nodes
)
def flexible_distributed_training(data_table: str):
    import ray

    # Check how many nodes we actually got
    available_nodes = len(ray.nodes())
    print(f"Training with {available_nodes} nodes")

    # Adapt your training logic based on available resources
    from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig

    scaling_config = XGBScalingConfig(
        num_workers=available_nodes
    )

    estimator = XGBEstimator(
        n_estimators=100,
        params={"objective": "reg:squarederror"},
        scaling_config=scaling_config
    )

    # Train using available distributed resources
    model = estimator.fit(data_connector, input_cols=feature_cols, label_col="target")

job = flexible_distributed_training("<my_training_data>")
Copy

Gestion des tâches multi-nœuds

Surveillance de l’état des tâches

La surveillance de l’état des tâches est inchangée par rapport aux tâches à un seul nœud :

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Basic job information
print(f"Job ID: {job.id}")
print(f"Status: {job.status}")  # PENDING, RUNNING, FAILED, DONE

# Wait for completion
job.wait()
Copy

Accès aux journaux par nœud

Dans les tâches multi-nœuds, vous pouvez accéder aux journaux d’instances spécifiques :

# Get logs from the default (head) instance
logs_default = job.get_logs()

# Get logs from specific instances by ID
logs_instance0 = job.get_logs(instance_id=0)
logs_instance1 = job.get_logs(instance_id=1)
logs_instance2 = job.get_logs(instance_id=2)

# Display logs in the notebook/console
job.show_logs()  # Default (head) instance logs
job.show_logs(instance_id=0)  # Instance 0 logs (not necessarily the head node)
Copy

Problèmes courants et limitations

Utilisez les informations suivantes pour résoudre les problèmes courants que vous pourriez rencontrer.

  • Échecs de connexion des nœuds : Si les nœuds de travail ne parviennent pas à se connecter au nœud principal, il est possible que le nœud principal termine sa tâche et s’arrête avant la fin de la tâche de travail. Pour éviter les échecs de connexion, mettez en place une logique de collection des résultats dans la tâche.

  • Épuisement de la mémoire : Si les tâches échouent en raison de problèmes de mémoire, augmentez la taille du nœud ou utilisez davantage de nœuds avec moins de données par nœud.

  • Délai d’inactivité de la disponibilité du nœud : Si le nombre d’instances requis (soit target_instances ou min_instances) ne sont pas disponibles dans le délai d’inactivité prédéfini, la tâche échouera. Assurez-vous que votre pool de calcul dispose d’une capacité suffisante ou ajustez les exigences de votre instance.