벡터화된 Python UDF

이 항목에서는 벡터화된 Python UDF를 소개합니다.

이 항목의 내용:

개요

Vectorized Python UDTFs (user-defined table functions), which are UDTFs with a vectorized end_partition, enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.

다음과 같은 경우에 사용하십시오.

  • 행 단위 대신 파티션 단위로 데이터를 처리해야 하는 경우.

  • 각 파티션에 대해 여러 행이나 열을 반환해야 하는 경우.

  • 데이터 분석을 위해 pandas DataFrames에서 작동하는 라이브러리를 사용하려는 경우.

전제 조건

Python용 Snowpark 라이브러리 버전 1.6.1 이상이 필요합니다.

시작하기

벡터화된 end_partition이 있는 UDTF를 생성하는 방법은 다음과 같습니다.

  • 선택적으로, 각 파티션을 처리하기 전에 호출될 __init__ 메서드로 처리기 클래스를 정의합니다.

  • process 메서드를 정의하지 마십시오.

  • DataFrame 인자를 취하여 각 배열이 열인 pandas.DataFrame 이나 pandas.Series 또는 pandas.arrays 의 튜플을 반환하거나 생성하는 end_partition 메서드를 정의합니다. 결과의 열 유형은 UDTF 정의의 열 유형과 일치해야 합니다.

  • @vectorized 데코레이터 또는 _sf_vectorized_input 함수 특성을 사용하여 end_partition 메서드를 벡터화된 것으로 표시합니다. 자세한 내용은 벡터화된 Python UDTF 를 참조하십시오. @vectorized 데코레이터는 Python UDTF가 Snowflake 내에서 실행될 때만(예: SQL 워크시트를 사용할 때) 사용할 수 있습니다. 클라이언트 또는 Python 워크시트를 사용하여 실행하는 경우 function 특성을 사용해야 합니다.

참고

벡터화된 end_partition이 있는 UDTF에 대한 입력 DataFrame의 기본 열 이름은 SQL 함수의 서명과 일치합니다. 열 이름은 SQL 식별자 요구 사항 을 따릅니다. 즉, 식별자는 따옴표로 묶지 않으면 대문자로 표시되고, 큰따옴표로 묶으면 그대로 유지됩니다.

다음은 @vectorized 데코레이터를 사용하여 벡터화된 end_partition이 있는 UDTF를 생성하는 예입니다.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

다음은 함수 특성을 사용하여 벡터화된 end_partition이 있는 UDTF를 생성하는 예입니다.

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

참고

파티션을 빌드하려면 벡터화된 end_partition이 있는 UDTF를 PARTITION BY 절과 함께 호출해야 합니다.

동일한 파티션에 있는 모든 데이터와 함께 UDTF를 호출하려면 다음을 수행하십시오.

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

열 x로 분할된 데이터가 있는 UDTF를 호출하려면 다음을 수행하십시오.

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

타입 지원

벡터화된 end_partition이 있는 UDTF는 인자 및 반환 값에 대해 벡터화된 Python UDF와 동일한 SQL 유형 을 지원합니다. 그러나 벡터화된 end_partition이 있는 UDTF의 경우 64비트 이하 정수형에 모두 맞는 0 스케일의 SQL NUMBER 인자는 항상 Int16, Int32 또는 Int64 에 매핑됩니다. 즉, 스칼라 UDF와는 달리 UDTF의 인자가 null을 허용하지 않으면 int16, int32 또는 int64 로 변환되지 않습니다.

SQL 유형이 Pandas dtype에 매핑되는 방식을 보여주는 표를 보려면 벡터화된 Python UDF 항목의 유형 지원 표 를 참조하십시오.

예: 일반 UDTF를 사용하는 행 수집과 벡터화된 end_partition이 있는 UDTF를 사용하는 행 수집의 비교

다음은 일반 UDTF를 사용하여 행을 수집하는 방법의 예입니다.

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

다음은 벡터화된 end_partition이 있는 UDTF를 사용하여 행을 수집하는 방법의 예입니다.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

예: 파티션의 각 열에 대한 요약 통계 계산하기

다음은 pandas describe() 메서드를 사용하여 파티션의 각 열에 대한 요약 통계를 계산하는 방법의 예입니다.

먼저 테이블을 만들고 각각 5 행으로 구성된 파티션을 3개 생성합니다.

create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);

-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Copy

데이터를 살펴보십시오.

select * from test_values;

-----------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
-----------------------------------------------------
|x     |8.0     |99.4    |714.6   |168.7   |397.2   |
|x     |106.4   |237.1   |971.7   |828.4   |988.2   |
|x     |741.3   |207.9   |32.6    |640.6   |63.2    |
|x     |541.3   |828.6   |844.9   |77.3    |403.1   |
|x     |4.3     |723.3   |924.3   |282.5   |158.1   |
|y     |976.1   |562.4   |968.7   |934.3   |977.3   |
|y     |390.0   |244.3   |952.6   |101.7   |24.9    |
|y     |599.7   |191.8   |90.2    |788.2   |761.2   |
|y     |589.5   |201.0   |863.4   |415.1   |696.1   |
|y     |46.7    |659.7   |571.1   |938.0   |513.7   |
|z     |313.9   |188.5   |964.6   |435.4   |519.6   |
|z     |328.3   |643.1   |766.4   |148.1   |596.4   |
|z     |929.0   |255.4   |915.9   |857.2   |425.5   |
|z     |612.8   |816.4   |220.2   |879.5   |331.4   |
|z     |487.1   |704.5   |471.5   |378.9   |481.2   |
-----------------------------------------------------
Copy

다음으로, 함수를 만듭니다.

create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas

class handler:
    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
      # using describe function to get the summary statistics
      result = df.describe().transpose()
      # add a column at the beginning for column ids
      result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
      return result
$$;
Copy

id 로 함수와 파티션을 호출합니다.

-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

또는 함수를 호출하고 전체 테이블을 하나의 파티션으로 다룹니다.

-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

모범 사례

이 섹션에서는 모범 사례를 설명합니다.

  1. 성능을 향상하고 시간 초과를 방지하려면 pandas.concat 을 사용하여 부분적인 결과를 누적하지 마십시오. 대신, 준비가 될 때마다 부분적인 결과를 산출하십시오. 예를 들어 다음을 대신 산출하십시오.

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    다음을 실행하십시오.

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy
  2. 각각의 행과 함께 스칼라를 반환해야 하는 경우 numpy 배열을 압축 해제하는 대신 반복되는 값 목록을 작성하여 튜플을 만드십시오. 예를 들어 2열로 된 결과의 경우 대신 다음을 실행하십시오.

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    다음을 실행하십시오.

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  3. 성능을 향상하려면 반정형 데이터를 열로 압축 해제하십시오. 예를 들어 x(int), y(float)z(string) 요소가 포함된 베리언트 열 obj 가 있는 경우, 다음과 같은 서명으로 UDTF를 정의하는 대신 다음을 실행하십시오.

    create function vec_udtf(variant obj)
    
    Copy

    vec_udtf(obj) 를 사용하여 호출하면 서명으로 UDTF를 정의해야 합니다.

    create function vec_udtf(int, float, string)
    
    Copy

    그리고 vec_udtf(obj:x, obj:y, obj:z) 를 사용하여 호출하십시오.

  4. 기본적으로, Snowflake는 입력을 NULL 값(예: Int64)을 지원하는 pandas dtype으로 인코딩합니다. 기본 유형(예: numpy)이 필요한 라이브러리를 사용 중이고 입력에 NULL 값이 없는 경우 라이브러리를 사용하기 전에 열을 기본 유형으로 캐스트해야 합니다. 예:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    자세한 내용은 유형 지원 을 참조하십시오.