ベクトル化されたPython UDTFs¶
このトピックでは、ベクトル化されたPython UDTFs を紹介します。
このトピックの内容:
概要¶
Vectorized Python UDTFs (user-defined table functions), which are UDTFs with a vectorized end_partition, enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.
以下の場合に使用します。
行単位ではなく、パーティション単位でデータを処理する必要がある。
パーティションごとに複数の行または列を返す必要がある。
データ分析にpandas DataFrames で作動するライブラリを使用する。
前提条件¶
Python用Snowparkライブラリのバージョン1.6.1以降が必要です。
はじめるにあたり¶
ベクトル化されたend_partitionを使用した UDTF を作成するには、
オプションで、各パーティションを処理する前に呼び出される
__init__
メソッドを持つハンドラークラスを定義します。process
メソッドを定義しないでください。引数 DataFrame を受け取り、
pandas.DataFrame
またはpandas.Series
またはpandas.arrays
のタプル(各配列は列)を返すend_partition
メソッドを定義します。結果の列型は、 UDTF 定義の列型と一致している必要があります。@vectorized
デコレーターまたは_sf_vectorized_input
関数属性を使用して、end_partition
メソッドをベクトル化されたものとしてマークします。詳細については、 ベクトル化されたPython UDFs をご参照ください。@vectorized
デコレーターは、 SQL ワークシートを使用する場合など、Python UDTF がSnowflake内で実行される場合にのみ使用できます。クライアントまたはPythonワークシートを使用して実行する場合は、関数属性を使用する必要があります。
注釈
ベクトル化されたend_partitionを使用した UDTF への入力 DataFrame のデフォルト列名は、 SQL 関数の署名と一致します。列名は SQL 識別子の要件 に従います。つまり、識別子が引用符で囲まれていない場合は大文字になり、二重引用符で囲まれている場合はそのまま保持されます。
以下は、 @vectorized
デコレーターを使用して、ベクトル化されたend_partitionを使用した UDTF を作成する例です。
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
以下は、関数属性を使用して、ベクトル化されたend_partitionを使用した UDTF を作成する例です。
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
注釈
ベクトル化されたend_partitionを使用した UDTF は、パーティションを構築するために PARTITION BY 句で呼び出す必要があります。
UDTF を同じパーティションにあるすべてのデータと合わせて呼び出すには、
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
UDTF をx列で分割されたデータと合わせて呼び出すには、
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
型サポート¶
ベクトル化されたend_partitionを使用した UDTFs は、引数と戻り値のために、ベクトル化されたPython UDFs と同じ SQL 型 をサポートします。ただし、ベクトル化されたend_partitionを使用した UDTFs の場合、64ビット以下の小さい整数型にすべてが適合するスケール0を持つ SQL NUMBER
引数は、常に Int16
、 Int32
、または Int64
にマッピングされます。言い換えると、スカラー UDFs とは異なり、UDTF の引数がNULL許容でない場合、 int16
、 int32
、 int64
には変換されないということです。
SQL 型がPandas dtypesにどのようにマッピングされるかを示すテーブルを表示するには、ベクトル化されたPython UDFs のトピックにある 型サポートテーブル をご参照ください。
例: 通常の UDTF を使用した行コレクションと、ベクトル化されたend_partitionのある UDTF を使用した行コレクションの比較。¶
以下は、通常の UDTF を使用した行コレクションの例です。
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
以下は、ベクトル化されたend_partitionのある UDTF を使用して行コレクションを実施する方法の例です。
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
例: パーティション内の各列の要約統計量を計算する¶
以下は、pandas describe()
メソッドを使用して、パーティション内の各列の要約統計量を計算する例です。
まず、テーブルを作成し、5行ずつのパーティションを3つ作成します。
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
データをご覧ください。
select * from test_values;
-----------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |
-----------------------------------------------------
|x |8.0 |99.4 |714.6 |168.7 |397.2 |
|x |106.4 |237.1 |971.7 |828.4 |988.2 |
|x |741.3 |207.9 |32.6 |640.6 |63.2 |
|x |541.3 |828.6 |844.9 |77.3 |403.1 |
|x |4.3 |723.3 |924.3 |282.5 |158.1 |
|y |976.1 |562.4 |968.7 |934.3 |977.3 |
|y |390.0 |244.3 |952.6 |101.7 |24.9 |
|y |599.7 |191.8 |90.2 |788.2 |761.2 |
|y |589.5 |201.0 |863.4 |415.1 |696.1 |
|y |46.7 |659.7 |571.1 |938.0 |513.7 |
|z |313.9 |188.5 |964.6 |435.4 |519.6 |
|z |328.3 |643.1 |766.4 |148.1 |596.4 |
|z |929.0 |255.4 |915.9 |857.2 |425.5 |
|z |612.8 |816.4 |220.2 |879.5 |331.4 |
|z |487.1 |704.5 |471.5 |378.9 |481.2 |
-----------------------------------------------------
次に、関数を作成します。
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas
class handler:
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# using describe function to get the summary statistics
result = df.describe().transpose()
# add a column at the beginning for column ids
result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
return result
$$;
関数を呼び出し、 id
で分割します。
-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 |
|x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 |
|x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 |
|x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 |
|x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 |
|y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 |
|y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 |
|y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 |
|y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 |
|y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 |
|z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 |
|z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 |
|z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 |
|z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 |
|z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
あるいは、関数を呼び出し、テーブル全体を1つのパーティションとして扱います。
-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 |
|NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 |
|NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 |
|NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 |
|NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ベストプラクティス¶
このセクションでは、ベストプラクティスについて説明します。
パフォーマンスを向上させ、タイムアウトを防ぐために、
pandas.concat
を使用して部分的な結果を蓄積することは避けてください。代わりに、準備ができ次第、部分的な結果を出すようにします。たとえば、次の代わりに:results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
次を実行します:
while(...): partial_result = pd.DataFrame(...) yield partial_result
各行でスカラーを返す必要がある場合は、
numpy
配列を展開してタプルを作成する代わりに、繰り返し値のリストを作成します。たとえば、2列の結果の場合、次の代わりに:return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
次を実行します:
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
パフォーマンスを向上させるには、半構造化データを列に展開します。たとえば、
obj
というバリアント列があり、x(int)
、y(float)
、z(string)
という要素がある場合、 UDTF を次のような署名で定義する代わりに:create function vec_udtf(variant obj)
また、
vec_udtf(obj)
を使って呼び出す場合は、次の署名で UDTF を定義する必要があります。create function vec_udtf(int, float, string)
さらに、それを
vec_udtf(obj:x, obj:y, obj:z)
で呼び出します。デフォルトでは、Snowflakeは入力を NULL 値をサポートするpandas dtypesにエンコードします(例: Int64)。プリミティブ型(
numpy
など)を必要とするライブラリを使用していて、入力に NULL 値がない場合は、ライブラリを使用する前に列をプリミティブ型にキャストする必要があります。例:input_df['y'] = input_df['y'].astype("int64")
詳細については、 型のサポート をご参照ください。