Distribution de charges de travail qui récupèrent des résultats avec le connecteur Snowflake pour Python

Si vous utilisez un environnement distribué pour paralléliser les charges de travail, vous pouvez utiliser le connecteur Snowflake pour Python pour répartir le travail d’extraction et de traitement des résultats.

Dans ce chapitre :

Introduction

Après avoir utilisé l’objet Curseur pour exécuter une requête, vous pouvez répartir le travail de récupération des résultats en utilisant des lots de résultats. Un lot de résultats encapsule une fonction qui récupère un sous-ensemble de résultats. Vous pouvez affecter différents employés à l’utilisation de différents lots de résultats pour récupérer et traiter les résultats en parallèle.

Récupération de la liste des lots de résultats

Après avoir exécuté une requête, vous pouvez récupérer les résultats dans l’un des formats suivants :

  • Objets ResultBatch.

    Pour ce faire, appelez la méthode obtenir_lot_résultats() dans l’objet Curseur. Cela renvoie une liste d’objets ResultBatch que vous pouvez affecter à différents employés pour le traitement. Par exemple :

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[:: 2]
            result_batch_list_2 = result_batch_list[1 :: 2]
    
  • Tables PyArrow

    Vous pouvez utiliser les méthodes suivantes pour récupérer les lots de résultats en tant que tables PyArrow :

    • récupérer_arrow_tout() : appelez cette méthode pour renvoyer une table PyArrow contenant tous les résultats.

    • récupérer_arrow_lots() : appelez cette méthode pour renvoyer un itérateur que vous pouvez utiliser pour renvoyer une table PyArrow pour chaque lot de résultats.

    Par exemple :

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
  • Objets DataFrame Pandas.

    Si vous avez installé la version compatible avec Pandas du connecteur Snowflake pour Python, vous pouvez utiliser les méthodes suivantes pour récupérer les lots de résultats sous forme d’objets DataFrame Pandas :

    • récupérer_pandas_tout() : appelez cette méthode pour renvoyer un DataFrame Pandas contenant tous les résultats.

    • récupérer_pandas_lots() : appelez cette méthode pour renvoyer un itérateur que vous pouvez utiliser pour renvoyer un DataFrame Pandas pour chaque lot de résultats.

    Par exemple :

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a Pandas DataFrame containing all of the results.
            table = cur.fetch_pandas_all()
    
            # Iterate over a list of Pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    

Sérialisation des lots de résultats

Pour déplacer les lots de résultats vers d’autres employés ou nœuds, vous pouvez sérialiser et désérialiser les lots de résultats. Par exemple :

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)

Utilisation de lots de résultats

Les sections suivantes expliquent comment travailler avec les objets ResultBatch :

Itération sur les lignes d’un lot de résultats

Avec un objet ResultBatch, vous pouvez itérer sur les lignes qui font partie de ce lot. Par exemple :

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)

Lorsque vous créez un itérateur d’un objet ResultBatch, l’objet récupère et convertit le sous-ensemble de lignes pour ce lot.

Matérialiser les lignes d’un lot de résultats

Pour matérialiser le sous-ensemble de lignes dans un lot de résultats en passant cet objet ResultBatch à la fonction list(). Par exemple :

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)

Obtenir le nombre de lignes et la taille d’un lot de résultats

Si vous devez déterminer le nombre de lignes dans un lot de résultats et la taille des données, vous pouvez utiliser les attributs rowcount, compressed_size et uncompressed_size de l’objet ResultBatch. Par exemple :

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size

Notez que ces attributs sont disponibles avant que vous n’itériez sur le lot de résultats. Il n’est pas nécessaire de récupérer le sous-ensemble de lignes pour le lot afin d’obtenir les valeurs de ces attributs.

Conversion d’un lot de résultats Arrow en une table PyArrow ou un DataFrame Pandas

Pour convertir un ArrowResultBatch en une table PyArrow ou un DataFrame Pandas, utilisez les méthodes suivantes :

Par exemple :

with conn_cnx as con:
  with con.cursor() as cur:
    cur.execute("select col1 from table")
    batches = cur.get_result_batches()

    # Get the row from the ResultBatch as a Pandas DataFrame.
    dataframe = batches[0].to_pandas()

    # Get the row from the ResultBatch as a PyArrow table.
    table = batches[0].to_arrow()
Revenir au début