Verarbeiten von Daten mit benutzerdefinierter Logik über Partitionen hinweg¶
Verwenden Sie die Distributed Partition Function (DPF), um Daten parallel über einen oder mehrere Knoten hinweg in einem Computepool zu verarbeiten. Sie handhabt automatisch die verteilte Orchestrierung, Fehler, Beobachtbarkeit und die Persistenz von Artefakten.
DPF unterteilt Ihren Snowpark DataFrame durch eine bestimmte Spalte und führt Ihre Python-Funktion auf jeder Partition parallel aus. Konzentrieren Sie sich auf Ihre Verarbeitungslogik, während DPF sich um die Komplexität der Infrastruktur kümmert und automatisch skaliert.
Sie können DPF zur effizienten Verarbeitung großer Datensets über verschiedene Datensegmente hinweg verwenden. Dieses Tool eignet sich ideal für Szenarien wie die Analyse von Verkaufsdaten nach Regionen, die Verarbeitung von Kundendaten nach geografischen Segmenten oder die Durchführung von Datentransformationen, bei denen jede Datenpartition dieselbe Verarbeitungslogik erfordert. DPF übernimmt die verteilte Datenverarbeitung automatisch, wodurch die Komplexität der Verwaltung der verteilten Computing-Infrastruktur beseitigt wird.
Mit DPF können Sie kundenspezifischen Python-Code unter Verwendung von Open-Source-Bibliotheken in einer containerisierten Infrastruktur mit GPU-Zugriff schreiben.
Wichtig
Bevor Sie DPF verwenden, vergewissern Sie sich, dass Sie Folgendes haben:
Container-Laufzeitumgebung: DPF erfordert eine Snowflake ML Container-Laufzeitumgebung.
Staging-Zugriffsberechtigungen: DPF speichert Ergebnisse und Artefakte automatisch in Snowflake-Stagingbereichen. Stellen Sie sicher, dass Sie über die entsprechenden Berechtigungen für den Zugriff auf den angegebenen Stagingbereich verfügen.
Der folgende Abschnitt führt Sie durch die Verwendung von DPF für ein hypothetisches Verkaufsdatenset.
Verarbeiten der Verkaufsdaten nach Region¶
Im folgenden Beispiel wird die Verwendung von DPF zur parallelen Verarbeitung von Verkaufsdaten nach Region veranschaulicht:
Verarbeitungsfunktion definieren: Erstellen Sie eine Python-Funktion, die die Transformationslogik angibt, die auf jede Datenpartition (Region) angewendet werden soll.
DPF zum Verarbeiten von Daten verwenden: Initialisieren Sie DPF mit der Verarbeitungsfunktion und führen Sie sie über alle Partitionen hinweg gleichzeitig aus.
Fortschritt überwachen und auf Abschluss warten: Verfolgen Sie den Verarbeitungsstatus und warten Sie, bis die Ausführung aller Partitionen abgeschlossen ist.
Ergebnisse aus jeder Partition abrufen: Sammeln und kombinieren Sie die verarbeiteten Ergebnisse aus allen Regionen nach erfolgreichem Abschluss.
Optional: Ergebnisse später wiederherstellen: Greifen Sie auf zuvor abgeschlossene Ergebnisse zu, ohne den gesamten Prozess erneut ausführen zu müssen.
Wenn Sie ein Verkaufsdatenset haben, sind möglicherweise regionale Daten als Spalte im DataFrame enthalten. Möglicherweise möchten Sie auf die Daten jeder Region eine Verarbeitungslogik anwenden. Sie können die Funktion Distributed Partition (DPF) verwenden, um Verkaufsdaten für verschiedene Regionen parallel zu verarbeiten.
Sie könnten einen DataFrame namens sales_data
haben, der die folgenden Spalten enthält:
region
: North, South, East, Westcustomer_id
: eindeutige Kundenbezeichneramount
: Transaktionsbeträgeorder_date
: Transaktionsdaten
Die Verarbeitungsfunktion definieren¶
Der folgende Code definiert eine Verarbeitungsfunktion, die die Verkaufsdaten nach Region verarbeitet. Sie gibt die Transformationslogik an, die auf jede Datenpartition angewendet wird, und speichert die Ergebnisse in einem Stagingbereich.
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import RunStatus
# Define function to process each region's data
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
import json
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Die Funktion process_sales_data
erhält zwei wichtige Argumente, die eine verteilte Verarbeitung ermöglichen:
data_connector
: Wird verwendet, um Zugriff aufsales_data
DataFrame zu ermöglichencontext
: Wird verwendet, um die Ergebnisse in einen Stagingbereich zu schreiben
Für jede Region erstellt die Funktion die folgenden Spalten:
total_sales
avg_order_value
customer_count
record_count
Anschließend werden die Ergebnisse in einer JSON-Datei namens sales_summary.json
in einen Stagingbereich geschrieben.
Verwenden von DPF, um Daten mit der Funktion zu verarbeiten¶
Nachdem Sie eine Verarbeitungsfunktion erstellt haben, können Sie den folgenden Code verwenden, um die Daten parallel über Partitionen mit der Funktion DPF zu verarbeiten.
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region", # Creates separate partitions for North, South, East, West
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
Sie können den folgenden Code zur Ausführung von DPF über mehrere Knoten hinweg verwenden:
from snowflake.ml.runtime_cluster import scale_cluster
# Scale cluster before running tuner
scale_cluster(2) # Scale to 2 nodes for parallel trials
Fortschritt überwachen und auf Abschluss warten¶
Sie können den folgenden Code verwenden, um den Fortschritt der Ausführung von DPF zu überwachen.
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Im Folgenden sehen Sie die Ausgabe des Codes:
Processing partitions: 100%|██████████| 4/4 [02:15<00:00, 33.75s/partition]
Job completed with status: RunStatus.SUCCESS
Abrufen der Ergebnisse von jeder Partition¶
Wenn die Ausführung erfolgreich abgeschlossen wurde, können Sie die Ergebnisse aus jeder Partition abrufen. Der folgende Code ruft die Verkaufsergebnisse nach Region ab.
if final_status == RunStatus.SUCCESS:
# Get results from each region
import json
all_results = []
for partition_id, details in run.partition_details.items():
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
else:
# Handle failures - check logs for failed partitions
for partition_id, details in run.partition_details.items():
if details.status != "DONE":
error_logs = details.logs
Optional: Stellt die Ergebnisse von der abgeschlossenen Ausführung wieder her¶
Sie können die abgeschlossene Ausführung aus dem Stagingbereich wiederherstellen und auf dieselben Ergebnisse zugreifen, ohne den Prozess erneut ausführen zu müssen. Der folgende Code zeigt, wie das funktioniert:
# Restore completed run from stage and access same results as above without re-running.
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import (
DPFRun
)
restored_run = DPFRun.restore_from("regional_analytics_2024", "analytics_stage")
Nächste Schritte¶
Lesen Sie Trainieren von Modellen über Datenpartitionen hinweg, um Informationen zum Trainieren mehrerer ML-Modelle mit DPF als zugrunde liegende Infrastruktur zu erfahren
Fortgeschrittene Nutzungsmuster, Strategien zur Fehlerbehandlung und Techniken zur Leistungsoptimierung finden Sie in der vollständigen API-Dokumentation des Snowflake ML Python-Pakets