Verarbeiten von Daten mit benutzerdefinierter Logik über Partitionen hinweg¶
Verwenden Sie DPF (Distributed Partition Function), um Daten parallel über einen oder mehrere Knoten in einem Computepool zu verarbeiten. DPF handhabt die verteilte Orchestrierung, Fehler, Beobachtbarkeit und die Persistenz von Artefakten automatisch. Sie können DPF entweder in einem Snowflake Notebook oder einem Snowflake ML-Job ausführen.
DPF unterstützt die folgenden Ausführungsmodi:
DataFrame-Modus (
run()): Partitioniert einen Snowpark-DataFrame nach Spaltenwerten und führt die Funktion für jede Partition gleichzeitig aus. Die Daten werden parallel für einen optimalen Durchsatz vorab abgerufen.Stagingbereichsmodus (
run_from_stage()): Verarbeitet Dateien aus einem Snowflake-Stagingbereich, in dem jede Datei zu einer Partition wird. Ideal für die Verarbeitung von Dateien im großen Umfang mit vorhersehbarer Speichernutzung.
Sie können DPF zur effizienten Verarbeitung großer Datensets über verschiedene Datensegmente hinweg verwenden.
Dieses Tool ist ideal für Szenarien wie die folgenden:
Analysieren von Verkaufsdaten nach Regionen
Verarbeiten von Kundendaten nach geografischen Segmenten
Trainieren von ML -Modellen für jede Datenpartition
Durchführen von Datentransformationen, bei denen jede Datenpartition dieselbe Verarbeitungslogik erfordert
DPF verarbeitet die verteilte Datenverarbeitung automatisch. Sie müssen die verteilte Computing-Infrastruktur nicht verwalten.
Mit DPF können Sie kundenspezifischen Python-Code unter Verwendung von Open-Source-Bibliotheken in einer containerisierten Infrastruktur mit GPU-Zugriff schreiben.
Wichtig
DPF speichert Ergebnisse und Artefakte automatisch in Snowflake-Stagingbereichen. Bevor Sie DPF verwenden, stellen Sie sicher, dass Sie über Berechtigungen für den Stagingbereich verfügen, in dem DPF Ergebnisse und Artefakte speichert.
DataFrame-Modus: Verarbeiten von Daten nach Spaltenpartitionen¶
Verwenden Sie den DataFrame-Modus, um den Snowpark-DataFrame anhand einer bestimmten Spalte zu partionieren und die Python-Funktion auf jeder Partition parallel auszuführen. Das folgende Beispiel zeigt die Verarbeitung von Umsatzdaten nach Region.
Die Verarbeitungsfunktion definieren¶
Importieren Sie die Klassen, die für die verteilte Verarbeitung erforderlich sind:
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, ExecutionOptions
)
Definieren Sie eine Verarbeitungsfunktion, die zwei Argumente annimmt:
data_connector: Ein DataConnector, der Zugriff auf die Daten der Partition hat. Rufen Siedata_connector.to_pandas()auf, um ihn als pandas-DataFrame zu laden, oder verwenden Sie Methoden wieto_torch_dataset()oderto_ray_dataset().context: Ein PartitionContext-Objekt, das die Partitions-ID und Methoden zum Hoch- und Herunterladen von Artefakten bereitstellt.
import json
def process_sales_data(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing {len(df)} records for region: {context.partition_id}")
# Perform region-specific analytics
summary = {
'region': context.partition_id,
'total_sales': df['amount'].sum(),
'avg_order_value': df['amount'].mean(),
'customer_count': df['customer_id'].nunique(),
'record_count': len(df)
}
# Store results in stage for subsequent access
context.upload_to_stage(summary, "sales_summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Für jede Region berechnet diese Funktion zusammenfassende Statistiken und speichert die Ergebnisse als JSON-Datei im dedizierten Stagingbereichsverzeichnis der Partition.
Initialisieren und Ausführen von DPF¶
Erstellen Sie eine DPF-Instanz mit Ihrer Verarbeitungsfunktion und einen Namen für den Ausgabe-Stagingbereich, und rufen Sie dann run() auf, um die verteilte Verarbeitung zu starten.
Wichtig
Der von Ihnen bereitgestellte Snowpark-DataFrame muss aus einer Tabelle erstellt werden. Weitere Informationen zum Erstellen eines DataFrame aus einer Tabelle finden Sie unter Erstellen eines DataFrame.
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="regional_analytics_2024"
)
Die Methode run() akzeptiert die folgenden Parameter:
partition_by(str): Name der Spalte, nach der der DataFrame partitioniert werden soll. Jeder eindeutige Wert erstellt eine eigene Partition.snowpark_dataframe: Der Snowpark-DataFrame, der partitioniert und verarbeitet werden soll.run_id(str): Eindeutiger Bezeichner für diese Ausführung. Erstellt ein dediziertes@{stage_name}/{run_id}/-Verzeichnis für alle Artefakte. Verwenden Sie beschreibende Namen wieexperiment_2024_01_15odermodel_v1_retrain.on_existing_artifacts(str, optional): Aktion, wenn Artefakte für dierun_idbereits vorhanden sind."error"(Standard) löst einen Fehler aus,"overwrite"ersetzt vorhandene Artefakte.execution_options(ExecutionOptions, optional): Konfiguration der Ressourcenzuweisung und des Ausführungsverhaltens.
Fortschritt überwachen und auf Abschluss warten¶
Rufen Sie wait() auf, um bis zum Abschluss der Ausführung zu blockieren. Standardmäßig wird ein Fortschrittsbalken angezeigt.
final_status = run.wait() # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Die Ausgabe könnte beispielsweise wie folgt aussehen:
Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS
Sie können den Status und den Fortschritt auch jederzeit überprüfen, ohne zu blockieren:
# Check overall status
current_status = run.status
# Get progress grouped by partition status
progress = run.get_progress()
Abrufen der Ergebnisse von jeder Partition¶
Nachdem die Ausführung erfolgreich abgeschlossen wurde, rufen Sie die Ergebnisse jeder Partition mit der partition_details-Eigenschaft ab. Die Details jeder Partition enthalten einen stage_artifacts_manager für den Zugriff auf gespeicherte Artefakte.
if final_status == RunStatus.SUCCESS:
import json
all_results = []
for partition_id, details in run.partition_details.items():
# List available artifacts for this partition
files = details.stage_artifacts_manager.list()
print(f"Partition {partition_id} artifacts: {files}")
# Load an artifact using a custom deserializer
summary = details.stage_artifacts_manager.get("sales_summary.json",
read_function=lambda path: json.load(open(path, 'r')))
all_results.append(summary)
# Combine results across all regions
total_sales = sum(r['total_sales'] for r in all_results)
total_customers = sum(r['customer_count'] for r in all_results)
Der stage_artifacts_manager bietet drei Methoden:
list(): Gibt eine Liste der Dateinamen zurück, die im Stagingbereichsverzeichnis der Partition gespeichert sind.get(filename, read_function=None): Lädt ein Artefakt herunter und deserialisiert es. Verwendet standardmäßigpickleoder eine kundenspezifischeread_function, falls angegeben.download(filename, local_dir): Lädt eine Rohdatei in ein lokales Verzeichnis herunter und gibt den lokalen Dateipfad zurück.
Verarbeiten von Fehlern¶
Wenn die Ausführung nicht erfolgreich ist, überprüfen Sie die Details der einzelnen Partitionen, um Fehler zu diagnostizieren:
if final_status != RunStatus.SUCCESS:
for partition_id, details in run.partition_details.items():
if details.status != PartitionStatus.DONE:
print(f"Partition {partition_id} status: {details.status}")
try:
error_logs = details.logs
print(error_logs)
except RuntimeError:
print("Logs not available for this partition")
Der Gesamt-RunStatus spiegelt das aggregierte Ergebnis wider:
SUCCESS: Alle Partitionen wurden erfolgreich abgeschlossen.PARTIAL_FAILURE: Einige Partitionen wurden erfolgreich abgeschlossen, aber mindestens eine ist fehlgeschlagen.FAILURE: Es wurden keine Partitionen erfolgreich abgeschlossen.CANCELLED: Die Ausführung wurde abgebrochen.IN_PROGRESS: Die Ausführung läuft noch.
Jede Partition hat einen PartitionStatus:
PENDING: Wartet auf die Bearbeitung.RUNNING: Wird derzeit bearbeitet.DONE: Wurde erfolgreich abgeschlossen.FAILED: Die Benutzerfunktion hat eine Ausnahme ausgelöst.CANCELLED: Vom Benutzenden storniert.INTERNAL_ERROR: Es ist ein interner Systemfehler aufgetreten (z. B. Out-of-Memory).
So importieren Sie diese Enumerationen:
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
RunStatus, PartitionStatus
)
So brechen Sie einen laufenden Job ab:
run.cancel()
Bemerkung
Bereits abgeschlossene Partitionen sind von dem Abbruch nicht betroffen. Teilergebnisse, Protokolle und Artefakte von abgeschlossenen Partitionen bleiben verfügbar.
Wiederherstellen der Ergebnisse einer abgeschlossenen Ausführung¶
Sie können eine abgeschlossene Ausführung aus ihrem persistierten Zustand wieder herstellen und auf dieselben Ergebnisse zugreifen, ohne den Prozess erneut ausführen zu müssen:
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")
# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
print(f"{partition_id}: {details.status}")
Bemerkung
Wiederhergestellte Ausführungen sind schreibgeschützt. Sie können bei einer wiederhergestellten Ausführung nicht wait() oder cancel() aufrufen.
Stagingbereichsmodus: Verarbeiten von Dateien aus einem Stagingbereich¶
Verwenden Sie den Stagingbereichsmodus, um Dateien aus einem Snowflake-Stagingbereich zu verarbeiten, in dem jede Datei zu einer Partition wird. Dies ist ideal für die Dateiverarbeitung im großen Umfang, z. B. die Verarbeitung einer Sammlung von Parquet-Dateien, die im Stagingbereich bereitgestellt wurden.
Definieren einer Verarbeitungsfunktion¶
Die Signatur der Verarbeitungsfunktion ist dieselbe wie der DataFrame-Modus. Der data_connector bietet Zugriff auf die Daten der Datei, und context.partition_id ist der relative Dateipfad innerhalb des Stagingbereichs.
def process_file(data_connector, context):
df = data_connector.to_pandas()
print(f"Processing file {context.partition_id}: {len(df)} rows")
# Process data and save results
result = {"row_count": len(df), "columns": list(df.columns)}
import json
context.upload_to_stage(result, "result.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
DPF vom Stagingbereich aus ausführen¶
Rufen Sie run_from_stage() anstelle von run() auf. Geben Sie den Eingabe-stage_location an, der die Quelldateien enthält, sowie optional ein file_pattern, um nach den Dateien zu filtern, die verarbeitet werden sollen.
dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
stage_location="@my_db.my_schema.input_stage/data/",
run_id="file_processing_2024",
file_pattern="*.parquet",
)
final_status = run.wait()
Die Methode run_from_stage() akzeptiert die folgenden Parameter:
stage_location(str): Pfad zum Eingabe-Stagingbereich, der die Quelldatendateien enthält. Jede Datei, die mit demfile_patternübereinstimmt, wird zu einer Partition. Unterstützt sowohl einfache als auch vollqualifizierte Stagingbereichsnamen:Einfach:
"@my_stage/data/"Vollqualifiziert:
"@my_db.my_schema.my_stage/data/"
run_id(str): Eindeutiger Bezeichner für diese Ausführung.file_pattern(str, optional): Glob-Muster zum Filtern von Dateien. Die Standardeinstellung ist"*.parquet".on_existing_artifacts(str, optional):"error"(Standard) oder"overwrite".execution_options(ExecutionOptions, optional): Konfiguration der Ressourcenzuweisung und des Ausführungsverhaltens.
Bemerkung
stage_location ist die Eingabe-Datenquelle. stage_name, der DPF() bereitgestellt wurde, ist der Speicherort der Ausgabe für Artefakte wie Protokolle und Ergebnisse. Dies können verschiedene Stagingbereiche sein.
Überwachung, Ergebnisabruf, Fehlerbehandlung und Wiederherstellung funktionieren auf dieselbe Weise wie der DataFrame-Modus.
Legen Sie für die E/A-gebundene Dateiverarbeitung num_cpus_per_worker=1 in ExecutionOptions fest, um die Paralleleität zu maximieren (ein Akteur pro CPU). Verwenden Sie für CPU-gebundene Workloads Standardwert, oder erhöhen Sie num_cpus_per_worker.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
run = dpf.run_from_stage(
stage_location="@my_stage/data/",
run_id="io_bound_processing",
execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
Konfigurieren von Ausführungsoptionen¶
Verwenden Sie ExecutionOptions zum Steuern der Ressourcenzuweisung und des Ausführungsverhaltens, wie z. B. CPU/GPU-Zuweisung pro Worker, Anzahl der Wiederholungsversuche und Fail-Fast-Verhalten. Alle Felder sind optional und haben sinnvolle Standardwerte.
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions
options = ExecutionOptions(
num_cpus_per_worker=4,
num_gpus_per_worker=1,
fail_fast=True,
)
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="my_run",
execution_options=options,
)
Eine vollständige Liste der Parameter und des Worker-Skalierungsverhaltens finden Sie in der ExecutionOptions API-Referenz.
Verwenden von Artefakten anhand des PartitionContext¶
Das PartitionContext-Objekt wird als zweites Argument an die Verarbeitungsfunktion übergeben. Es bietet Methoden für die Interaktion mit Artefakten und Snowflake-Sitzungen während der Ausführung von Partitionen.
Hochladen von Artefakten¶
Verwenden Sie upload_to_stage(), um Ergebnisse innerhalb Ihrer Verarbeitungsfunktion zu speichern. Standardmäßig werden Objekte mit „pickle“ serialisiert. Geben Sie eine write_function für die kundenspezifische Serialisierung an.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Save a pickle object (default serialization)
results = {'total': df['amount'].sum(), 'count': len(df)}
context.upload_to_stage(results, "summary.pkl")
# Save JSON data with a custom write function
import json
context.upload_to_stage(
results, "summary.json",
write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
)
# Save a CSV file
df_processed = df.groupby('category').sum()
context.upload_to_stage(
df_processed, "aggregated.csv",
write_function=lambda df, path: df.to_csv(path, index=False)
)
Herunterladen von Artefakten¶
Verwenden Sie download_from_stage(), um Artefakte innerhalb Ihrer Verarbeitungsfunktion zu laden. Sie können diese Funktion verwenden, um auf Artefakte aus einer früheren Ausführung zuzugreifen. Sie können damit zum Beispiel ein Modell für eine Inferenz laden.
def my_inference_function(data_connector, context):
# Load a pickle object from the current partition's stage path
model = context.download_from_stage("model.pkl")
# Load from a different stage path (e.g., from a prior training run)
model = context.download_from_stage(
"model.pkl",
stage_path="@db.schema.stage/training_run/partition_0"
)
# Load JSON with a custom deserializer
import json
config = context.download_from_stage(
"config.json",
read_function=lambda path: json.load(open(path, 'r'))
)
Verwenden von Snowflake-Sitzungen¶
Verwenden Sie with_session(), um Vorgänge auszuführen, die eine Snowflake-Sitzung erfordern, z. B. das Schreiben von Ergebnissen in eine Tabelle. Diese Methode verwendet einen begrenzten Sitzungspool, um zu vermeiden, dass die Sitzungslimits von Snowflake erreicht werden, wenn viele Partitionen gleichzeitig ausgeführt werden.
def my_function(data_connector, context):
df = data_connector.to_pandas()
# Load a model from a prior training run
model = context.download_from_stage("model.pkl")
predictions = model.predict(df[['X1', 'X2']])
results = df.copy()
results['predictions'] = predictions
results['partition_id'] = context.partition_id
# Write results to a Snowflake table using the bounded session pool
context.with_session(lambda session:
session.create_dataframe(results)
.write.mode("append")
.save_as_table("predictions_table")
)
Bemerkung
Die an with_session() übergebene Funktion wird mit „cloudpickle“ serialisiert. Vermeiden Sie es, große Objekte oder nicht serialisierbare Ressourcen in beim Abschluss zu erfassen.
Skalieren über mehrere Knoten¶
Um DPF über mehrere Knoten hinweg auszuführen, skalieren Sie Ihren Cluster, bevor Sie die Ausführung starten:
from snowflake.ml.runtime_cluster import scale_cluster
# Scale to 3 nodes for increased parallelism
scale_cluster(3)
dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
partition_by="region",
snowpark_dataframe=sales_data,
run_id="multi_node_run",
execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
Wenn Sie die Ausführung auf mehreren Knoten läuft, legen Sie use_head_node=False fest, wenn der Hauptknoten nur als Koordinator fungieren soll, ohne Benutzerfunktionen auszuführen. Dies kann die Zuverlässigkeit von Workloads mit langer Ausführungszeit verbessern, da ein Out-of-Memory-Fehler auf einem Workerknoten keine Auswirkungen auf den Koordinator hat.
Be- und Einschränkungen¶
Eine gleichzeitige Ausführung: Es kann jeweils nur eine DPF ausgeführt werden. Das Starten einer neuen Ausführung, während eine andere bereits ausgeführt wird, löst einen Fehler aus. Brechen Sie die vorherige Ausführung mit
run.cancel()ab, bevor Sie eine neue starten.DataFrame-Anforderungen: Im DataFrame-Modus darf der Snowpark-DataFrame genau eine Abfrage und keine nachfolgenden Aktionen enthalten.
Beschränkung auf einen einzelnen Knoten:
use_head_node=Falsewird bei Clustern mit einem Knoten nicht unterstützt.Artefaktpfadstruktur: Artefakte werden unter
@{stage_name}/{run_id}/{partition_id}/gespeichert. Diese Pfadstruktur ist fest definiert und kann nicht angepasst werden.Wiederhergestellte Ausführungen sind schreibgeschützt: Ausführungen, die mit Runs
DPFRun.restore_from()wiederhergestellt wurden, können nichtwait()odercancel()aufrufen.
Nächste Schritte¶
Sehen Sie sich Trainieren von Modellen über Datenpartitionen hinweg an, um mehr über das Trainieren mehrerer ML-Modelle mit DPF als zugrunde liegende Infrastruktur zu erfahren
Die vollständige API-Dokumentation finden Sie in der Referenz zur Distributed Partition Function (DPF)-API.
Ende-zu-Ende-Beispiele finden Sie in den Snowflake ML-Beispiel-Notebooks.