ベクトル化されたPython UDTFs¶
このトピックでは、ベクトル化されたPython UDTFs を紹介します。
このトピックの内容:
概要¶
ベクトル化されたPython UDTFs (ユーザー定義テーブル関数)は、バッチで行を操作する方法を提供します。
Snowflakeは、2種類のベクトル化 UDTFs をサポートしています。
ベクトル化された
end_partition
メソッドを持つ UDTFsベクトル化された
process
メソッドを持つ UDTFs
UDTF は、ベクトル化された process
メソッドとベクトル化された end_partition
メソッドの両方を持つことはできないため、1種類を選択する必要があります。
ベクトル化されたend_partitionメソッドを持つ UDTFs¶
UDTFs with a vectorized end_partition
method enable seamless partition-by-partition processing by operating on
partitions as pandas DataFrames
and returning results as
pandas DataFrames
or lists of pandas arrays
or pandas Series.
This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.
以下のタスクには、ベクトル化された end_partition
メソッドを使用します。
行単位ではなく、パーティション単位でデータを処理する。
パーティションごとに複数の行または列を返す。
データ分析にpandas DataFrames で作動するライブラリを使用する。
ベクトル化されたメソッドを持つ UDTFs¶
ベクトル化された process
メソッドを持つ UDTFs は、操作が1対1のマッピングを実行することを前提に、バッチで行を操作する方法を提供します。言い換えると、このメソッドは入力行ごとに出力行を1つ返します。列の数に制限はありません。
以下のタスクには、ベクトル化された process
メソッドを使用します。
バッチで数列の結果に1対1の変換を適用する。
pandas.DataFrame
を必要とするライブラリを使用する。明示的なパーティショニングを行わずに、バッチで行を処理する。
to_pandas() API を活用して、クエリ結果を直接pandas DataFrame に変換します。
前提条件¶
Python用Snowparkライブラリのバージョン1.14.0以降が必要です。
ベクトル化されたend_partitionを使用した UDTF を作成する¶
任意: 各パーティションを処理する前に呼び出される
__init__
メソッドを持つハンドラークラスを定義します。注:
process
メソッドを定義しないでください。引数 DataFrame を受け取り、
pandas.DataFrame
またはpandas.Series
またはpandas.arrays
のタプル(各配列は列)を返すend_partition
メソッドを定義します。結果の列型は、 UDTF 定義の列型と一致している必要があります。
@vectorized
デコレーターまたは_sf_vectorized_input
関数属性を使用して、end_partition
メソッドをベクトル化されたものとしてマークします。詳細については、 Vectorized Python UDFs をご参照ください。
@vectorized
デコレーターは、 SQL ワークシートを使用する場合など、Python UDTF がSnowflake内で実行される場合にのみ使用できます。クライアントまたはPythonワークシートを使用して実行する場合は、関数属性を使用する必要があります。
注釈
ベクトル化された end_partition
を持つ UDTF への入力 DataFrame のデフォルト列名は、SQL 関数の署名と一致します。列名は SQL 識別子の要件 に従います。つまり、識別子が引用符で囲まれていない場合は大文字になり、二重引用符で囲まれている場合はそのまま保持されます。
以下は、 @vectorized
デコレーターを使用して、ベクトル化された end_partition
メソッドを持つ UDTF を作成する例です。
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
以下のコードブロックは、関数属性を使用して、ベクトル化された end_partition
メソッドを持つ UDTF を作成する例です。
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
注釈
パーティションを構築するには、ベクトル化された end_partition
メソッドを持つ UDTF は、 PARTITION BY 句で呼び出す必要があります。
UDTF を同じパーティションにあるすべてのデータと合わせて呼び出すには、
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
UDTF をx列で分割されたデータと合わせて呼び出すには。
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
例: 通常の UDTF を使用した行コレクションと、ベクトル化されたend_partitionメソッドを持つ UDTF を使用した行コレクションの比較。¶
通常の UDTF を使った行コレクション。
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
ベクトル化された end_partition
メソッドで UDTF を使用した行コレクション。
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
例: パーティション内の各列の要約統計量を計算する¶
以下は、pandas describe()
メソッドを使用して、パーティション内の各列の要約統計量を計算する例です。
テーブルを作成し、5行ずつのパーティションを3つ作成します。
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float); -- generate 3 partitions of 5 rows each insert into test_values select 'x', uniform(1.5,1000.5,random(1))::float col1, uniform(1.5,1000.5,random(2))::float col2, uniform(1.5,1000.5,random(3))::float col3, uniform(1.5,1000.5,random(4))::float col4, uniform(1.5,1000.5,random(5))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'y', uniform(1.5,1000.5,random(10))::float col1, uniform(1.5,1000.5,random(20))::float col2, uniform(1.5,1000.5,random(30))::float col3, uniform(1.5,1000.5,random(40))::float col4, uniform(1.5,1000.5,random(50))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'z', uniform(1.5,1000.5,random(100))::float col1, uniform(1.5,1000.5,random(200))::float col2, uniform(1.5,1000.5,random(300))::float col3, uniform(1.5,1000.5,random(400))::float col4, uniform(1.5,1000.5,random(500))::float col5 from table(generator(rowcount => 5));
このデータをご覧ください。
select * from test_values; ----------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" | ----------------------------------------------------- |x |8.0 |99.4 |714.6 |168.7 |397.2 | |x |106.4 |237.1 |971.7 |828.4 |988.2 | |x |741.3 |207.9 |32.6 |640.6 |63.2 | |x |541.3 |828.6 |844.9 |77.3 |403.1 | |x |4.3 |723.3 |924.3 |282.5 |158.1 | |y |976.1 |562.4 |968.7 |934.3 |977.3 | |y |390.0 |244.3 |952.6 |101.7 |24.9 | |y |599.7 |191.8 |90.2 |788.2 |761.2 | |y |589.5 |201.0 |863.4 |415.1 |696.1 | |y |46.7 |659.7 |571.1 |938.0 |513.7 | |z |313.9 |188.5 |964.6 |435.4 |519.6 | |z |328.3 |643.1 |766.4 |148.1 |596.4 | |z |929.0 |255.4 |915.9 |857.2 |425.5 | |z |612.8 |816.4 |220.2 |879.5 |331.4 | |z |487.1 |704.5 |471.5 |378.9 |481.2 | -----------------------------------------------------
関数を作成します。
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float) returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float) language python RUNTIME_VERSION = 3.9 packages=('pandas') handler='handler' as $$ from _snowflake import vectorized import pandas class handler: @vectorized(input=pandas.DataFrame) def end_partition(self, df): # using describe function to get the summary statistics result = df.describe().transpose() # add a column at the beginning for column ids result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5']) return result $$;
次のいずれかを実行します。
関数を呼び出し、
id
で分割します。-- partition by id select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by id)) order by id, column_name; -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 | |x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 | |x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 | |x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 | |x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 | |y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 | |y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 | |y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 | |y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 | |y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 | |z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 | |z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 | |z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 | |z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 | |z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 | --------------------------------------------------------------------------------------------------------------------------------------------------------------------
あるいは、関数を呼び出し、テーブル全体を1つのパーティションとして扱います。
-- treat the whole table as one partition select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by 1)) order by id, column_name; --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 | |NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 | |NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 | |NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 | |NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ベクトル化されたメソッドの UDTF を作成する¶
通常の UDTFs に似たハンドラークラスを定義し、オプションで
__init__
とend_partition
メソッドを指定する。引数 DataFrame を受け取り、
pandas.DataFrame
、pandas.Series
のタプル、または各配列が列のpandas.arrays
のいずれかを返すprocess
メソッドを定義する。結果の列型は、 UDTF 定義の列型と一致している必要があります。返される結果は、1つの DataFrame またはタプルである必要があります。これは、リストを作成したり返したりできるベクトル化された
end_partition
メソッドとは異なります。@vectorized
デコレーターまたは_sf_vectorized_input
関数属性を使用して、process
メソッドをベクトル化されたものとしてマークします。詳細については、 Vectorized Python UDFs をご参照ください。
@vectorized
デコレーターは、 SQL ワークシートを使用する場合など、Python UDTF がSnowflake内で実行される場合にのみ使用できます。クライアントまたはPythonワークシートを使用して実行する場合は、関数属性を使用する必要があります。任意: Pythonハンドラー関数が実行時間制限を超えている場合、 ターゲットバッチサイズを設定 することができます。
注釈
ベクトル化された process
を持つ UDTF への入力 DataFrame のデフォルト列名は、SQL 関数の署名と一致します。列名は SQL 識別子の要件 に従います。つまり、識別子が引用符で囲まれていない場合は大文字になり、二重引用符で囲まれている場合はそのまま保持されます。
ベクトル化された process
メソッドを持つ UDTF のハンドラーは、パーティションを認識する方法でバッチを処理するように実装することも、単にバッチごとに処理するように実装することもできます。詳細については、 ステートフルおよびステートレス処理 をご参照ください。
例: 1つのホットエンコーディングを適用するために、ベクトル化されたプロセスメソッドを持つ UDTF を使用します。¶
以下は、ベクトル化された process
メソッドを持つ UDTF を使用して、10個のカテゴリーを持つテーブルに1つのホットエンコーディングを適用する例。
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame
class one_hot_encode:
def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
return pd.get_dummies(df)
process._sf_vectorized_input = pd.DataFrame
one_hot_encode_udtf = session.udtf.register(
one_hot_encode,
output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
input_names=['"categ"']
)
df_table = session.table("categories")
df_table.show()
サンプルの結果:
-----------
|"CATEG" |
-----------
|categ1 |
|categ6 |
|categ8 |
|categ5 |
|categ7 |
|categ5 |
|categ1 |
|categ2 |
|categ2 |
|categ4 |
-----------
テーブルを印刷する準備をします。
res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
サンプルの結果:
CATEG CATEG0 CATEG1 CATEG2 CATEG3 CATEG4 CATEG5 CATEG6 CATEG7 CATEG8 CATEG9
0 categ0 1 0 0 0 0 0 0 0 0 0
1 categ0 1 0 0 0 0 0 0 0 0 0
2 categ5 0 0 0 0 0 1 0 0 0 0
3 categ3 0 0 0 1 0 0 0 0 0 0
4 categ8 0 0 0 0 0 0 0 0 1 0
あまり便利ではありませんが、ベクトル化された UDF を使っても同じ結果が得られます。結果を1つの列にパッケージ化し、その列をアンパックして使用可能なpandas DataFrame に復元する必要があります。
UDF の使用例。
def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
return pd.get_dummies(df).to_dict('records')
one_hot_encode._sf_vectorized_input = pd.DataFrame
one_hot_encode_udf = session.udf.register(
one_hot_encode,
output_schema=["encoding"],
)
df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0 {\n "categ0": false,\n "categ1": false,\n "...
1 {\n "categ0": false,\n "categ1": true,\n "c...
2 {\n "categ0": false,\n "categ1": false,\n "...
3 {\n "categ0": false,\n "categ1": false,\n "...
4 {\n "categ0": true,\n "categ1": false,\n "c...
型サポート¶
ベクトル化された UDTFs は、ベクトル化された UDFs と同じ SQL 型 をサポートします。ただし、ベクトル化された UDTFs の場合、すべてが64ビットまたはより小さい整数型に適合するスケール0を持つ SQL NUMBER
引数は、常に Int16
、 Int32
、または Int64
にマッピングされます。スカラー UDFs とは異なり、UDTF の引数がNULL許容でない場合、 int16
、 int32
、 int64
には変換されないということです。
SQL 型がPandas dtypesにどのようにマッピングされるかを示すテーブルを表示するには、ベクトル化されたPython UDFs のトピックにある 型サポートテーブル をご参照ください。
ベストプラクティス¶
各行でスカラーを返す必要がある場合は、
numpy
配列を展開してタプルを作成する代わりに、繰り返し値のリストを作成します。たとえば、次の代わりに、2列の結果の場合。return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
これを使います。
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
パフォーマンスを向上させるには、半構造化データを列に展開します。
たとえば、 UDTF を次のような署名で定義し、
vec_udtf(obj)
を使ってそれを呼び出す代わりに、obj
というバリアント列があり、x(int)
、y(float)
、z(string)
という要素がある場合。create function vec_udtf(variant obj)
このようなシグネチャーで UDTF を定義し、
vec_udtf(obj:x, obj:y, obj:z)
を使ってそれを呼び出します。create function vec_udtf(int, float, string)
デフォルトでは、Snowflakeは入力を NULL 値をサポートするpandas dtypesにエンコードします(例: Int64)。プリミティブ型(
numpy
など)を必要とするライブラリを使用していて、入力に NULL 値がない場合は、ライブラリを使用する前に列をプリミティブ型にキャストする必要があります。例:input_df['y'] = input_df['y'].astype("int64")
詳細については、 型のサポート をご参照ください。
ベクトル化された
end_partition
メソッドを持つ UDTFs を使用する場合、パフォーマンスを向上させ、タイムアウトを防ぐために、pandas.concat
を使用して結果を部分的に蓄積することは避けてください。代わりに、準備ができ次第、部分的な結果を出すようにします。たとえば、次の代わりに:
results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
次を実行します:
while(...): partial_result = pd.DataFrame(...) yield partial_result