Verarbeiten von Daten mit benutzerdefinierter Logik über Partitionen hinweg

Verwenden Sie DPF (Distributed Partition Function), um Daten parallel über einen oder mehrere Knoten in einem Computepool zu verarbeiten. DPF handhabt die verteilte Orchestrierung, Fehler, Beobachtbarkeit und die Persistenz von Artefakten automatisch. Sie können DPF entweder in einem Snowflake Notebook oder einem Snowflake ML-Job ausführen.

DPF unterstützt die folgenden Ausführungsmodi:

  • DataFrame-Modus (run()): Partitioniert einen Snowpark-DataFrame nach Spaltenwerten und führt die Funktion für jede Partition gleichzeitig aus. Die Daten werden parallel für einen optimalen Durchsatz vorab abgerufen.

  • Stagingbereichsmodus (run_from_stage()): Verarbeitet Dateien aus einem Snowflake-Stagingbereich, in dem jede Datei zu einer Partition wird. Ideal für die Verarbeitung von Dateien im großen Umfang mit vorhersehbarer Speichernutzung.

Sie können DPF zur effizienten Verarbeitung großer Datensets über verschiedene Datensegmente hinweg verwenden.

Dieses Tool ist ideal für Szenarien wie die folgenden:

  • Analysieren von Verkaufsdaten nach Regionen

  • Verarbeiten von Kundendaten nach geografischen Segmenten

  • Trainieren von ML -Modellen für jede Datenpartition

  • Durchführen von Datentransformationen, bei denen jede Datenpartition dieselbe Verarbeitungslogik erfordert

DPF verarbeitet die verteilte Datenverarbeitung automatisch. Sie müssen die verteilte Computing-Infrastruktur nicht verwalten.

Mit DPF können Sie kundenspezifischen Python-Code unter Verwendung von Open-Source-Bibliotheken in einer containerisierten Infrastruktur mit GPU-Zugriff schreiben.

Wichtig

DPF speichert Ergebnisse und Artefakte automatisch in Snowflake-Stagingbereichen. Bevor Sie DPF verwenden, stellen Sie sicher, dass Sie über Berechtigungen für den Stagingbereich verfügen, in dem DPF Ergebnisse und Artefakte speichert.

DataFrame-Modus: Verarbeiten von Daten nach Spaltenpartitionen

Verwenden Sie den DataFrame-Modus, um den Snowpark-DataFrame anhand einer bestimmten Spalte zu partionieren und die Python-Funktion auf jeder Partition parallel auszuführen. Das folgende Beispiel zeigt die Verarbeitung von Umsatzdaten nach Region.

  1. Die Verarbeitungsfunktion definieren

  2. DPF initialisieren und ausführen

  3. Fortschritt überwachen und auf Abschluss warten

  4. Abrufen der Ergebnisse von jeder Partition

  5. Fehler verarbeiten

  6. Ergebnisse von der abgeschlossenen Ausführung wiederherstellen

Die Verarbeitungsfunktion definieren

Importieren Sie die Klassen, die für die verteilte Verarbeitung erforderlich sind:

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf import DPF
from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun
from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, ExecutionOptions
)
Copy

Definieren Sie eine Verarbeitungsfunktion, die zwei Argumente annimmt:

  • data_connector: Ein DataConnector, der Zugriff auf die Daten der Partition hat. Rufen Sie data_connector.to_pandas() auf, um ihn als pandas-DataFrame zu laden, oder verwenden Sie Methoden wie to_torch_dataset() oder to_ray_dataset().

  • context: Ein PartitionContext-Objekt, das die Partitions-ID und Methoden zum Hoch- und Herunterladen von Artefakten bereitstellt.

import json

def process_sales_data(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing {len(df)} records for region: {context.partition_id}")

    # Perform region-specific analytics
    summary = {
        'region': context.partition_id,
        'total_sales': df['amount'].sum(),
        'avg_order_value': df['amount'].mean(),
        'customer_count': df['customer_id'].nunique(),
        'record_count': len(df)
    }

    # Store results in stage for subsequent access

    context.upload_to_stage(summary, "sales_summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

Für jede Region berechnet diese Funktion zusammenfassende Statistiken und speichert die Ergebnisse als JSON-Datei im dedizierten Stagingbereichsverzeichnis der Partition.

Initialisieren und Ausführen von DPF

Erstellen Sie eine DPF-Instanz mit Ihrer Verarbeitungsfunktion und einen Namen für den Ausgabe-Stagingbereich, und rufen Sie dann run() auf, um die verteilte Verarbeitung zu starten.

Wichtig

Der von Ihnen bereitgestellte Snowpark-DataFrame muss aus einer Tabelle erstellt werden. Weitere Informationen zum Erstellen eines DataFrame aus einer Tabelle finden Sie unter Erstellen eines DataFrame.

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="regional_analytics_2024"
)
Copy

Die Methode run() akzeptiert die folgenden Parameter:

  • partition_by (str): Name der Spalte, nach der der DataFrame partitioniert werden soll. Jeder eindeutige Wert erstellt eine eigene Partition.

  • snowpark_dataframe: Der Snowpark-DataFrame, der partitioniert und verarbeitet werden soll.

  • run_id (str): Eindeutiger Bezeichner für diese Ausführung. Erstellt ein dediziertes @{stage_name}/{run_id}/-Verzeichnis für alle Artefakte. Verwenden Sie beschreibende Namen wie experiment_2024_01_15 oder model_v1_retrain.

  • on_existing_artifacts (str, optional): Aktion, wenn Artefakte für die run_id bereits vorhanden sind. "error" (Standard) löst einen Fehler aus, "overwrite" ersetzt vorhandene Artefakte.

  • execution_options (ExecutionOptions, optional): Konfiguration der Ressourcenzuweisung und des Ausführungsverhaltens.

Fortschritt überwachen und auf Abschluss warten

Rufen Sie wait() auf, um bis zum Abschluss der Ausführung zu blockieren. Standardmäßig wird ein Fortschrittsbalken angezeigt.

final_status = run.wait()  # Shows progress bar by default
print(f"Job completed with status: {final_status}")
Copy

Die Ausgabe könnte beispielsweise wie folgt aussehen:

Progress: 100%|██████████| 4/4 [02:15<00:00, 33.75s/it]
Job completed with status: RunStatus.SUCCESS

Sie können den Status und den Fortschritt auch jederzeit überprüfen, ohne zu blockieren:

# Check overall status
current_status = run.status

# Get progress grouped by partition status
progress = run.get_progress()
Copy

Abrufen der Ergebnisse von jeder Partition

Nachdem die Ausführung erfolgreich abgeschlossen wurde, rufen Sie die Ergebnisse jeder Partition mit der partition_details-Eigenschaft ab. Die Details jeder Partition enthalten einen stage_artifacts_manager für den Zugriff auf gespeicherte Artefakte.

if final_status == RunStatus.SUCCESS:
    import json
    all_results = []
    for partition_id, details in run.partition_details.items():
        # List available artifacts for this partition
        files = details.stage_artifacts_manager.list()
        print(f"Partition {partition_id} artifacts: {files}")

        # Load an artifact using a custom deserializer
        summary = details.stage_artifacts_manager.get("sales_summary.json",
            read_function=lambda path: json.load(open(path, 'r')))
        all_results.append(summary)

    # Combine results across all regions
    total_sales = sum(r['total_sales'] for r in all_results)
    total_customers = sum(r['customer_count'] for r in all_results)
Copy

Der stage_artifacts_manager bietet drei Methoden:

  • list(): Gibt eine Liste der Dateinamen zurück, die im Stagingbereichsverzeichnis der Partition gespeichert sind.

  • get(filename, read_function=None): Lädt ein Artefakt herunter und deserialisiert es. Verwendet standardmäßig pickle oder eine kundenspezifische read_function, falls angegeben.

  • download(filename, local_dir): Lädt eine Rohdatei in ein lokales Verzeichnis herunter und gibt den lokalen Dateipfad zurück.

Verarbeiten von Fehlern

Wenn die Ausführung nicht erfolgreich ist, überprüfen Sie die Details der einzelnen Partitionen, um Fehler zu diagnostizieren:

if final_status != RunStatus.SUCCESS:
    for partition_id, details in run.partition_details.items():
        if details.status != PartitionStatus.DONE:
            print(f"Partition {partition_id} status: {details.status}")
            try:
                error_logs = details.logs
                print(error_logs)
            except RuntimeError:
                print("Logs not available for this partition")
Copy

Der Gesamt-RunStatus spiegelt das aggregierte Ergebnis wider:

  • SUCCESS: Alle Partitionen wurden erfolgreich abgeschlossen.

  • PARTIAL_FAILURE: Einige Partitionen wurden erfolgreich abgeschlossen, aber mindestens eine ist fehlgeschlagen.

  • FAILURE: Es wurden keine Partitionen erfolgreich abgeschlossen.

  • CANCELLED: Die Ausführung wurde abgebrochen.

  • IN_PROGRESS: Die Ausführung läuft noch.

Jede Partition hat einen PartitionStatus:

  • PENDING: Wartet auf die Bearbeitung.

  • RUNNING: Wird derzeit bearbeitet.

  • DONE: Wurde erfolgreich abgeschlossen.

  • FAILED: Die Benutzerfunktion hat eine Ausnahme ausgelöst.

  • CANCELLED: Vom Benutzenden storniert.

  • INTERNAL_ERROR: Es ist ein interner Systemfehler aufgetreten (z. B. Out-of-Memory).

So importieren Sie diese Enumerationen:

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import (
    RunStatus, PartitionStatus
)
Copy

So brechen Sie einen laufenden Job ab:

run.cancel()
Copy

Bemerkung

Bereits abgeschlossene Partitionen sind von dem Abbruch nicht betroffen. Teilergebnisse, Protokolle und Artefakte von abgeschlossenen Partitionen bleiben verfügbar.

Wiederherstellen der Ergebnisse einer abgeschlossenen Ausführung

Sie können eine abgeschlossene Ausführung aus ihrem persistierten Zustand wieder herstellen und auf dieselben Ergebnisse zugreifen, ohne den Prozess erneut ausführen zu müssen:

from snowflake.ml.modeling.distributors.distributed_partition_function.dpf_run import DPFRun

restored_run = DPFRun.restore_from(run_id="regional_analytics_2024", stage_name="analytics_stage")

# Access results just like the original run
for partition_id, details in restored_run.partition_details.items():
    print(f"{partition_id}: {details.status}")
Copy

Bemerkung

Wiederhergestellte Ausführungen sind schreibgeschützt. Sie können bei einer wiederhergestellten Ausführung nicht wait() oder cancel() aufrufen.

Stagingbereichsmodus: Verarbeiten von Dateien aus einem Stagingbereich

Verwenden Sie den Stagingbereichsmodus, um Dateien aus einem Snowflake-Stagingbereich zu verarbeiten, in dem jede Datei zu einer Partition wird. Dies ist ideal für die Dateiverarbeitung im großen Umfang, z. B. die Verarbeitung einer Sammlung von Parquet-Dateien, die im Stagingbereich bereitgestellt wurden.

Definieren einer Verarbeitungsfunktion

Die Signatur der Verarbeitungsfunktion ist dieselbe wie der DataFrame-Modus. Der data_connector bietet Zugriff auf die Daten der Datei, und context.partition_id ist der relative Dateipfad innerhalb des Stagingbereichs.

def process_file(data_connector, context):
    df = data_connector.to_pandas()
    print(f"Processing file {context.partition_id}: {len(df)} rows")

    # Process data and save results
    result = {"row_count": len(df), "columns": list(df.columns)}
    import json
    context.upload_to_stage(result, "result.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w')))
Copy

DPF vom Stagingbereich aus ausführen

Rufen Sie run_from_stage() anstelle von run() auf. Geben Sie den Eingabe-stage_location an, der die Quelldateien enthält, sowie optional ein file_pattern, um nach den Dateien zu filtern, die verarbeitet werden sollen.

dpf = DPF(process_file, "output_stage")
run = dpf.run_from_stage(
    stage_location="@my_db.my_schema.input_stage/data/",
    run_id="file_processing_2024",
    file_pattern="*.parquet",
)
final_status = run.wait()
Copy

Die Methode run_from_stage() akzeptiert die folgenden Parameter:

  • stage_location (str): Pfad zum Eingabe-Stagingbereich, der die Quelldatendateien enthält. Jede Datei, die mit dem file_pattern übereinstimmt, wird zu einer Partition. Unterstützt sowohl einfache als auch vollqualifizierte Stagingbereichsnamen:

    • Einfach: "@my_stage/data/"

    • Vollqualifiziert: "@my_db.my_schema.my_stage/data/"

  • run_id (str): Eindeutiger Bezeichner für diese Ausführung.

  • file_pattern (str, optional): Glob-Muster zum Filtern von Dateien. Die Standardeinstellung ist "*.parquet".

  • on_existing_artifacts (str, optional): "error" (Standard) oder "overwrite".

  • execution_options (ExecutionOptions, optional): Konfiguration der Ressourcenzuweisung und des Ausführungsverhaltens.

Bemerkung

stage_location ist die Eingabe-Datenquelle. stage_name, der DPF() bereitgestellt wurde, ist der Speicherort der Ausgabe für Artefakte wie Protokolle und Ergebnisse. Dies können verschiedene Stagingbereiche sein.

Überwachung, Ergebnisabruf, Fehlerbehandlung und Wiederherstellung funktionieren auf dieselbe Weise wie der DataFrame-Modus.

Legen Sie für die E/A-gebundene Dateiverarbeitung num_cpus_per_worker=1 in ExecutionOptions fest, um die Paralleleität zu maximieren (ein Akteur pro CPU). Verwenden Sie für CPU-gebundene Workloads Standardwert, oder erhöhen Sie num_cpus_per_worker.

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

run = dpf.run_from_stage(
    stage_location="@my_stage/data/",
    run_id="io_bound_processing",
    execution_options=ExecutionOptions(num_cpus_per_worker=1),
)
Copy

Konfigurieren von Ausführungsoptionen

Verwenden Sie ExecutionOptions zum Steuern der Ressourcenzuweisung und des Ausführungsverhaltens, wie z. B. CPU/GPU-Zuweisung pro Worker, Anzahl der Wiederholungsversuche und Fail-Fast-Verhalten. Alle Felder sind optional und haben sinnvolle Standardwerte.

from snowflake.ml.modeling.distributors.distributed_partition_function.entities import ExecutionOptions

options = ExecutionOptions(
    num_cpus_per_worker=4,
    num_gpus_per_worker=1,
    fail_fast=True,
)

run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="my_run",
    execution_options=options,
)
Copy

Eine vollständige Liste der Parameter und des Worker-Skalierungsverhaltens finden Sie in der ExecutionOptions API-Referenz.

Verwenden von Artefakten anhand des PartitionContext

Das PartitionContext-Objekt wird als zweites Argument an die Verarbeitungsfunktion übergeben. Es bietet Methoden für die Interaktion mit Artefakten und Snowflake-Sitzungen während der Ausführung von Partitionen.

Hochladen von Artefakten

Verwenden Sie upload_to_stage(), um Ergebnisse innerhalb Ihrer Verarbeitungsfunktion zu speichern. Standardmäßig werden Objekte mit „pickle“ serialisiert. Geben Sie eine write_function für die kundenspezifische Serialisierung an.

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Save a pickle object (default serialization)
    results = {'total': df['amount'].sum(), 'count': len(df)}
    context.upload_to_stage(results, "summary.pkl")

    # Save JSON data with a custom write function
    import json
    context.upload_to_stage(
        results, "summary.json",
        write_function=lambda obj, path: json.dump(obj, open(path, 'w'))
    )

    # Save a CSV file
    df_processed = df.groupby('category').sum()
    context.upload_to_stage(
        df_processed, "aggregated.csv",
        write_function=lambda df, path: df.to_csv(path, index=False)
    )
Copy

Herunterladen von Artefakten

Verwenden Sie download_from_stage(), um Artefakte innerhalb Ihrer Verarbeitungsfunktion zu laden. Sie können diese Funktion verwenden, um auf Artefakte aus einer früheren Ausführung zuzugreifen. Sie können damit zum Beispiel ein Modell für eine Inferenz laden.

def my_inference_function(data_connector, context):
    # Load a pickle object from the current partition's stage path
    model = context.download_from_stage("model.pkl")

    # Load from a different stage path (e.g., from a prior training run)
    model = context.download_from_stage(
        "model.pkl",
        stage_path="@db.schema.stage/training_run/partition_0"
    )

    # Load JSON with a custom deserializer
    import json
    config = context.download_from_stage(
        "config.json",
        read_function=lambda path: json.load(open(path, 'r'))
    )
Copy

Verwenden von Snowflake-Sitzungen

Verwenden Sie with_session(), um Vorgänge auszuführen, die eine Snowflake-Sitzung erfordern, z. B. das Schreiben von Ergebnissen in eine Tabelle. Diese Methode verwendet einen begrenzten Sitzungspool, um zu vermeiden, dass die Sitzungslimits von Snowflake erreicht werden, wenn viele Partitionen gleichzeitig ausgeführt werden.

def my_function(data_connector, context):
    df = data_connector.to_pandas()

    # Load a model from a prior training run
    model = context.download_from_stage("model.pkl")

    predictions = model.predict(df[['X1', 'X2']])

    results = df.copy()
    results['predictions'] = predictions
    results['partition_id'] = context.partition_id

    # Write results to a Snowflake table using the bounded session pool
    context.with_session(lambda session:
        session.create_dataframe(results)
            .write.mode("append")
            .save_as_table("predictions_table")
    )
Copy

Bemerkung

Die an with_session() übergebene Funktion wird mit „cloudpickle“ serialisiert. Vermeiden Sie es, große Objekte oder nicht serialisierbare Ressourcen in beim Abschluss zu erfassen.

Skalieren über mehrere Knoten

Um DPF über mehrere Knoten hinweg auszuführen, skalieren Sie Ihren Cluster, bevor Sie die Ausführung starten:

from snowflake.ml.runtime_cluster import scale_cluster

# Scale to 3 nodes for increased parallelism
scale_cluster(3)

dpf = DPF(process_sales_data, "analytics_stage")
run = dpf.run(
    partition_by="region",
    snowpark_dataframe=sales_data,
    run_id="multi_node_run",
    execution_options=ExecutionOptions(use_head_node=True),
)
final_status = run.wait()
Copy

Wenn Sie die Ausführung auf mehreren Knoten läuft, legen Sie use_head_node=False fest, wenn der Hauptknoten nur als Koordinator fungieren soll, ohne Benutzerfunktionen auszuführen. Dies kann die Zuverlässigkeit von Workloads mit langer Ausführungszeit verbessern, da ein Out-of-Memory-Fehler auf einem Workerknoten keine Auswirkungen auf den Koordinator hat.

Be- und Einschränkungen

  • Eine gleichzeitige Ausführung: Es kann jeweils nur eine DPF ausgeführt werden. Das Starten einer neuen Ausführung, während eine andere bereits ausgeführt wird, löst einen Fehler aus. Brechen Sie die vorherige Ausführung mit run.cancel() ab, bevor Sie eine neue starten.

  • DataFrame-Anforderungen: Im DataFrame-Modus darf der Snowpark-DataFrame genau eine Abfrage und keine nachfolgenden Aktionen enthalten.

  • Beschränkung auf einen einzelnen Knoten: use_head_node=False wird bei Clustern mit einem Knoten nicht unterstützt.

  • Artefaktpfadstruktur: Artefakte werden unter @{stage_name}/{run_id}/{partition_id}/ gespeichert. Diese Pfadstruktur ist fest definiert und kann nicht angepasst werden.

  • Wiederhergestellte Ausführungen sind schreibgeschützt: Ausführungen, die mit Runs DPFRun.restore_from() wiederhergestellt wurden, können nicht wait() oder cancel() aufrufen.

Nächste Schritte