Traitement des données avec une logique personnalisée sur toutes les partitions¶
Utilisez la fonction de partition distribuée (DPF) pour traiter des données en parallèle sur un ou plusieurs nœuds d’un pool de calcul. Elle gère automatiquement l’orchestration distribuée, les erreurs, l’observabilité et la persistance des artefacts.
La DPF partitionne votre DataFrame Snowpark selon une colonne spécifiée et exécute votre fonction Python sur chaque partition en parallèle. Concentrez-vous sur votre logique de traitement pendant que la DPF gère la complexité de l’infrastructure et s’adapte automatiquement.
Vous pouvez utiliser la DPF pour traiter efficacement de grands ensembles de données sur différents segments de données. Cet outil est idéal pour les scénarios tels que l’analyse des données de vente par région, le traitement des données des clients par segments géographiques, ou l’exécution de transformations de données où chaque partition de données nécessite la même logique de traitement. La DPF gère automatiquement le traitement des données distribuées, ce qui élimine la complexité de la gestion de l’infrastructure de calcul distribuée.
La DPF vous permet d’écrire du code Python personnalisé en utilisant des bibliothèques open source sur une infrastructure conteneurisée avec accès GPU.
Important
Avant de commencer le DPF, assurez-vous de disposer des éléments suivants :
Environnement Container Runtime : le DPF nécessite un environnement de ML Container Runtime Snowflake.
Autorisations d’accès à la zone de préparation : la DPF stocke automatiquement les résultats et les artefacts dans des zones de préparation Snowflake. Assurez-vous de disposer des autorisations appropriées pour accéder à la zone de préparation nommée spécifiée.
La section suivante vous montre l’utilisation de la DPF sur un ensemble de données de ventes hypothétique.
Traitement des données de vente par région¶
L’exemple suivant montre comment utiliser la DPF pour traiter en parallèle les données de vente par région :
Définition de la fonction de traitement – Créez une fonction Python qui spécifie la logique de transformation à appliquer à chaque partition (région) de données.
Utilisation de la DPF pour traiter des données – Initialisez la DPF avec votre fonction de traitement et exécutez-la sur toutes les partitions simultanément.
Contrôle de la progression et attente de la fin de l’exécution – Suivez l’état du traitement et attendez la fin de l’exécution de toutes les partitions.
Récupération des résultats de chaque partition – Collectez et combinez les résultats traités de toutes les régions une fois l’exécution achevée avec succès.
Facultatif : restauration des résultats plus tard – Accédez aux résultats précédemment complétés sans réexécuter l’ensemble du processus.
Si vous disposez d’un ensemble de données de ventes, vous pouvez avoir des données régionales sous forme de colonne dans le dataframe. Vous pouvez appliquer une logique de traitement aux données de chaque région. Vous pouvez utiliser la fonction de partition distribuée (DPF) pour traiter les données de vente de différentes régions en parallèle.
Vous pourriez avoir un dataframe appelé sales_data
qui contient les colonnes suivantes :
region
: « Nord », « Sud », « Est », « Ouest »customer_id
: identificateurs uniques du clientamount
: montants de transactionsorder_date
: dates de transactions
Définition de la fonction de traitement¶
Le code suivant définit une fonction de traitement qui traite les données de ventes par région. Il spécifie la logique de transformation appliquée à chaque partition de données et enregistre les résultats dans une zone de préparation.
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import RunStatus
# Define function to process each region's data
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
import json
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
La fonction process_sales_data
reçoit deux arguments clés qui permettent le traitement distribué :
data_connector
: utilisé pour fournir un accès au DataFramesales_data
context
: utilisé pour écrire les résultats dans une zone de préparation
Pour chaque région, la fonction crée les colonnes suivantes :
total_sales
avg_order_value
customer_count
record_count
Elle écrit ensuite les résultats dans une zone de préparation sous forme de fichier JSON nommé sales_summary.json
.
Utilisation de la DPF pour traiter des données avec la fonction¶
Après avoir créé une fonction de traitement, vous pouvez utiliser le code suivant pour traiter les données en parallèle sur les partitions avec la fonction DPF.
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region", # Creates separate partitions for North, South, East, West
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
Vous pouvez utiliser le code suivant pour exécuter la DPF sur plusieurs nœuds :
from snowflake.ml.runtime_cluster import scale_cluster
# Scale cluster before running tuner
scale_cluster(2) # Scale to 2 nodes for parallel trials
Contrôle de la progression et attente de la fin de l’exécution¶
Vous pouvez utiliser le code suivant pour suivre la progression de l’exécution de la DPF.
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Voici la sortie du code :
Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS
Récupération des résultats de chaque partition¶
Si l’exécution s’achève avec succès, vous pouvez récupérer les résultats de chaque partition. Le code suivant permet d’obtenir les résultats des ventes par région.
if final_status == RunStatus.SUCCESS:
# Get results from each region
import json
all_results = []
for partition_id, details in run.partition_details.items():
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
else:
# Handle failures - check logs for failed partitions
for partition_id, details in run.partition_details.items():
if details.status != "DONE":
error_logs = details.logs
En option : Restauration des résultats d’une exécution terminée¶
Vous pouvez restaurer l’exécution terminée à partir de la zone de préparation et accéder aux mêmes résultats sans relancer le processus. Le code suivant montre comment procéder :
# Restore completed run from stage and access same results as above without re-running.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import (
DPFRun
)
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
Prochaines étapes¶
Explorez Entraînez des modèles sur toutes les partitions de données pour en savoir plus sur l’entraînement de plusieurs modèles de ML utilisant la DPF comme infrastructure sous-jacente
Pour des modèles d’utilisation, des stratégies de traitement des erreurs et des techniques d’optimisation des performances plus avancés, consultez la documentation API complète dans le paquet Python de ML Snowflake