ベクトル化されたPython UDTFs

このトピックでは、ベクトル化されたPython UDTFs を紹介します。

このトピックの内容:

概要

ベクトル化されたPython UDTFs (ユーザー定義テーブル関数)は、バッチで行を操作する方法を提供します。

Snowflakeは、2種類のベクトル化 UDTFs をサポートしています。

  • ベクトル化された end_partition メソッドを持つ UDTFs

  • ベクトル化された process メソッドを持つ UDTFs

UDTF は、ベクトル化された process メソッドとベクトル化された end_partition メソッドの両方を持つことはできないため、1種類を選択する必要があります。

ベクトル化されたend_partitionメソッドを持つ UDTFs

UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.

以下のタスクには、ベクトル化された end_partition メソッドを使用します。

  • 行単位ではなく、パーティション単位でデータを処理する。

  • パーティションごとに複数の行または列を返す。

  • データ分析にpandas DataFrames で作動するライブラリを使用する。

ベクトル化されたメソッドを持つ UDTFs

ベクトル化された process メソッドを持つ UDTFs は、操作が1対1のマッピングを実行することを前提に、バッチで行を操作する方法を提供します。言い換えると、このメソッドは入力行ごとに出力行を1つ返します。列の数に制限はありません。

以下のタスクには、ベクトル化された process メソッドを使用します。

  • バッチで数列の結果に1対1の変換を適用する。

  • pandas.DataFrame を必要とするライブラリを使用する。

  • 明示的なパーティショニングを行わずに、バッチで行を処理する。

  • to_pandas() API を活用して、クエリ結果を直接pandas DataFrame に変換します。

前提条件

Python用Snowparkライブラリのバージョン1.14.0以降が必要です。

ベクトル化されたend_partitionを使用した UDTF を作成する

  1. 任意: 各パーティションを処理する前に呼び出される __init__ メソッドを持つハンドラークラスを定義します。

    注: process メソッドを定義しないでください。

  2. 引数 DataFrame を受け取り、 pandas.DataFrame または pandas.Series または pandas.arrays のタプル(各配列は列)を返す end_partition メソッドを定義します。

    結果の列型は、 UDTF 定義の列型と一致している必要があります。

  3. @vectorized デコレーターまたは _sf_vectorized_input 関数属性を使用して、 end_partition メソッドをベクトル化されたものとしてマークします。

    詳細については、 Vectorized Python UDFs をご参照ください。 @vectorized デコレーターは、 SQL ワークシートを使用する場合など、Python UDTF がSnowflake内で実行される場合にのみ使用できます。クライアントまたはPythonワークシートを使用して実行する場合は、関数属性を使用する必要があります。

注釈

ベクトル化された end_partition を持つ UDTF への入力 DataFrame のデフォルト列名は、SQL 関数の署名と一致します。列名は SQL 識別子の要件 に従います。つまり、識別子が引用符で囲まれていない場合は大文字になり、二重引用符で囲まれている場合はそのまま保持されます。

以下は、 @vectorized デコレーターを使用して、ベクトル化された end_partition メソッドを持つ UDTF を作成する例です。

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

以下のコードブロックは、関数属性を使用して、ベクトル化された end_partition メソッドを持つ UDTF を作成する例です。

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

注釈

パーティションを構築するには、ベクトル化された end_partition メソッドを持つ UDTF は、 PARTITION BY 句で呼び出す必要があります。

UDTF を同じパーティションにあるすべてのデータと合わせて呼び出すには、

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

UDTF をx列で分割されたデータと合わせて呼び出すには。

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

例: 通常の UDTF を使用した行コレクションと、ベクトル化されたend_partitionメソッドを持つ UDTF を使用した行コレクションの比較。

通常の UDTF を使った行コレクション。

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

ベクトル化された end_partition メソッドで UDTF を使用した行コレクション。

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

例: パーティション内の各列の要約統計量を計算する

以下は、pandas describe() メソッドを使用して、パーティション内の各列の要約統計量を計算する例です。

  1. テーブルを作成し、5行ずつのパーティションを3つ作成します。

    create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
    
    -- generate 3 partitions of 5 rows each
    insert into test_values
    select 'x',
    uniform(1.5,1000.5,random(1))::float col1,
    uniform(1.5,1000.5,random(2))::float col2,
    uniform(1.5,1000.5,random(3))::float col3,
    uniform(1.5,1000.5,random(4))::float col4,
    uniform(1.5,1000.5,random(5))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'y',
    uniform(1.5,1000.5,random(10))::float col1,
    uniform(1.5,1000.5,random(20))::float col2,
    uniform(1.5,1000.5,random(30))::float col3,
    uniform(1.5,1000.5,random(40))::float col4,
    uniform(1.5,1000.5,random(50))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'z',
    uniform(1.5,1000.5,random(100))::float col1,
    uniform(1.5,1000.5,random(200))::float col2,
    uniform(1.5,1000.5,random(300))::float col3,
    uniform(1.5,1000.5,random(400))::float col4,
    uniform(1.5,1000.5,random(500))::float col5
    from table(generator(rowcount => 5));
    
    Copy
  2. このデータをご覧ください。

    select * from test_values;
    
    -----------------------------------------------------
    |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
    -----------------------------------------------------
    |x     |8.0     |99.4    |714.6   |168.7   |397.2   |
    |x     |106.4   |237.1   |971.7   |828.4   |988.2   |
    |x     |741.3   |207.9   |32.6    |640.6   |63.2    |
    |x     |541.3   |828.6   |844.9   |77.3    |403.1   |
    |x     |4.3     |723.3   |924.3   |282.5   |158.1   |
    |y     |976.1   |562.4   |968.7   |934.3   |977.3   |
    |y     |390.0   |244.3   |952.6   |101.7   |24.9    |
    |y     |599.7   |191.8   |90.2    |788.2   |761.2   |
    |y     |589.5   |201.0   |863.4   |415.1   |696.1   |
    |y     |46.7    |659.7   |571.1   |938.0   |513.7   |
    |z     |313.9   |188.5   |964.6   |435.4   |519.6   |
    |z     |328.3   |643.1   |766.4   |148.1   |596.4   |
    |z     |929.0   |255.4   |915.9   |857.2   |425.5   |
    |z     |612.8   |816.4   |220.2   |879.5   |331.4   |
    |z     |487.1   |704.5   |471.5   |378.9   |481.2   |
    -----------------------------------------------------
    
    Copy
  3. 関数を作成します。

    create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
    returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
    language python
    RUNTIME_VERSION = 3.9
    packages=('pandas')
    handler='handler'
    as $$
    from _snowflake import vectorized
    import pandas
    
    class handler:
        @vectorized(input=pandas.DataFrame)
        def end_partition(self, df):
          # using describe function to get the summary statistics
          result = df.describe().transpose()
          # add a column at the beginning for column ids
          result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
          return result
    $$;
    
    Copy
  4. 次のいずれかを実行します。

    • 関数を呼び出し、 id で分割します。

      -- partition by id
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by id))
      order by id, column_name;
      
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy
    • あるいは、関数を呼び出し、テーブル全体を1つのパーティションとして扱います。

      -- treat the whole table as one partition
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by 1))
      order by id, column_name;
      
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy

ベクトル化されたメソッドの UDTF を作成する

  1. 通常の UDTFs に似たハンドラークラスを定義し、オプションで __init__end_partition メソッドを指定する。

  2. 引数 DataFrame を受け取り、 pandas.DataFramepandas.Series のタプル、または各配列が列の pandas.arrays のいずれかを返す process メソッドを定義する。

    結果の列型は、 UDTF 定義の列型と一致している必要があります。返される結果は、1つの DataFrame またはタプルである必要があります。これは、リストを作成したり返したりできるベクトル化された end_partition メソッドとは異なります。

  3. @vectorized デコレーターまたは _sf_vectorized_input 関数属性を使用して、 process メソッドをベクトル化されたものとしてマークします。

    詳細については、 Vectorized Python UDFs をご参照ください。 @vectorized デコレーターは、 SQL ワークシートを使用する場合など、Python UDTF がSnowflake内で実行される場合にのみ使用できます。クライアントまたはPythonワークシートを使用して実行する場合は、関数属性を使用する必要があります。

  4. 任意: Pythonハンドラー関数が実行時間制限を超えている場合、 ターゲットバッチサイズを設定 することができます。

注釈

ベクトル化された process を持つ UDTF への入力 DataFrame のデフォルト列名は、SQL 関数の署名と一致します。列名は SQL 識別子の要件 に従います。つまり、識別子が引用符で囲まれていない場合は大文字になり、二重引用符で囲まれている場合はそのまま保持されます。

ベクトル化された process メソッドを持つ UDTF のハンドラーは、パーティションを認識する方法でバッチを処理するように実装することも、単にバッチごとに処理するように実装することもできます。詳細については、 ステートフルおよびステートレス処理 をご参照ください。

例: 1つのホットエンコーディングを適用するために、ベクトル化されたプロセスメソッドを持つ UDTF を使用します。

以下は、ベクトル化された process メソッドを持つ UDTF を使用して、10個のカテゴリーを持つテーブルに1つのホットエンコーディングを適用する例。

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

class one_hot_encode:
  def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
      return pd.get_dummies(df)
  process._sf_vectorized_input = pd.DataFrame


one_hot_encode_udtf = session.udtf.register(
  one_hot_encode,
  output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
  input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Copy

サンプルの結果:

-----------
|"CATEG"  |
-----------
|categ1   |
|categ6   |
|categ8   |
|categ5   |
|categ7   |
|categ5   |
|categ1   |
|categ2   |
|categ2   |
|categ4   |
-----------

テーブルを印刷する準備をします。

res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Copy

サンプルの結果:

    CATEG  CATEG0  CATEG1  CATEG2  CATEG3  CATEG4  CATEG5  CATEG6  CATEG7  CATEG8  CATEG9
0  categ0       1       0       0       0       0       0       0       0       0       0
1  categ0       1       0       0       0       0       0       0       0       0       0
2  categ5       0       0       0       0       0       1       0       0       0       0
3  categ3       0       0       0       1       0       0       0       0       0       0
4  categ8       0       0       0       0       0       0       0       0       1       0

あまり便利ではありませんが、ベクトル化された UDF を使っても同じ結果が得られます。結果を1つの列にパッケージ化し、その列をアンパックして使用可能なpandas DataFrame に復元する必要があります。

UDF の使用例。

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
  return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
  one_hot_encode,
  output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0  {\n  "categ0": false,\n  "categ1": false,\n  "...
1  {\n  "categ0": false,\n  "categ1": true,\n  "c...
2  {\n  "categ0": false,\n  "categ1": false,\n  "...
3  {\n  "categ0": false,\n  "categ1": false,\n  "...
4  {\n  "categ0": true,\n  "categ1": false,\n  "c...
Copy

型サポート

ベクトル化された UDTFs は、ベクトル化された UDFs と同じ SQL 型 をサポートします。ただし、ベクトル化された UDTFs の場合、すべてが64ビットまたはより小さい整数型に適合するスケール0を持つ SQL NUMBER 引数は、常に Int16Int32、または Int64 にマッピングされます。スカラー UDFs とは異なり、UDTF の引数がNULL許容でない場合、 int16int32int64 には変換されないということです。

SQL 型がPandas dtypesにどのようにマッピングされるかを示すテーブルを表示するには、ベクトル化されたPython UDFs のトピックにある 型サポートテーブル をご参照ください。

ベストプラクティス

  • 各行でスカラーを返す必要がある場合は、 numpy 配列を展開してタプルを作成する代わりに、繰り返し値のリストを作成します。たとえば、次の代わりに、2列の結果の場合。

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    これを使います。

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  • パフォーマンスを向上させるには、半構造化データを列に展開します。

    たとえば、 UDTF を次のような署名で定義し、 vec_udtf(obj) を使ってそれを呼び出す代わりに、 obj というバリアント列があり、 x(int)y(float)z(string) という要素がある場合。

    create function vec_udtf(variant obj)
    
    Copy

    このようなシグネチャーで UDTF を定義し、 vec_udtf(obj:x, obj:y, obj:z) を使ってそれを呼び出します。

    create function vec_udtf(int, float, string)
    
    Copy
  • デフォルトでは、Snowflakeは入力を NULL 値をサポートするpandas dtypesにエンコードします(例: Int64)。プリミティブ型(numpy など)を必要とするライブラリを使用していて、入力に NULL 値がない場合は、ライブラリを使用する前に列をプリミティブ型にキャストする必要があります。例:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    詳細については、 型のサポート をご参照ください。

  • ベクトル化された end_partition メソッドを持つ UDTFs を使用する場合、パフォーマンスを向上させ、タイムアウトを防ぐために、 pandas.concat を使用して結果を部分的に蓄積することは避けてください。代わりに、準備ができ次第、部分的な結果を出すようにします。

    たとえば、次の代わりに:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    次を実行します:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy