Traitement des données avec une logique personnalisée sur toutes les partitions¶
Utilisez la fonction de partition distribuée (DPF) pour traiter des données en parallèle sur un ou plusieurs nœuds d’un pool de calcul. La DPF gère automatiquement l’orchestration distribuée, les erreurs, l’observabilité et la persistance des artefacts. Vous pouvez exécuter une DPF dans un Notebook Snowflake ou une tâche ML Snowflake.
La DPF prend en charge les modes d’exécution suivants :
Mode DataFrame (
run()) : Partitionnez un DataFrame Snowpark par valeurs de colonne et exécutez votre fonction sur chaque partition simultanément. Les données sont préextraites en parallèle pour un débit optimal.Mode de zone de préparation (
run_from_stage()) : Traitez des fichiers d’une zone de préparation Snowflake où chaque fichier devient une partition. Idéal pour le traitement de fichiers à grande échelle avec une utilisation prévisible de la mémoire.
Vous pouvez utiliser la DPF pour traiter efficacement de grands ensembles de données sur différents segments de données.
Cet outil est idéal pour les scénarios suivants :
Analyse des données de vente par région
Traitement des données client par segments géographiques
Formation de modèles ML sur chaque partition de données
Exécution de transformations de données où chaque partition de données nécessite la même logique de traitement
La DPF traite automatiquement le traitement des données distribuées. Vous n’avez pas besoin de gérer l’infrastructure de calcul distribuée.
La DPF vous permet d’écrire du code Python personnalisé en utilisant des bibliothèques open source sur une infrastructure conteneurisée avec accès GPU.
Important
La DPF stocke automatiquement les résultats et les artefacts dans des zones de préparation Snowflake. Avant d’utiliser la DPF, assurez-vous d’avoir l’autorisation sur la zone de préparation sur laquelle la DPF stocke les résultats et les artefacts.
Mode DataFrame : Traiter les données par partitions de colonnes¶
Utilisez le mode DataFrame pour partitionner votre DataFrame Snowpark selon une colonne spécifiée et exécuter votre fonction Python sur chaque partition en parallèle. L’exemple suivant illustre le traitement des données de ventes par région.
Définition de la fonction de traitement¶
Importez les classes nécessaires à l’exécution du traitement distribué :
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, ExecutionOptions
)
Définissez une fonction de traitement qui prend deux arguments :
data_connector: Un DataConnector qui donne accès aux données de la partition. Appelezdata_connector.to_pandas()pour le charger en tant que DataFrame Pandas, ou utilisez d’autres méthodes commeto_torch_dataset()outo_ray_dataset().context: Un objet PartitionContext qui fournit l’ID de partition et les méthodes pour l’importation et le téléchargement d’artefacts.
import json
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Pour chaque région, cette fonction calcule des statistiques récapitulatives et enregistre les résultats sous forme de fichier JSON vers le répertoire de zone de préparation dédié à la partition.
Initialiser et exécuter une DPF¶
Créez une instance DPF avec votre fonction de traitement et un nom de zone de préparation de sortie, puis appelez run() pour démarrer le traitement distribué.
Important
Le DataFrame Snowpark que vous fournissez doit être créé à partir d’une table. Pour plus d’informations sur la création d’un DataFrame à partir d’une table, voir Construire un DataFrame.
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
La méthode run() accepte les paramètres suivants :
partition_by(str): Nom de la colonne dans laquelle partitionner le DataFrame. Chaque valeur unique crée une partition distincte.snowpark_dataframe: Le DataFrame Snowpark à partitionner et à traiter.run_id(str): identificateur unique de l’exécution. Crée un répertoire dédié@{stage_name}/{run_id}/pour tous les artefacts. Utilisez des noms descriptifs commeexperiment_2024_01_15oumodel_v1_retrain.on_existing_artifacts(chaîne, facultatif) : Action lorsque des artefacts pour l’run_idexistent déjà."error"(par défaut) génère une erreur ;"overwrite"remplace les artefacts existants.execution_options(ExecutionOptions, facultatif) : Configuration pour l”=’allocation des ressources et le comportement d’exécution.
Contrôle de la progression et attente de la fin de l’exécution¶
Appelez wait() pour bloquer jusqu’à ce que l’exécution soit terminée. Par défaut, une barre de progression s’affiche.
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Voici un exemple de la sortie :
Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS
Vous pouvez également vérifier le statut et la progression à tout moment sans blocage :
# Check overall status
current_status = run.status
# Get progress grouped by partition status
progress = run.get_progress()
Récupération des résultats de chaque partition¶
Une fois l’exécution terminée avec succès, récupérez les résultats de chaque partition en utilisant la propriété partition_details. Les détails de chaque partition incluent un stage_artifacts_manager pour accéder aux artefacts enregistrés.
if final_status == RunStatus.SUCCESS:
import json
all_results = []
for partition_id, details in run.partition_details.items():
# List available artifacts for this partition
files = details.stage_artifacts_manager.list()
print(f"Partition {partition_id} artifacts: {files}")
# Load an artifact using a custom deserializer
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
Le stage_artifacts_manager propose trois méthodes :
list(): Renvoie une liste de noms de fichiers enregistrés dans le répertoire de zone de préparation de la partition.get(filename, read_function=None): Télécharge et désérialise un artefact. Utilisepicklepar défaut, ou uneread_functionpersonnalisée si fournie.download(filename, local_dir): Télécharge un fichier brut dans un répertoire local et renvoie le chemin du fichier local.
Traitement des erreurs¶
Si l’exécution échoue, vérifiez les détails des partitions individuelles pour diagnostiquer les échecs :
if final_status != RunStatus.SUCCESS:
for partition_id, details in run.partition_details.items():
if details.status != PartitionStatus.DONE:
print(f"Partition {partition_id} status: {details.status}")
try:
error_logs = details.logs
print(error_logs)
except RuntimeError:
print("Logs not available for this partition")
Le RunStatus global reflète le résultat agrégé :
SUCCESS: Toutes les partitions se sont terminées avec succès.PARTIAL_FAILURE: Certaines partitions ont réussi, mais au moins une a échoué.FAILURE: Aucune partition a été complétée avec succès.CANCELLED: L’exécution a été annulée.IN_PROGRESS: L’exécution est toujours en cours.
Chaque partition dispose d’un PartitionStatus :
PENDING: En attente de traitement.RUNNING: En cours de traitement.DONE: Terminé correctement.FAILED: La fonction utilisateur a soulevé une exception.CANCELLED: Annulé par l’utilisateur.INTERNAL_ERROR: Une erreur interne du système s’est produite (par exemple, manque de mémoire).
Pour importer ces énumérations :
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, PartitionStatus
)
Pour annuler une tâche en cours :
run.cancel()
Note
Les partitions déjà terminées ne sont pas concernées par l’annulation. Les résultats partiels, les journaux et les artefacts des partitions terminées restent disponibles.
Restauration des résultats d’une exécution terminée¶
Vous pouvez restaurer une exécution terminée à partir de son état persistant et accéder aux mêmes résultats sans relancer le processus :
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")
# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
print(f"{partition_id}: {details.status}")
Note
Les exécutions restaurées sont en lecture seule. Vous ne pouvez pas appeler wait() ou cancel() sur une exécution restaurée.
Mode de zone de préparation : Traitement de fichiers en zone de préparation¶
Utilisez le mode zone de préparation pour traiter les fichiers d’une zone de préparation Snowflake où chaque fichier devient une partition. Cette fonction est idéale pour le traitement de fichiers à grande échelle, comme le traitement d’une collection de fichiers Parquet qui ont été placés en zone de préparation.
Définition de la fonction de traitement¶
La signature de la fonction de traitement est la même que le mode DataFrame. Le data_connector donne accès aux données du fichier, et context.partition_id est le chemin d’accès relatif au fichier dans la zone de préparation.
def process_file(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing file {context.partition_id}: {len(df)} rows")
# Process data and save results
result = {"row_count": len(df), "columns": list(df.columns)}
import json
context.upload_to_stage(result, "result.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Ajouter une DPF depuis une zone de préparation¶
Appelez run_from_stage() au lieu de run(). Spécifier la stage_location d’entrée contenant les fichiers sources et éventuellement un file_pattern pour filtrer les fichiers à traiter.
dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
stage_location="@my_db.my_schema.input_stage/data/",
run_id="file_processing_2024",
file_pattern="*.parquet",
)
final_status = run.wait()
La méthode run_from_stage() accepte les paramètres suivants :
stage_location(str): Chemin de la zone de préparation d’entrée contenant les fichiers de données source. Chaque fichier correspondant aufile_patterndevient une partition. Prend en charge les noms de zone de préparation simples et complets :Nom simple :
"@my_stage/data/"Nom complet :
"@my_db.my_schema.my_stage/data/"
run_id(chaîne) : identificateur unique de l’exécution.file_pattern(chaîne, facultatif) : Modèle global pour filtrer les fichiers. La valeur par défaut est"*.parquet".on_existing_artifacts(chaîne, obligatoire) :"error"(par défaut) ou"overwrite".execution_options(ExecutionOptions, facultatif) : Configuration pour l”=’allocation des ressources et le comportement d’exécution.
Note
Le stage_location est la source de données d’entrée. Le stage_name fourni à DPF() est l’emplacement de sortie d’artefacts tels que les journaux et les résultats. Il peut s’agir de zones de préparation différentes.
La surveillance, la récupération des résultats, la gestion des erreurs et la restauration des exécutions fonctionnent de la même manière qu’en mode DataFrame.
Pour le traitement des fichiers liés aux E/S, définissez num_cpus_per_worker=1 dans ExecutionOptions pour maximiser le parallélisme (un acteur par CPU). Pour les charges de travail liées au CPU, utilisez la valeur par défaut ou augmentez num_cpus_per_worker.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
run = dpf.run_from_stage(
stage_location="@my_stage/data/",
run_id="io_bound_processing",
execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
Configuration des options d’exécution¶
Utilisez ExecutionOptions pour contrôler l’attribution des ressources et le comportement d’exécution, tels que l’attribution CPU/GPU par travailleur, le nombre de nouvelles tentatives et le comportement fail-fast. Tous les champs sont facultatifs et comportent des valeurs par défaut raisonnables.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
options = ExecutionOptions(
num_cpus_per_worker=4,
num_gpus_per_worker=1,
fail_fast=True,
)
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="my_run",
execution_options=options,
)
Pour obtenir la liste complète des paramètres et le comportement de mise à l’échelle des travailleurs, voir la référence API ExecutionOptions.
Travailler avec des artefacts en utilisant PartitionContext¶
L’objet PartitionContext est transmis en tant que deuxième argument à votre fonction de traitement. Cela fournit des méthodes pour interagir avec les artefacts et les sessions Snowflake pendant l’exécution de la partition.
Importer les artefacts¶
Utilisez upload_to_stage() pour enregistrer les résultats de dans votre fonction de traitement. Par défaut, les objets sont sérialisés à l’aide de pickle. Fournissez une write_function pour une sérialisation personnalisée.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Save a pickle object (default serialization)
results = {'total': df['amount'].sum(), 'count': len(df)}
context.upload_to_stage(results, "summary.pkl")
# Save JSON data with a custom write function
import json
context.upload_to_stage(
results, "summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
)
# Save a CSV file
df_processed = df.groupby('category').sum()
context.upload_to_stage(
df_processed, "aggregated.csv",
write_function=lambda df, path: df.to_csv(path, index=False)
)
Télécharger les artefacts¶
Utilisez download_from_stage() pour charger des artefacts dans votre fonction de traitement. Vous pouvez utiliser cette fonction pour accéder aux artefacts d’une exécution précédente. Par exemple, vous pouvez l’utiliser pour charger un modèle pour l’inférence.
def my_inference_function(data_connector, context):
# Load a pickle object from the current partition's stage path
model = context.download_from_stage("model.pkl")
# Load from a different stage path (e.g., from a prior training run)
model = context.download_from_stage(
"model.pkl",
stage_path="@db.schema.stage/training_run/partition_0"
)
# Load JSON with a custom deserializer
import json
config = context.download_from_stage(
"config.json",
read_function=lambda path: json.load(open(path, 'r'))
)
Utiliser les sessions Snowflake¶
Utilisez with_session() pour exécuter des opérations qui nécessitent une session Snowflake, telles que l’écriture de résultats dans une table. Cette méthode utilise un pool de sessions limité pour éviter d’atteindre les limites de session de Snowflake lors de l’exécution simultanée de plusieurs partitions.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Load a model from a prior training run
model = context.download_from_stage("model.pkl")
predictions = model.predict(df[['X1', 'X2']])
results = df.copy()
results['predictions'] = predictions
results['partition_id'] = context.partition_id
# Write results to a Snowflake table using the bounded session pool
context.with_session(lambda session:
session.create_dataframe(results)
.write.mode("append")
.save_as_table("predictions_table")
)
Note
La fonction transmise à with_session() est sérialisée à l’aide de cloudpickle. Évitez de capturer de grands objets ou des ressources non sérialisables dans la fermeture.
Évolutivité sur plusieurs nœuds¶
Pour exécuter une DPF sur plusieurs nœuds, mettez votre cluster à l’échelle avant de démarrer l’exécution :
from snowflake.ml.runtime_cluster import scale_cluster
# Scale to 3 nodes for increased parallelism
scale_cluster(3)
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="multi_node_run",
execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
En cas d’exécution sur plusieurs nœuds, définissez use_head_node=False si vous souhaitez que le nœud de tête agisse uniquement en tant que coordinateur sans exécuter de fonctions utilisateur. Cela peut améliorer la fiabilité pour les charges de travail de longue durée, car une erreur de mémoire insuffisante sur un nœud de travail n’affecte pas le coordinateur.
Limites et contraintes¶
Une exécution simultanée : Une seule exécution DPF peut s’exécuter en même temps. Le lancement d’une nouvelle exécution alors qu’une autre est en cours génère une erreur. Annulez l’exécution précédente avec
run.cancel()avant d’en démarrer une nouvelle.Exigences en matière de DataFrame : En mode DataFrame, le DataFrame Snowpark doit contenir exactement une seule requête et aucune post-action.
Restriction à un seul nœud :
use_head_node=Falsen’est pas pris en charge sur les clusters à un seul nœud.Structure de chemin d’artefact : Les artefacts sont stockés sur
@{stage_name}/{run_id}/{partition_id}/. Cette structure de chemin est fixe et ne peut pas être personnalisée.Les exécutions restaurées sont en lecture seule : Les exécutions restaurées avec
DPFRun.restore_from()ne peut pas appelerwait()oucancel().
Prochaines étapes¶
Explorez Entraînez des modèles sur toutes les partitions de données pour en savoir plus sur l’entraînement de plusieurs modèles de ML utilisant la DPF comme infrastructure sous-jacente.
Pour la documentation complète relative à l’API, voir la section Référence API de fonction de partition distribuée (DPF).
Pour des exemples de bout en bout, voir notebooks d’échantillons Snowflake ML.