Python용 Snowflake 커넥터로 결과를 가져오는 워크로드 분산하기¶
분산 환경을 사용하여 워크로드를 병렬화하는 경우 Python용 Snowflake 커넥터를 사용하여 결과 가져오기 및 처리 작업을 분산할 수 있습니다.
이 항목의 내용:
소개¶
Cursor
오브젝트를 사용하여 쿼리를 실행한 후, 결과 배치를 사용하여 결과를 가져오는 작업을 분산할 수 있습니다. 결과 배치 는 결과의 하위 세트를 검색하는 함수를 캡슐화합니다. 다양한 작업자를 할당하여 다양한 결과 배치를 사용해 결과를 병렬로 가져오고 처리할 수 있습니다.
결과 배치 목록 검색하기¶
쿼리를 실행한 후 다음 형식 중 하나로 결과를 검색할 수 있습니다.
ResultBatch 오브젝트.
이렇게 하려면 Cursor 오브젝트에서
get_result_batches()
메서드를 호출하십시오. 이 메서드는 처리를 위해 다른 작업자에 할당할 수 있는ResultBatch
오브젝트의 목록을 반환합니다. 예를 들면 다음과 같습니다.with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Get the list of result batches result_batch_list = cur.get_result_batches() # Get the number of result batches in the list. num_result_batches = len(result_batch_list) # Split the list of result batches into two # to distribute the work of fetching results # between two workers. result_batch_list_1 = result_batch_list[:: 2] result_batch_list_2 = result_batch_list[1 :: 2]
-
다음 메서드를 사용하여 결과 배치를 PyArrow 테이블로 검색할 수 있습니다.
fetch_arrow_all()
: 모든 결과가 포함된 PyArrow 테이블을 반환하려면 이 메서드를 호출하십시오.fetch_arrow_batches()
: 각 결과 배치의 PyArrow 테이블을 반환하는 데 사용할 수 있는 반복기를 반환하려면 이 메서드를 호출하십시오.
예:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a PyArrow table containing all of the results. table = cur.fetch_arrow_all() # Iterate over a list of PyArrow tables for result batches. for table_for_batch in cur.fetch_arrow_batches(): my_pyarrow_table_processing_function(table_for_batch)
pandas DataFrame 오브젝트.
Python용 Snowflake 커넥터의 pandas 호환 버전을 설치한 경우, 다음 메서드를 사용하여 결과 배치를 pandas DataFrame 오브젝트로 검색할 수 있습니다.
fetch_pandas_all()
: 모든 결과가 포함된 pandas DataFrame을 반환하려면 이 메서드를 호출하십시오.fetch_pandas_batches()
: 각 결과 배치의 pandas DataFrame을 반환하는 데 사용할 수 있는 반복기를 반환하려면 이 메서드를 호출하십시오.
예:
with connect(...) as conn: with conn.cursor() as cur: # Execute a query. cur.execute('select seq4() as n from table(generator(rowcount => 100000));') # Return a pandas DataFrame containing all of the results. table = cur.fetch_pandas_all() # Iterate over a list of pandas DataFrames for result batches. for dataframe_for_batch in cur.fetch_pandas_batches(): my_dataframe_processing_function(dataframe_for_batch)
결과 배치 직렬화하기¶
결과 배치를 다른 작업자나 노드로 이동하기 위해 결과 배치를 직렬화 및 역직렬화할 수 있습니다. 예를 들면 다음과 같습니다.
import pickle
# Serialize a result batch from the first list.
pickled_batch = pickle.dumps(result_batch_list_1[1])
# At this point, you can move the serialized data to
# another worker/node.
...
# Deserialize the result batch for processing.
unpickled_batch = pickle.loads(pickled_batch)
결과 배치로 작업하기¶
다음 섹션에서는 ResultBatch 오브젝트로 작업하는 방법을 설명합니다.
결과 배치의 행에 대해 반복하기¶
ResultBatch
오브젝트를 사용하여 해당 배치에 속한 행에 대해 반복할 수 있습니다. 예를 들면 다음과 같습니다.
# Iterate over the list of result batches.
for batch in result_batch_list_1:
# Iterate over the subset of rows in a result batch.
for row in batch:
print(row)
ResultBatch
오브젝트의 반복기를 만들 때 이 오브젝트는 해당 배치에 대한 행의 하위 세트를 가져와 변환합니다.
결과 배치의 행 구체화하기¶
해당 ResultBatch
오브젝트를 list()
함수에 전달하여 결과 배치의 행 하위 세트를 구체화합니다. 예를 들면 다음과 같습니다.
# Materialize the subset of results for the first result batch
# in the list.
first_result_batch = result_batch_list_1[1]
first_result_batch_data = list(first_result_batch)
결과 배치의 행 수와 크기 가져오기¶
결과 배치의 행 수와 데이터 크기를 결정해야 하는 경우 ResultBatch
오브젝트의 행 수, 압축_크기, 비압축_크기 속성을 사용할 수 있습니다. 예를 들면 다음과 같습니다.
# Get the number of rows in a result batch.
num_rows = first_result_batch.rowcount
# Get the size of the data in a result batch.
compressed_size = first_result_batch.compressed_size
uncompressed_size = first_result_batch.uncompressed_size
결과 배치를 반복하기 전에 이러한 속성을 사용할 수 있습니다. 이러한 속성의 값을 가져오려고 배치에 대한 행의 하위 세트를 가져올 필요는 없습니다.
Arrow 결과 배치를 PyArrow 테이블 또는 pandas DataFrame로 변환하기¶
ArrowResultBatch
를 PyArrow 테이블 또는 pandas DataFrame으로 변환하려면 다음 메서드를 사용하십시오.
to_pandas()
: Python용 Snowflake 커넥터의 pandas 호환 버전을 설치한 경우,ArrowResultBatch
에 행이 포함된 pandas DataFrame을 반환하려면 이 메서드를 호출하십시오.to_arrow()
:ResultBatch
에 행이 포함된 PyArrow 테이블을 반환하려면 이 메서드를 호출하십시오.
예:
with conn_cnx as con:
with con.cursor() as cur:
cur.execute("select col1 from table")
batches = cur.get_result_batches()
# Get the row from the ResultBatch as a pandas DataFrame.
dataframe = batches[0].to_pandas()
# Get the row from the ResultBatch as a PyArrow table.
table = batches[0].to_arrow()