Python용 Snowflake 커넥터로 결과를 가져오는 워크로드 분산하기

분산 환경을 사용하여 워크로드를 병렬화하는 경우 Python용 Snowflake 커넥터를 사용하여 결과 가져오기 및 처리 작업을 분산할 수 있습니다.

이 항목의 내용:

소개

Cursor 오브젝트를 사용하여 쿼리를 실행한 후, 결과 배치를 사용하여 결과를 가져오는 작업을 분산할 수 있습니다. 결과 배치 는 결과의 하위 세트를 검색하는 함수를 캡슐화합니다. 다양한 작업자를 할당하여 다양한 결과 배치를 사용해 결과를 병렬로 가져오고 처리할 수 있습니다.

결과 배치 목록 검색하기

쿼리를 실행한 후 다음 형식 중 하나로 결과를 검색할 수 있습니다.

  • ResultBatch 오브젝트.

    이렇게 하려면 Cursor 오브젝트에서 get_result_batches() 메서드를 호출하십시오. 이 메서드는 처리를 위해 다른 작업자에 할당할 수 있는 ResultBatch 오브젝트의 목록을 반환합니다. 예를 들면 다음과 같습니다.

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Get the list of result batches
            result_batch_list = cur.get_result_batches()
    
            # Get the number of result batches in the list.
            num_result_batches = len(result_batch_list)
    
            # Split the list of result batches into two
            # to distribute the work of fetching results
            # between two workers.
            result_batch_list_1 = result_batch_list[:: 2]
            result_batch_list_2 = result_batch_list[1 :: 2]
    
    Copy
  • PyArrow 테이블

    다음 메서드를 사용하여 결과 배치를 PyArrow 테이블로 검색할 수 있습니다.

    • fetch_arrow_all(): 모든 결과가 포함된 PyArrow 테이블을 반환하려면 이 메서드를 호출하십시오.

    • fetch_arrow_batches(): 각 결과 배치의 PyArrow 테이블을 반환하는 데 사용할 수 있는 반복기를 반환하려면 이 메서드를 호출하십시오.

    예:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a PyArrow table containing all of the results.
            table = cur.fetch_arrow_all()
    
            # Iterate over a list of PyArrow tables for result batches.
            for table_for_batch in cur.fetch_arrow_batches():
              my_pyarrow_table_processing_function(table_for_batch)
    
    Copy
  • pandas DataFrame 오브젝트.

    Python용 Snowflake 커넥터의 pandas 호환 버전을 설치한 경우, 다음 메서드를 사용하여 결과 배치를 pandas DataFrame 오브젝트로 검색할 수 있습니다.

    • fetch_pandas_all(): 모든 결과가 포함된 pandas DataFrame을 반환하려면 이 메서드를 호출하십시오.

    • fetch_pandas_batches(): 각 결과 배치의 pandas DataFrame을 반환하는 데 사용할 수 있는 반복기를 반환하려면 이 메서드를 호출하십시오.

    예:

    with connect(...) as conn:
        with conn.cursor() as cur:
            # Execute a query.
            cur.execute('select seq4() as n from table(generator(rowcount => 100000));')
    
            # Return a pandas DataFrame containing all of the results.
            table = cur.fetch_pandas_all()
    
            # Iterate over a list of pandas DataFrames for result batches.
            for dataframe_for_batch in cur.fetch_pandas_batches():
              my_dataframe_processing_function(dataframe_for_batch)
    
    Copy

결과 배치 직렬화하기

결과 배치를 다른 작업자나 노드로 이동하기 위해 결과 배치를 직렬화 및 역직렬화할 수 있습니다. 예를 들면 다음과 같습니다.

import pickle

# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])

# At this point, you can move the serialized data to
# another worker/node.
...

# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
Copy

결과 배치로 작업하기

다음 섹션에서는 ResultBatch 오브젝트로 작업하는 방법을 설명합니다.

결과 배치의 행에 대해 반복하기

ResultBatch 오브젝트를 사용하여 해당 배치에 속한 행에 대해 반복할 수 있습니다. 예를 들면 다음과 같습니다.

# Iterate over the list of result batches.
for batch in result_batch_list_1:
    # Iterate over the subset of rows in a result batch.
    for row in batch:
        print(row)
Copy

ResultBatch 오브젝트의 반복기를 만들 때 이 오브젝트는 해당 배치에 대한 행의 하위 세트를 가져와 변환합니다.

결과 배치의 행 구체화하기

해당 ResultBatch 오브젝트를 list() 함수에 전달하여 결과 배치의 행 하위 세트를 구체화합니다. 예를 들면 다음과 같습니다.

# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
Copy

결과 배치의 행 수와 크기 가져오기

결과 배치의 행 수와 데이터 크기를 결정해야 하는 경우 ResultBatch 오브젝트의 행 수, 압축_크기, 비압축_크기 속성을 사용할 수 있습니다. 예를 들면 다음과 같습니다.

# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount

# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
Copy

결과 배치를 반복하기 전에 이러한 속성을 사용할 수 있습니다. 이러한 속성의 값을 가져오려고 배치에 대한 행의 하위 세트를 가져올 필요는 없습니다.

Arrow 결과 배치를 PyArrow 테이블 또는 pandas DataFrame로 변환하기

ArrowResultBatch 를 PyArrow 테이블 또는 pandas DataFrame으로 변환하려면 다음 메서드를 사용하십시오.

예:

with conn_cnx as con:
  with con.cursor() as cur:
    cur.execute("select col1 from table")
    batches = cur.get_result_batches()

    # Get the row from the ResultBatch as a pandas DataFrame.
    dataframe = batches[0].to_pandas()

    # Get the row from the ResultBatch as a PyArrow table.
    table = batches[0].to_arrow()
Copy