Verteilen von Workloads, die Ergebnisse mit dem Snowflake-Konnektor für Python abrufen

Wenn Sie eine verteilte Umgebung verwenden, um Workloads zu parallelisieren, können Sie den Snowflake-Konnektor für Python verwenden, um das Abrufen und Verarbeiten von Ergebnissen zu verteilen.

Unter diesem Thema:

Einführung

Nachdem Sie mit dem Objekt Cursor eine Abfrage ausgeführt haben, können Sie das Abrufen der Ergebnisse mithilfe von Resultbatches verteilen. Ein Resultbatch kapselt eine Funktion, die eine Teilmenge der Ergebnisse abruft. Sie können verschiedenen Mitarbeitern unterschiedliche Resultbatches zuweisen, um Ergebnisse parallel abzurufen und zu verarbeiten.

Abrufen der Liste der Resultbatches

Nachdem Sie eine Abfrage ausgeführt haben, können Sie die Ergebnisse in einem der folgenden Formate abrufen:

  • ResultBatch-Objekte.

    Dazu rufen Sie die Methode get_result_batches() im Cursor-Objekt auf. Dies gibt eine Liste von ResultBatch-Objekten zurück, die Sie verschiedenen Mitarbeitern zur Bearbeitung zuweisen können.

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[:: 2]
            result_batch_list_2 = result_batch_list[1 :: 2]
    
    Copy
  • PyArrow-Tabellen

    Sie können die folgenden Methoden verwenden, um die Resultbatches als PyArrow-Tabellen abzurufen:

    • fetch_arrow_all(): Rufen Sie diese Methode auf, um eine PyArrow-Tabelle mit allen Ergebnissen zurückzugeben.

    • fetch_arrow_batches(): Rufen Sie diese Methode auf, um einen Iterator zurückzugeben, den Sie verwenden können, um eine PyArrow-Tabelle für jeden Resultbatch zurückzugeben.

    Beispiel:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
    Copy
  • pandas-DataFrame-Objekte.

    Wenn Sie die pandas-kompatible Version des Snowflake-Konnektor für Python installiert haben, können Sie die folgenden Methoden verwenden, um die Resultbatches als pandas-DataFrame-Objekte abzurufen:

    • fetch_pandas_all(): Rufen Sie diese Methode auf, um einen pandas-DataFrame mit allen Ergebnissen zurückzugeben.

    • fetch_pandas_batches(): Rufen Sie diese Methode auf, um einen Iterator zurückzugeben, den Sie verwenden können, um einen pandas-DataFrame für jeden Resultbatch zurückzugeben.

    Beispiel:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            table = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    
    Copy

Serialisieren von Resultbatches

Um die Resultbatches auf andere Worker oder Knoten zu verschieben, können Sie die Resultbatches serialisieren und deserialisieren. Beispiel:

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Copy

Verwenden von Resultbatches

In den nächsten Abschnitten wird erklärt, wie ResultBatch-Objekte verwendet werden:

Iteration über Zeilen in einem Resultbatch

Mit einem ResultBatch-Objekt können Sie über die Zeilen iterieren, die zu diesem Batch gehören. Beispiel:

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)
Copy

Wenn Sie einen Iterator für ein ResultBatch-Objekt erstellen, ruft das Objekt die Teilmenge der Zeilen dieses Batches ab und konvertiert sie.

Materialisieren der Zeilen in einem Resultbatch

Die Teilmenge von Zeilen in einem Resultbatch wird durch Übergabe dieses ResultBatch-Objekts an die Funktion list() materialisiert. Beispiel:

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Copy

Abrufen der Anzahl der Zeilen und der Größe eines Resultbatches

Wenn Sie die Anzahl der Zeilen in einem Resultbatch und die Größe der Daten bestimmen müssen, können Sie die Attribute rowcount, compressed_size und uncompressed_size des ResultBatch-Objekts verwenden. Beispiel:

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Copy

Beachten Sie, dass diese Attribute verfügbar sind, bevor Sie über den Resultbatch iterieren. Um die Werte dieser Attribute zu erhalten, müssen Sie die Teilmenge der Zeilen für den Batch nicht extra abrufen.

Konvertieren eines Arrow-Resultbatches in eine PyArrow-Tabelle oder einen pandas-DataFrame

Um einen ArrowResultBatch in eine PyArrow-Tabelle oder einen pandas-DataFrame zu konvertieren, verwenden Sie die folgenden Methoden:

Beispiel:

with conn_cnx as con:
  with con.cursor() as cur:
    cur.execute("select col1 from table")
    batches = cur.get_result_batches()

    # Get the row from the ResultBatch as a pandas DataFrame.
    dataframe = batches[0].to_pandas()

    # Get the row from the ResultBatch as a PyArrow table.
    table = batches[0].to_arrow()
Copy