ベクトル化されたPython UDFs¶
このトピックでは、ベクトル化されたPython UDFs を紹介します。
このトピックの内容:
概要¶
ベクトル化されたPython UDFs を使用すると、入力行のバッチを Pandas DataFrames として受け取り、結果のバッチを Pandas配列 または Series として返すPython関数を定義します。他のPython UDFs を呼び出すのと同じ方法で、ベクトル化されたPython UDFs を呼び出します。
デフォルトの行ごとの処理パターンと比較して、ベクトル化された UDFs を使用する利点は次のとおりです。
Pythonコードが行のバッチで効率的に動作する場合、パフォーマンスが向上する可能性があります。
Pandas DataFramesまたはPandas配列で動作するライブラリを呼び出す場合は、必要な変換ロジックが少なくなります。
ベクトル化されたPython UDFs を使用する場合、
Python UDFsを使用してクエリを作成する方法を変更する必要はありません。すべてのバッチ処理は、独自のコードではなく、UDFフレームワークによって処理されます。
ベクトル化されていない UDFs と同様に、ハンドラーコードのどのインスタンスが入力のどのバッチを参照するかは保証されません。
ベクトル化されたPython UDFs 入門¶
ベクトル化されたPython UDF を作成するには、ハンドラー関数への注釈付与でサポートされているメカニズムの1つを使用します。
vectorized デコレーターの使用¶
_snowflake モジュールは、Snowflake内で実行されるPython UDFsに公開されています。Pythonコードで、 _snowflake モジュールをインポートし、 vectorized デコレーターを使用して、 input パラメーターを pandas.DataFrame に設定することにより、ハンドラーがPandas DataFrameを受け取ることを期待するように指定します。
CREATE FUNCTION add_one_to_inputs(x NUMBER(10, 0), y NUMBER(10, 0))
  RETURNS NUMBER(10, 0)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  PACKAGES = ('pandas')
  HANDLER = 'add_one_to_inputs'
AS $$
import pandas
from _snowflake import vectorized
@vectorized(input=pandas.DataFrame)
def add_one_to_inputs(df):
 return df[0] + df[1] + 1
$$;
関数属性の使用¶
_snowflakeモジュールをインポートして vectorized デコレーターを使用する代わりに、ハンドラー関数に特別な _sf_vectorized_input 属性を設定することができます。
CREATE FUNCTION add_one_to_inputs(x NUMBER(10, 0), y NUMBER(10, 0))
  RETURNS NUMBER(10, 0)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  PACKAGES = ('pandas')
  HANDLER = 'add_one_to_inputs'
AS $$
import pandas
def add_one_to_inputs(df):
 return df[0] + df[1] + 1
add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
$$;
ターゲットバッチサイズの設定¶
Pythonハンドラー関数の呼び出しは、制限時間である180秒以内に実行する必要があり、ハンドラー関数への入力として渡される各 DataFrame には、現在最大数千行が含まれる場合があります。制限時間内にとどまるために、ハンドラー関数のターゲットバッチサイズを設定することをお勧めします。これにより、入力DataFrameごとの最大行数が制限されます。より大きな値を設定しても、Snowflakeが指定された行数のバッチをエンコードすることを保証するものではないことに注意してください。 vectorized デコレーターまたは関数の属性を使用して、ターゲットバッチサイズを設定できます。
注釈
max_batch_size の使用は、 UDF が1つのバッチで処理できる行数を制限するためのメカニズムとしてのみ使用されます。たとえば、 UDF が一度に最大100行しか処理できないように記述されている場合は、 max_batch_size を100に設定する必要があります。 max_batch_size の設定は、任意の大きなバッチサイズを指定するメカニズムとして使用するためのものではありません。UDF が任意のサイズのバッチを処理する場合は、このパラメーターを未設定のままにしておくことをお勧めします。
vectorized デコレーターの使用¶
vectorized デコレーターを使用してターゲットバッチサイズを設定するには、 max_batch_size という名前の引数に正の整数値を渡します。
例として、このステートメントは、各Dataframeを最大100行に制限するベクトル化されたPython UDF を作成します。
CREATE FUNCTION add_one_to_inputs(x NUMBER(10, 0), y NUMBER(10, 0))
  RETURNS NUMBER(10, 0)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  PACKAGES = ('pandas')
  HANDLER = 'add_one_to_inputs'
AS $$
import pandas
from _snowflake import vectorized
@vectorized(input=pandas.DataFrame, max_batch_size=100)
def add_one_to_inputs(df):
 return df[0] + df[1] + 1
$$;
関数属性の使用¶
関数属性を使用してターゲットバッチサイズを設定するには、ハンドラー関数の _sf_max_batch_size 属性に正の整数値を設定します。
例として、このステートメントは、各 DataFrame を最大100行に制限するベクトル化されたPython UDF を作成します。
CREATE FUNCTION add_one_to_inputs(x NUMBER(10, 0), y NUMBER(10, 0))
  RETURNS NUMBER(10, 0)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  PACKAGES = ('pandas')
  HANDLER = 'add_one_to_inputs'
AS $$
import pandas
def add_one_to_inputs(df):
 return df[0] + df[1] + 1
add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
add_one_to_inputs._sf_max_batch_size = 100
$$;
DataFrame エンコーディング¶
UDFへの引数のバッチは、入力パンダPandas DataFramesの配列としてエンコードされ、各DataFrameの行数は異なる場合があります。詳細については、 ターゲットバッチサイズの設定 をご参照ください。引数は、インデックスによってDataFrameでアクセスできます。つまり、最初の引数のインデックスは0、2番目の引数のインデックスは1というようになります。UDFハンドラーが返すPandas配列またはシリーズは、入力DataFrameと同じ長さである必要があります。
説明のために、ベクトル化されたPython UDF を次のように定義するとします。
CREATE OR REPLACE FUNCTION add_inputs(x INT, y FLOAT)
  RETURNS FLOAT
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  PACKAGES = ('pandas')
  HANDLER = 'add_inputs'
AS $$
import pandas
from _snowflake import vectorized
@vectorized(input=pandas.DataFrame)
def add_inputs(df):
  return df[0] + df[1]
$$;
このUDFは、 df[0] を使用して最初の引数のPandas配列にアクセスし、 df[1] を使用して2番目の引数にアクセスします。 df[0] + df[1] は、2つの配列からの対応する要素のペアワイズ合計を持つPandas配列になります。UDFを作成した後、いくつかの入力行を使用して呼び出すことができます。
SELECT add_inputs(x, y)
FROM (
  SELECT 1 AS x, 3.14::FLOAT as y UNION ALL
  SELECT 2, 1.59 UNION ALL
  SELECT 3, -0.5
);
+------------------+
| ADD_INPUTS(X, Y) |
|------------------|
|             4.14 |
|             3.59 |
|             2.5  |
+------------------+
ここで、 add_inputs Python関数は、次のPythonコードで作成されたものに類似したDataFrameを受け取ります。
>>> import pandas
>>> df = pandas.DataFrame({0: pandas.array([1, 2, 3]), 1: pandas.array([3.14, 1.59, -0.5])})
>>> df
   0     1
0  1  3.14
1  2  1.59
2  3 -0.50
ハンドラー関数の行 return df[0] + df[1] は、次のPythonコードのような配列になります。
>>> df[0] + df[1]
0    4.14
1    3.59
2    2.50
dtype: float64
型サポート¶
ベクトル化された UDFs は、引数と戻り値に対して次の SQL 型 をサポートします。この表は、各SQL引数が特定の dtype のPandas配列としてどのようにエンコードされるかを反映しています。
SQL 型  | 
パンダdtype  | 
メモ  | 
|---|---|---|
NUMBER  | 
すべてが64ビットまたはより小さい整数型に適合するスケール0を持つ   | 
UDFへの入力引数がnull許容ではないと解釈されるようにするには、   | 
FLOAT  | 
  | 
NULL値はNaN値としてエンコードされます。出力では、NaN値はNULLsとして解釈されます。  | 
BOOLEAN  | 
null許容引数の場合は   | 
|
VARCHAR  | 
  | 
Snowflake SQLとPandasはどちらも、UTF-8エンコーディングを使用して文字列を表します。  | 
BINARY  | 
  | 
|
DATE  | 
  | 
各値は、時間コンポーネントなしで   | 
VARIANT  | 
  | 
各バリアント行は、引数の場合は動的にPython型に変換され、戻り値の場合はその逆に変換されます。次のタイプは、ネイティブPython型ではなく文字列に変換されます:   | 
OBJECT  | 
  | 
|
ARRAY  | 
  | 
|
TIME  | 
  | 
各値は、真夜中からのオフセットとしてエンコードされます。NULL値は   | 
TIMESTAMP_LTZ  | 
  | 
ローカルタイムゾーンを使用して、各値をUTC Unixエポックを基準にしたナノ秒スケールの   | 
TIMESTAMP_NTZ  | 
  | 
各値をナノ秒スケールの   | 
TIMESTAMP_TZ  | 
  | 
各値をナノ秒スケールの   | 
GEOGRAPHY  | 
  | 
各値をGeoJSONとしてフォーマットしてから、Python   | 
次の型が出力として受け入れられます: Pandas Series または array、NumPy array、通常のPython list、および 型サポート で説明されている予想されるタイプを含む反復可能なシーケンス。Pandas Series と array とNumPy array を使用すると効率的です。コンテンツを memoryviews として公開するため、ここでのdtypeは bool、 boolean、 int16、 int32、 int64、 Int16、 Int32、 Int64、または float64 です。これは、各値を順番に読み取るのではなく、コンテンツをコピーできることを意味します。