Distribuição de cargas de trabalho que obtêm resultados com o Conector Snowflake para Python

Se você estiver usando um ambiente distribuído para paralelizar cargas de trabalho, pode usar o Conector Snowflake para Python para distribuir o trabalho de buscar e processar resultados.

Neste tópico:

Introdução

Após usar o objeto Cursor para executar uma consulta, você pode distribuir o trabalho de buscar resultados usando lotes de resultados. Um lote de resultados encapsula uma função que recupera um subconjunto dos resultados. Você pode designar trabalhadores diferentes para usar lotes de resultados diferentes para buscar e processar resultados em paralelo.

Recuperação da lista de lotes de resultados

Após executar uma consulta, você pode recuperar os resultados em um dos seguintes formatos:

  • Objetos ResultBatch.

    Para fazer isso, chame o método get_result_batches() no objeto Cursor. Isto retorna uma lista de objetos ResultBatch que você pode atribuir a diferentes trabalhadores para processamento. Por exemplo:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[:: 2]
            result_batch_list_2 = result_batch_list[1 :: 2]
    
    Copy
  • Tabelas PyArrow

    Você pode usar os seguintes métodos para recuperar os lotes de resultados como tabelas PyArrow:

    • fetch_arrow_all(): Chame este método para retornar uma tabela PyArrow contendo todos os resultados.

    • fetch_arrow_batches(): Chame este método para retornar um iterador que você pode usar para retornar uma tabela PyArrow para cada lote de resultados.

    Por exemplo:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
    Copy
  • objetos pandas DataFrame.

    Se você instalou a versão compatível com pandas do Conector Snowflake para Python, pode usar os seguintes métodos para recuperar os lotes de resultados como objetos Pandas DataFrame:

    • fetch_pandas_all(): chame este método para retornar um pandas DataFrame contendo todos os resultados.

    • fetch_pandas_batches(): chame este método para retornar um iterador que você pode usar para retornar um pandas DataFrame para cada lote de resultados.

    Por exemplo:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            table = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    
    Copy

Serialização de lotes de resultados

Para mover os lotes de resultados para outros trabalhadores ou nós, você pode serializar e desfazer a serialização de lotes de resultados. Por exemplo:

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Copy

Como trabalhar com lotes de resultados

As próximas seções explicam como trabalhar com objetos ResultBatch:

Iteração em linhas em um lote de resultados

Com um objeto ResultBatch, você pode iterar as linhas que fazem parte desse lote. Por exemplo:

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)
Copy

Quando você cria um iterador de um objeto ResultBatch, o objeto busca e converte o subconjunto de linhas para esse lote.

Materialização de linhas em um lote de resultados

Para materializar o subconjunto de linhas em um lote de resultados passando aquele objeto ResultBatch para a função list(). Por exemplo:

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Copy

Obtenção do número de linhas e tamanho de um lote de resultados

Se você precisar determinar o número de linhas em um lote de resultados e o tamanho dos dados, pode usar os atributos rowcount, compressed_size e uncompressed_size do objeto ResultBatch. Por exemplo:

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Copy

Observe que estes atributos estão disponíveis antes de você iterar o lote de resultados. Você não precisa buscar o subconjunto de linhas para o lote a fim de obter os valores destes atributos.

Conversão de um lote de resultados Arrow em uma tabela PyArrow ou Pandas DataFrame

Para converter um ArrowResultBatch em uma tabela PyArrow ou um pandas DataFrame, use os seguintes métodos:

Por exemplo:

with conn_cnx as con:
  with con.cursor() as cur:
    cur.execute("select col1 from table")
    batches = cur.get_result_batches()

    # Get the row from the ResultBatch as a pandas DataFrame.
    dataframe = batches[0].to_pandas()

    # Get the row from the ResultBatch as a PyArrow table.
    table = batches[0].to_arrow()
Copy