Distribution de charges de travail qui récupèrent des résultats avec le connecteur Snowflake pour Python¶
Si vous utilisez un environnement distribué pour paralléliser les charges de travail, vous pouvez utiliser le connecteur Snowflake pour Python pour répartir le travail d’extraction et de traitement des résultats.
Dans ce chapitre :
Introduction¶
Après avoir utilisé l’objet Curseur
pour exécuter une requête, vous pouvez répartir le travail de récupération des résultats en utilisant des lots de résultats. Un lot de résultats encapsule une fonction qui récupère un sous-ensemble de résultats. Vous pouvez affecter différents employés à l’utilisation de différents lots de résultats pour récupérer et traiter les résultats en parallèle.
Récupération de la liste des lots de résultats¶
Après avoir exécuté une requête, vous pouvez récupérer les résultats dans l’un des formats suivants :
Objets ResultBatch.
Pour ce faire, appelez la méthode
obtenir_lot_résultats()
dans l’objet Curseur. Cela renvoie une liste d’objetsResultBatch
que vous pouvez affecter à différents employés pour le traitement. Par exemple :with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Get the list of result batches result_batch_list = cur.get_result_batches() # Get the number of result batches in the list. num_result_batches = len(result_batch_list) # Split the list of result batches into two # to distribute the work of fetching results # between two workers. result_batch_list_1 = result_batch_list[:: 2] result_batch_list_2 = result_batch_list[1 :: 2]
-
Vous pouvez utiliser les méthodes suivantes pour récupérer les lots de résultats en tant que tables PyArrow :
récupérer_arrow_tout()
: appelez cette méthode pour renvoyer une table PyArrow contenant tous les résultats.récupérer_arrow_lots()
: appelez cette méthode pour renvoyer un itérateur que vous pouvez utiliser pour renvoyer une table PyArrow pour chaque lot de résultats.
Par exemple :
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a PyArrow table containing all of the results. table = cur.fetch_arrow_all() # Iterate over a list of PyArrow tables for result batches. for table_for_batch in cur.fetch_arrow_batches(): my_pyarrow_table_processing_function(table_for_batch)
-
Si vous avez installé la version compatible avec Pandas du connecteur Snowflake pour Python, vous pouvez utiliser les méthodes suivantes pour récupérer les lots de résultats sous forme d’objets DataFrame Pandas :
fetch_pandas_all()
: appelez cette méthode pour renvoyer un DataFrame Pandas contenant tous les résultats.fetch_pandas_batches()
: appelez cette méthode pour renvoyer un itérateur que vous pouvez utiliser pour renvoyer un DataFrame Pandas pour chaque lot de résultats.
Par exemple :
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a pandas DataFrame containing all of the results. table = cur.fetch_pandas_all() # Iterate over a list of pandas DataFrames for result batches. for dataframe_for_batch in cur.fetch_pandas_batches(): my_dataframe_processing_function(dataframe_for_batch)
Sérialisation des lots de résultats¶
Pour déplacer les lots de résultats vers d’autres employés ou nœuds, vous pouvez sérialiser et désérialiser les lots de résultats. Exemple :
import pickle
# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])
# At this point, you can move the serialized data to
# another worker/node.
...
# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Utilisation de lots de résultats¶
Les sections suivantes expliquent comment travailler avec les objets ResultBatch :
Obtention du nombre de lignes et la taille d’un lot de résultats
Conversion d’un lot de résultats Arrow en une table PyArrow ou un DataFrame Pandas
Itération sur les lignes d’un lot de résultats¶
Avec un objet ResultBatch
, vous pouvez itérer sur les lignes qui font partie de ce lot. Par exemple :
# Iterate over the list of result batches.
for batch in result_batch_list_1:
# Iterate over the subset of rows in a result batch.
for row in batch:
print(row)
Lorsque vous créez un itérateur d’un objet ResultBatch
, l’objet récupère et convertit le sous-ensemble de lignes pour ce lot.
Matérialisation des lignes d’un lot de résultats¶
Pour matérialiser le sous-ensemble de lignes dans un lot de résultats en passant cet objet ResultBatch
à la fonction list()
. Par exemple :
# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Obtention du nombre de lignes et la taille d’un lot de résultats¶
Si vous devez déterminer le nombre de lignes dans un lot de résultats et la taille des données, vous pouvez utiliser les attributs rowcount, compressed_size et uncompressed_size de l’objet ResultBatch
. Par exemple :
# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount
# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Notez que ces attributs sont disponibles avant que vous n’itériez sur le lot de résultats. Il n’est pas nécessaire de récupérer le sous-ensemble de lignes pour le lot afin d’obtenir les valeurs de ces attributs.
Conversion d’un lot de résultats Arrow en une table PyArrow ou un DataFrame Pandas¶
Pour convertir un ArrowResultBatch
en une table PyArrow ou un DataFrame Pandas, utilisez les méthodes suivantes :
to_pandas()
: appelez cette méthode pour renvoyer un DataFrame Pandas contenant les lignes d’unArrowResultBatch
, si vous avez installé la version compatible avec Pandas du connecteur Snowflake pour Python.vers_arrow()
: appelez cette méthode pour retourner une table PyArrow contenant les lignes d’unResultBatch
.
Par exemple :
with conn_cnx as con:
with con.cursor() as cur:
cur.execute("select col1 from table")
batches = cur.get_result_batches()
# Get the row from the ResultBatch as a pandas DataFrame.
dataframe = batches[0].to_pandas()
# Get the row from the ResultBatch as a PyArrow table.
table = batches[0].to_arrow()