Distributing workloads that fetch results with the Snowflake Connector for Python

If you are using a distributed environment to parallelize workloads, you can use the Snowflake Connector for Python to distribute the work of fetching and processing results.

Introduction

After you use the Cursor object to execute a query, you can distribute the work of fetching the results by using result batches. A result batch encapsulates a function that retrieves a subset of the results. You can assign different workers to use different result batches to fetch and process results in parallel.
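
For example, the following is a minimal sketch of this pattern using Python's standard concurrent.futures module; the process_batch function is a hypothetical placeholder for your own per-batch processing logic:

from concurrent.futures import ThreadPoolExecutor

from snowflake.connector import connect

def process_batch(batch):
    # Hypothetical worker function: iterate over the rows in one
    # result batch and return a per-batch result (here, a row count).
    return sum(1 for _ in batch)

with connect(...) as conn:
    with conn.cursor() as cur:
        # Execute a query and get the list of result batches.
        cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
        batches = cur.get_result_batches()

        # Each thread fetches and processes a different subset of the results.
        with ThreadPoolExecutor(max_workers=4) as executor:
            row_counts = list(executor.map(process_batch, batches))

print(sum(row_counts))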

Retrieving the list of result batches

After executing a query, you can retrieve the results in one of the following formats:

  • ResultBatch objects.

    To do this, call the get_result_batches() method in the Cursor object. This returns a list of ResultBatch objects that you can assign to different workers for processing. For example:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[0::2]
            result_batch_list_2 = result_batch_list[1::2]
    
  • PyArrow tables

    You can use the following methods to retrieve the result batches as PyArrow tables:

    • fetch_arrow_all(): Call this method to return a PyArrow table containing all of the results.

    • fetch_arrow_batches(): Call this method to return an iterator that you can use to return a PyArrow table for each result batch.

    For example:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
  • pandas DataFrame objects.

    If you have installed the pandas-compatible version of the Snowflake Connector for Python, you can use the following methods to retrieve the result batches as pandas DataFrame objects:

    • fetch_pandas_all(): Call this method to return a pandas DataFrame containing all of the results.

    • fetch_pandas_batches(): Call this method to return an iterator that you can use to return a pandas DataFrame for each result batch.

    For example:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            dataframe = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    

Serializing result batches

To move result batches to other workers or nodes, you can serialize and deserialize them. For example:

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
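
As a rough end-to-end sketch, the following hypothetical example pickles each batch from the earlier example and hands it to a pool of worker processes, where each batch is deserialized and processed; count_rows is a placeholder for your own logic:

import pickle
from concurrent.futures import ProcessPoolExecutor

def count_rows(pickled_batch):
    # Deserialize the batch in the worker process, then fetch
    # and iterate over its subset of rows.
    batch = pickle.loads(pickled_batch)
    return sum(1 for _ in batch)

if __name__ == '__main__':
    # Serialize each batch so that it can be shipped to another
    # process or node.
    pickled_batches = [pickle.dumps(batch) for batch in result_batch_list]

    with ProcessPoolExecutor() as executor:
        total_rows = sum(executor.map(count_rows, pickled_batches))

Note that multiprocessing already pickles arguments for you; the explicit pickle.dumps/pickle.loads round trip here just makes the serialization step visible, and the same pattern applies when you send the bytes over a queue or socket to another node.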

Working with result batches

The next sections explain how to work with ResultBatch objects.

Iterating over rows in a result batch

With a ResultBatch object, you can iterate over the rows that are part of that batch. For example:

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)

When you iterate over a ResultBatch object, it fetches and converts the subset of rows for that batch.

Materializing the rows in a result batch

To materialize the subset of rows in a result batch, pass that ResultBatch object to the list() function. For example:

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[0]
first_result_batch_data = list(first_result_batch)

Getting the number of rows and size of a result batch

If you need to determine the number of rows in a result batch and the size of the data, you can use the rowcount, compressed_size, and uncompressed_size attributes of the ResultBatch object. For example:

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size

Note that these attributes are available before you iterate over the result batch. You don’t need to fetch the subset of rows for the batch in order to get the values of these attributes.
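
Because these values are available up front, you can use them to balance work before fetching any data. For example, here is a rough sketch of a greedy split by row count, assuming the result_batch_list from the earlier example:

# Greedily assign each batch to the worker with the fewest rows so far,
# so that each worker fetches roughly the same number of rows.
num_workers = 2
assignments = [[] for _ in range(num_workers)]
row_totals = [0] * num_workers

for batch in sorted(result_batch_list, key=lambda b: b.rowcount, reverse=True):
    lightest = row_totals.index(min(row_totals))
    assignments[lightest].append(batch)
    row_totals[lightest] += batch.rowcount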

Converting an Arrow result batch to a PyArrow table or pandas DataFrame

To convert an ArrowResultBatch to a PyArrow table or a pandas DataFrame, use the following methods:

  • to_arrow(): Call this method to return a PyArrow table containing the rows from the result batch.

  • to_pandas(): Call this method to return a pandas DataFrame containing the rows from the result batch.

For example:

with connect(...) as conn:
    with conn.cursor() as cur:
        cur.execute("select col1 from table")
        batches = cur.get_result_batches()

        # Get the rows from the ResultBatch as a pandas DataFrame.
        dataframe = batches[0].to_pandas()

        # Get the rows from the ResultBatch as a PyArrow table.
        table = batches[0].to_arrow()
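
Because each batch converts independently, you can run the conversion on different workers and combine the per-batch results afterward. For example, here is a sketch that concatenates the per-batch pandas DataFrames from the example above into a single DataFrame (this assumes the pandas-compatible version of the connector is installed):

import pandas as pd

# Convert each batch to a DataFrame (potentially on different workers),
# then combine the per-batch DataFrames into a single result.
dataframes = [batch.to_pandas() for batch in batches]
combined = pd.concat(dataframes, ignore_index=True)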