Distribuição de cargas de trabalho que obtêm resultados com o Conector Snowflake para Python¶
Se você estiver usando um ambiente distribuído para paralelizar cargas de trabalho, pode usar o Conector Snowflake para Python para distribuir o trabalho de buscar e processar resultados.
Neste tópico:
Introdução¶
Após usar o objeto Cursor
para executar uma consulta, você pode distribuir o trabalho de buscar resultados usando lotes de resultados. Um lote de resultados encapsula uma função que recupera um subconjunto dos resultados. Você pode designar trabalhadores diferentes para usar lotes de resultados diferentes para buscar e processar resultados em paralelo.
Recuperação da lista de lotes de resultados¶
Após executar uma consulta, você pode recuperar os resultados em um dos seguintes formatos:
Objetos ResultBatch.
Para fazer isso, chame o método
get_result_batches()
no objeto Cursor. Isto retorna uma lista de objetosResultBatch
que você pode atribuir a diferentes trabalhadores para processamento. Por exemplo:with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Get the list of result batches result_batch_list = cur.get_result_batches() # Get the number of result batches in the list. num_result_batches = len(result_batch_list) # Split the list of result batches into two # to distribute the work of fetching results # between two workers. result_batch_list_1 = result_batch_list[:: 2] result_batch_list_2 = result_batch_list[1 :: 2]
-
Você pode usar os seguintes métodos para recuperar os lotes de resultados como tabelas PyArrow:
fetch_arrow_all()
: Chame este método para retornar uma tabela PyArrow contendo todos os resultados.fetch_arrow_batches()
: Chame este método para retornar um iterador que você pode usar para retornar uma tabela PyArrow para cada lote de resultados.
Por exemplo:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a PyArrow table containing all of the results. table = cur.fetch_arrow_all() # Iterate over a list of PyArrow tables for result batches. for table_for_batch in cur.fetch_arrow_batches(): my_pyarrow_table_processing_function(table_for_batch)
objetos pandas DataFrame.
Se você instalou a versão compatível com pandas do Conector Snowflake para Python, pode usar os seguintes métodos para recuperar os lotes de resultados como objetos Pandas DataFrame:
fetch_pandas_all()
: chame este método para retornar um pandas DataFrame contendo todos os resultados.fetch_pandas_batches()
: chame este método para retornar um iterador que você pode usar para retornar um pandas DataFrame para cada lote de resultados.
Por exemplo:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a pandas DataFrame containing all of the results. table = cur.fetch_pandas_all() # Iterate over a list of pandas DataFrames for result batches. for dataframe_for_batch in cur.fetch_pandas_batches(): my_dataframe_processing_function(dataframe_for_batch)
Serialização de lotes de resultados¶
Para mover os lotes de resultados para outros trabalhadores ou nós, você pode serializar e desfazer a serialização de lotes de resultados. Por exemplo:
import pickle
# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])
# At this point, you can move the serialized data to
# another worker/node.
...
# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Como trabalhar com lotes de resultados¶
As próximas seções explicam como trabalhar com objetos ResultBatch:
Obtenção do número de linhas e tamanho de um lote de resultados
Conversão de um lote de resultados Arrow em uma tabela PyArrow ou Pandas DataFrame
Iteração em linhas em um lote de resultados¶
Com um objeto ResultBatch
, você pode iterar as linhas que fazem parte desse lote. Por exemplo:
# Iterate over the list of result batches.
for batch in result_batch_list_1:
# Iterate over the subset of rows in a result batch.
for row in batch:
print(row)
Quando você cria um iterador de um objeto ResultBatch
, o objeto busca e converte o subconjunto de linhas para esse lote.
Materialização de linhas em um lote de resultados¶
Para materializar o subconjunto de linhas em um lote de resultados passando aquele objeto ResultBatch
para a função list()
. Por exemplo:
# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Obtenção do número de linhas e tamanho de um lote de resultados¶
Se você precisar determinar o número de linhas em um lote de resultados e o tamanho dos dados, pode usar os atributos rowcount, compressed_size e uncompressed_size do objeto ResultBatch
. Por exemplo:
# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount
# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Observe que estes atributos estão disponíveis antes de você iterar o lote de resultados. Você não precisa buscar o subconjunto de linhas para o lote a fim de obter os valores destes atributos.
Conversão de um lote de resultados Arrow em uma tabela PyArrow ou Pandas DataFrame¶
Para converter um ArrowResultBatch
em uma tabela PyArrow ou um pandas DataFrame, use os seguintes métodos:
to_pandas()
: chame este método para retornar um pandas DataFrame contendo as linhas em umArrowResultBatch
, se você tiver instalado a versão compatível com Pandas do Conector Snowflake para Python.to_arrow()
: Chame este método para retornar uma tabela PyArrow contendo as linhas em umResultBatch
.
Por exemplo:
with conn_cnx as con:
with con.cursor() as cur:
cur.execute("select col1 from table")
batches = cur.get_result_batches()
# Get the row from the ResultBatch as a pandas DataFrame.
dataframe = batches[0].to_pandas()
# Get the row from the ResultBatch as a PyArrow table.
table = batches[0].to_arrow()