벡터화된 Python UDF

이 항목에서는 벡터화된 Python UDF를 소개합니다.

이 항목의 내용:

개요

벡터화된 Python UDTFs(사용자 정의 테이블 함수)는 행을 일괄적으로 연산할 수 있는 방법을 제공합니다.

Snowflake는 두 가지 종류의 벡터화된 UDTFs를 지원합니다.

  • 벡터화된 end_partition 메서드가 있는 UDTFs

  • 벡터화된 process 메서드가 있는 UDTFs

UDTF는 벡터화된 process 메서드와 벡터화된 end_partition 메서드를 모두 가질 수 없으므로 한 가지 종류를 선택해야 합니다.

벡터화된 end_partition 메서드가 있는 UDTFs

UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.

다음 작업에는 벡터화된 end_partition 메서드를 사용합니다.

  • 행 단위 대신 파티션 단위로 데이터를 처리해야 하는 경우.

  • 각 파티션에 대해 여러 행이나 열을 반환해야 하는 경우.

  • 데이터 분석을 위해 pandas DataFrames에서 동작하는 라이브러리를 사용할 경우.

벡터화된 프로세스 메서드가 있는 UDTFs

벡터화된 process 메서드가 있는 UDTFs는 작업이 일대일 매핑을 수행한다고 가정하여 행을 일괄적으로 연산하는 방법을 제공합니다. 즉, 이 메서드는 입력 행마다 하나의 출력 행을 반환합니다. 열의 개수에는 제한이 없습니다.

다음 작업에는 벡터화된 process 메서드를 사용합니다.

  • 일대일 변환을 적용하여 여러 열로 된 결과를 일괄적으로 적용하려는 경우.

  • pandas.DataFrame 이 필요한 라이브러리를 사용합니다.

  • 명시적 분할 없이 행을 일괄 처리하려는 경우.

  • to_pandas() API를 활용하여 쿼리 결과를 pandas DataFrame로 변환하려는 경우.

전제 조건

Python용 Snowpark 라이브러리 버전 1.14.0 이상이 필요합니다.

벡터화된 end_partition 메서드가 있는 UDTF 만들기

  1. 선택 사항: 각 파티션을 처리하기 전에 호출될 __init__ 메서드로 처리기 클래스를 정의합니다.

    참고: process 메서드를 정의하지 마십시오.

  2. DataFrame 인자를 취하여 각 배열이 열인 pandas.DataFrame 이나 pandas.Series 또는 pandas.arrays 의 튜플을 반환하거나 생성하는 end_partition 메서드를 정의합니다.

    결과의 열 유형은 UDTF 정의의 열 유형과 일치해야 합니다.

  3. end_partition 메서드를 벡터화된 메서드로 표시하려면 @vectorized 데코레이터 또는 _sf_vectorized_input 함수 특성을 사용합니다.

    자세한 내용은 벡터화된 Python UDFs 섹션을 참조하십시오. @vectorized 데코레이터는 Python UDTF가 Snowflake 내에서 실행될 때만(예: SQL 워크시트를 사용할 때) 사용할 수 있습니다. 클라이언트 또는 Python 워크시트를 사용하여 실행하는 경우 function 특성을 사용해야 합니다.

참고

벡터화된 end_partition 메서드가 있는 UDTF에 대한 입력 DataFrame의 기본 열 이름은 SQL 함수의 서명과 일치합니다. 열 이름은 SQL 식별자 요구 사항 을 따릅니다. 즉, 식별자가 따옴표로 묶이지 않은 경우 대문자로 표시되고 큰따옴표로 묶인 경우 변경되지 않고 그대로 유지됩니다.

다음은 @vectorized 데코레이터를 사용하여 벡터화된 end_partition 메서드가 있는 UDTF를 생성하는 코드 블록입니다.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

다음 코드 블록은 함수 특성을 사용하여 벡터화된 end_partition 메서드로 UDTF를 생성하는 예제입니다.

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

참고

파티션을 빌드하려면 벡터화된 end_partition 메서드가 있는 UDTF를 PARTITION BY 절과 함께 호출해야 합니다.

동일한 파티션에 있는 모든 데이터와 함께 UDTF를 호출하려면 다음을 수행하십시오.

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

열 x로 데이터가 분할된는 UDTF를 호출하려면 다음을 수행합니다.

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

예: 일반 UDTF를 사용한 행 수집과 벡터화된 end_partition 메서드를 사용한 UDTF 사용 비교

일반 UDTF를 사용한 행 수집:

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

벡터화된 end_partition 메서드와 함께 UDTF를 사용한 행 수집:

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

예: 파티션의 각 열에 대한 요약 통계 계산하기

다음은 pandas describe() 메서드를 사용하여 파티션의 각 열에 대한 요약 통계를 계산하는 방법의 예입니다.

  1. 테이블을 생성하고 각각 5개 행으로 구성된 3개의 파티션을 생성합니다.

    create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
    
    -- generate 3 partitions of 5 rows each
    insert into test_values
    select 'x',
    uniform(1.5,1000.5,random(1))::float col1,
    uniform(1.5,1000.5,random(2))::float col2,
    uniform(1.5,1000.5,random(3))::float col3,
    uniform(1.5,1000.5,random(4))::float col4,
    uniform(1.5,1000.5,random(5))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'y',
    uniform(1.5,1000.5,random(10))::float col1,
    uniform(1.5,1000.5,random(20))::float col2,
    uniform(1.5,1000.5,random(30))::float col3,
    uniform(1.5,1000.5,random(40))::float col4,
    uniform(1.5,1000.5,random(50))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'z',
    uniform(1.5,1000.5,random(100))::float col1,
    uniform(1.5,1000.5,random(200))::float col2,
    uniform(1.5,1000.5,random(300))::float col3,
    uniform(1.5,1000.5,random(400))::float col4,
    uniform(1.5,1000.5,random(500))::float col5
    from table(generator(rowcount => 5));
    
    Copy
  2. 데이터를 살펴보십시오.

    select * from test_values;
    
    -----------------------------------------------------
    |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
    -----------------------------------------------------
    |x     |8.0     |99.4    |714.6   |168.7   |397.2   |
    |x     |106.4   |237.1   |971.7   |828.4   |988.2   |
    |x     |741.3   |207.9   |32.6    |640.6   |63.2    |
    |x     |541.3   |828.6   |844.9   |77.3    |403.1   |
    |x     |4.3     |723.3   |924.3   |282.5   |158.1   |
    |y     |976.1   |562.4   |968.7   |934.3   |977.3   |
    |y     |390.0   |244.3   |952.6   |101.7   |24.9    |
    |y     |599.7   |191.8   |90.2    |788.2   |761.2   |
    |y     |589.5   |201.0   |863.4   |415.1   |696.1   |
    |y     |46.7    |659.7   |571.1   |938.0   |513.7   |
    |z     |313.9   |188.5   |964.6   |435.4   |519.6   |
    |z     |328.3   |643.1   |766.4   |148.1   |596.4   |
    |z     |929.0   |255.4   |915.9   |857.2   |425.5   |
    |z     |612.8   |816.4   |220.2   |879.5   |331.4   |
    |z     |487.1   |704.5   |471.5   |378.9   |481.2   |
    -----------------------------------------------------
    
    Copy
  3. 다음과 같이 함수를 만드십시오.

    create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
    returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
    language python
    RUNTIME_VERSION = 3.9
    packages=('pandas')
    handler='handler'
    as $$
    from _snowflake import vectorized
    import pandas
    
    class handler:
        @vectorized(input=pandas.DataFrame)
        def end_partition(self, df):
          # using describe function to get the summary statistics
          result = df.describe().transpose()
          # add a column at the beginning for column ids
          result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
          return result
    $$;
    
    Copy
  4. 다음 단계 중 하나를 수행합니다.

    • id 로 함수와 파티션을 호출합니다.

      -- partition by id
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by id))
      order by id, column_name;
      
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy
    • 함수를 호출하고 전체 테이블을 1개의 파티션으로 다룹니다.

      -- treat the whole table as one partition
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by 1))
      order by id, column_name;
      
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy

벡터화된 프로세스 메서드가 있는 UDTF 만들기

  1. 일반 UDTFs와 유사한 핸들러 클래스를 정의하고 선택적으로 __init__end_partition 메서드를 지정합니다.

  2. DataFrame 인자를 받아 각 배열이 열인 pandas.DataFrame 이나 pandas.Series 또는 pandas.arrays 의 튜플을 반환하는 process 메서드를 정의합니다.

    결과의 열 유형은 UDTF 정의의 열 유형과 일치해야 합니다. 반환되는 결과는 정확히 1개의 DataFrame 또는 튜플이어야 합니다. 이 결과는 목록을 생성하거나 반환할 수 있는 벡터화된 end_partition 메서드와 다릅니다.

  3. process 메서드를 벡터화된 메서드로 표시하려면 @vectorized 데코레이터 또는 _sf_vectorized_input 함수 특성을 사용합니다.

    자세한 내용은 벡터화된 Python UDFs 섹션을 참조하십시오. @vectorized 데코레이터는 Python UDTF가 Snowflake 내에서 실행될 때만(예: SQL 워크시트를 사용할 때) 사용할 수 있습니다. 클라이언트 또는 Python 워크시트를 사용하여 실행하는 경우 function 특성을 사용해야 합니다.

  4. 선택 사항: Python 처리기 함수가 실행 시간 제한을 초과하는 경우, 목표 배치 크기를 설정 합니다.

참고

벡터화된 process 메서드가 있는 UDTF에 대한 입력 DataFrame의 기본 열 이름은 SQL 함수의 서명과 일치합니다. 열 이름은 SQL 식별자 요구 사항 을 따릅니다. 즉, 식별자가 따옴표로 묶이지 않은 경우 대문자로 표시되고 큰따옴표로 묶인 경우 변경되지 않고 그대로 유지됩니다.

벡터화된 process 메서드가 있는 UDTF의 핸들러는 파티션 인식 방식으로 배치를 처리하거나 단순히 배치별로 처리하도록 구현할 수 있습니다. 자세한 내용은 상태 저장 및 상태 비저장 처리 섹션을 참조하십시오.

예: 벡터화된 프로세스 메서드가 있는 UDTF를 사용하여 1개의 핫 인코딩 적용

벡터화된 process 메서드가 있는 UDTF를 사용하여 10개의 카테고리가 있는 테이블에 하나의 핫 인코딩을 적용합니다.

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

class one_hot_encode:
  def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
      return pd.get_dummies(df)
  process._sf_vectorized_input = pd.DataFrame


one_hot_encode_udtf = session.udtf.register(
  one_hot_encode,
  output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
  input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Copy

샘플 결과:

-----------
|"CATEG"  |
-----------
|categ1   |
|categ6   |
|categ8   |
|categ5   |
|categ7   |
|categ5   |
|categ1   |
|categ2   |
|categ2   |
|categ4   |
-----------

테이블을 출력할 준비를 합니다.

res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Copy

샘플 결과:

    CATEG  CATEG0  CATEG1  CATEG2  CATEG3  CATEG4  CATEG5  CATEG6  CATEG7  CATEG8  CATEG9
0  categ0       1       0       0       0       0       0       0       0       0       0
1  categ0       1       0       0       0       0       0       0       0       0       0
2  categ5       0       0       0       0       0       1       0       0       0       0
3  categ3       0       0       0       1       0       0       0       0       0       0
4  categ8       0       0       0       0       0       0       0       0       1       0

이 메서드는 편의성이 떨어지지만, 벡터화된 UDF를 사용하여 동일한 결과를 얻을 수 있습니다. 결과를 1개의 열로 패키징한 다음 열의 포장을 풀어야 사용 가능한 pandas DataFrame으로 결과를 복원할 수 있습니다.

벡터화된 UDF 사용의 예

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
  return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
  one_hot_encode,
  output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0  {\n  "categ0": false,\n  "categ1": false,\n  "...
1  {\n  "categ0": false,\n  "categ1": true,\n  "c...
2  {\n  "categ0": false,\n  "categ1": false,\n  "...
3  {\n  "categ0": false,\n  "categ1": false,\n  "...
4  {\n  "categ0": true,\n  "categ1": false,\n  "c...
Copy

타입 지원

벡터화된 UDTFs는 벡터화된 UDFs와 동일한 SQL 유형 을 지원합니다. 그러나 벡터화된 UDTFs의 경우 64비트 이하 정수형에 모두 맞는 0 소수 자릿수의 SQL NUMBER 인자는 항상 Int16, Int32 또는 Int64 에 매핑됩니다. 스칼라 UDFs와는 달리 UDTF의 인자가 null을 허용하지 않으면 int16, int32 또는 int64 로 변환되지 않습니다.

SQL 유형이 Pandas dtype에 매핑되는 방식을 보여주는 테이블을 보려면 벡터화된 Python UDFs 항목의 유형 지원 테이블 을 참조하십시오.

모범 사례

  • 각각의 행과 함께 스칼라를 반환해야 하는 경우 numpy 배열을 압축 해제하는 대신 반복되는 값 목록을 작성하여 튜플을 만드십시오. 예를 들어 2열로 구성된 결과의 경우 대신 다음을 실행합니다.

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    다음을 사용하십시오.

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  • 성능을 향상하려면 반정형 데이터를 열로 압축 해제하십시오.

    예를 들어, x(int), y(float)z(string) 요소가 포함된 베리언트 열 obj 가 있는 경우, 다음과 같은 서명으로 UDTF를 정의하는 대신 vec_udtf(obj): 를 사용하여 호출합니다.

    create function vec_udtf(variant obj)
    
    Copy

    이와 같이 서명으로 UDTF를 정의하고 vec_udtf(obj:x, obj:y, obj:z) 를 사용하여 호출합니다.

    create function vec_udtf(int, float, string)
    
    Copy
  • 기본적으로, Snowflake는 입력을 NULL 값(예: Int64)을 지원하는 pandas dtype으로 인코딩합니다. 기본 유형(예: numpy)이 필요한 라이브러리를 사용 중이고 입력에 NULL 값이 없는 경우 라이브러리를 사용하기 전에 열을 기본 유형으로 캐스트해야 합니다. 예:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    자세한 내용은 유형 지원 을 참조하십시오.

  • 벡터화된 end_partition 메서드가 있는 UDTFs를 사용하는 경우 성능을 향상하고 시간 초과를 방지하려면 pandas.concat 을 사용하여 부분적인 결과를 누적하지 마십시오. 대신, 준비가 될 때마다 부분적인 결과를 산출하십시오.

    예를 들어 다음을 대신 산출하십시오.

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    다음을 실행하십시오.

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy