Python用Snowflakeコネクタを使用して結果をフェッチするワークロードの分散

分散環境を使用してワークロードを並列化する場合は、Python用Snowflakeコネクタを使用して、結果のフェッチと処理の作業を分散できます。

このトピックの内容:

概要

Cursor オブジェクトを使用してクエリを実行した後、結果バッチを使用して結果をフェッチする作業を分散できます。 結果バッチ は、結果のサブセットを取得する関数をカプセル化します。異なるワーカーを割り当てて異なる結果バッチを使用すると、結果を並行してフェッチおよび処理することができます。

結果バッチのリストの取得

クエリを実行した後、次のいずれかの形式で結果を取得できます。

  • ResultBatch オブジェクト。

    これを実行するには、 カーソル オブジェクトで get_result_batches() メソッドを呼び出します。これにより、処理のためにさまざまなワーカーに割り当てることができる ResultBatch オブジェクトのリストが返されます。例:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[:: 2]
            result_batch_list_2 = result_batch_list[1 :: 2]
    
    Copy
  • PyArrow テーブル

    次のメソッドを使用して、結果バッチを PyArrow テーブルとして取得できます。

    • fetch_arrow_all(): このメソッドを呼び出して、すべての結果を含む PyArrow テーブルを返します。

    • fetch_arrow_batches(): このメソッドを呼び出して、各結果バッチの PyArrow テーブルを返すために使用できる反復子を返します。

    例:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
    Copy
  • Pandas DataFrame オブジェクト。

    Python用SnowflakeコネクタのPandas互換バージョンをインストール している場合は、次の方法を使用して、結果バッチをPandas DataFrame オブジェクトとして取得できます。

    • fetch_pandas_all(): このメソッドを呼び出して、すべての結果を含むPandas DataFrame を返します。

    • fetch_pandas_batches(): このメソッドを呼び出して、各結果バッチのPandas DataFrame を返すために使用できる反復子を返します。

    例:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            table = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    
    Copy

結果バッチのシリアル化

結果バッチを他のワーカーまたはノードに移動するために、結果バッチをシリアル化および逆シリアル化できます。例:

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Copy

結果バッチの操作

次のセクションでは、 ResultBatch オブジェクトの操作方法について説明します。

結果バッチ内の行の反復処理

ResultBatch オブジェクトを使用すると、そのバッチの一部である行を反復処理できます。例:

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)
Copy

ResultBatch オブジェクトの反復子を作成すると、オブジェクトはそのバッチの行のサブセットをフェッチして変換します。

結果バッチの行の具体化

その ResultBatch オブジェクトを list() 関数に渡すことにより、結果バッチの行のサブセットを具体化します。例:

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Copy

結果バッチの行数およびサイズの取得

結果バッチの行数とデータのサイズを決定する必要がある場合は、 ResultBatch オブジェクトの rowcountcompressed_size、および uncompressed_size 属性を使用できます。例:

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Copy

これらの属性は、結果バッチを反復処理する前に使用できることに注意してください。これらの属性の値を取得するために、バッチの行のサブセットをフェッチする必要はありません。

Arrow結果バッチの PyArrow テーブルまたはPandas DataFrame への変換

ArrowResultBatch を PyArrow テーブルまたはPandas DataFrame に変換するには、次のメソッドを使用します。

例:

with conn_cnx as con:
  with con.cursor() as cur:
    cur.execute("select col1 from table")
    batches = cur.get_result_batches()

    # Get the row from the ResultBatch as a pandas DataFrame.
    dataframe = batches[0].to_pandas()

    # Get the row from the ResultBatch as a PyArrow table.
    table = batches[0].to_arrow()
Copy