ベクトル化されたPython UDFs

このトピックでは、ベクトル化されたPython UDFs を紹介します。

このトピックの内容:

概要

ベクトル化されたPython UDFs を使用すると、入力行のバッチを Pandas DataFrames として受け取り、結果のバッチを Pandas配列 または Series として返すPython関数を定義します。他のPython UDFs を呼び出すのと同じ方法で、ベクトル化されたPython UDFs を呼び出します。

デフォルトの行ごとの処理パターンと比較して、ベクトル化された UDFs を使用する利点は次のとおりです。

  • Pythonコードが行のバッチで効率的に動作する場合、パフォーマンスが向上する可能性があります。

  • Pandas DataFramesまたはPandas配列で動作するライブラリを呼び出す場合は、必要な変換ロジックが少なくなります。

ベクトル化されたPython UDFs を使用する場合、

  • Python UDFsを使用してクエリを作成する方法を変更する必要はありません。すべてのバッチ処理は、独自のコードではなく、UDFフレームワークによって処理されます。

  • ベクトル化されていない UDFs と同様に、ハンドラーコードのどのインスタンスが入力のどのバッチを参照するかは保証されません。

ベクトル化されたPython UDFs 入門

ベクトル化されたPython UDF を作成するには、ハンドラー関数への注釈付与でサポートされているメカニズムの1つを使用します。

vectorized デコレーターの使用

_snowflake モジュールは、Snowflake内で実行されるPython UDFsに公開されています。Pythonコードで、 _snowflake モジュールをインポートし、 vectorized デコレーターを使用して、 input パラメーターを pandas.DataFrame に設定することにより、ハンドラーがPandas DataFrameを受け取ることを期待するように指定します。

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
from _snowflake import vectorized

@vectorized(input=pandas.DataFrame)
def add_one_to_inputs(df):
  return df[0] + df[1] + 1
$$;
Copy

関数属性の使用

_snowflakeモジュールをインポートして vectorized デコレーターを使用する代わりに、ハンドラー関数に特別な _sf_vectorized_input 属性を設定することができます。

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas

def add_one_to_inputs(df):
  return df[0] + df[1] + 1

add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
$$;
Copy

ターゲットバッチサイズの設定

Pythonハンドラー関数の呼び出しは、制限時間である180秒以内に実行する必要があり、ハンドラー関数への入力として渡される各 DataFrame には、現在最大数千行が含まれる場合があります。制限時間内にとどまるために、ハンドラー関数のターゲットバッチサイズを設定することをお勧めします。これにより、入力DataFrameごとの最大行数が制限されます。より大きな値を設定しても、Snowflakeが指定された行数のバッチをエンコードすることを保証するものではないことに注意してください。 vectorized デコレーターまたは関数の属性を使用して、ターゲットバッチサイズを設定できます。

注釈

max_batch_size の使用は、 UDF が1つのバッチで処理できる行数を制限するためのメカニズムとしてのみ使用されます。たとえば、 UDF が一度に最大100行しか処理できないように記述されている場合は、 max_batch_size を100に設定する必要があります。 max_batch_size の設定は、任意の大きなバッチサイズを指定するメカニズムとして使用するためのものではありません。UDF が任意のサイズのバッチを処理する場合は、このパラメーターを未設定のままにしておくことをお勧めします。

vectorized デコレーターの使用

vectorized デコレーターを使用してターゲットバッチサイズを設定するには、 max_batch_size という名前の引数に正の整数値を渡します。

例として、このステートメントは、各Dataframeを最大100行に制限するベクトル化されたPython UDF を作成します。

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas
from _snowflake import vectorized

@vectorized(input=pandas.DataFrame, max_batch_size=100)
def add_one_to_inputs(df):
  return df[0] + df[1] + 1
$$;
Copy

関数属性の使用

関数属性を使用してターゲットバッチサイズを設定するには、ハンドラー関数の _sf_max_batch_size 属性に正の整数値を設定します。

例として、このステートメントは、各 DataFrame を最大100行に制限するベクトル化されたPython UDF を作成します。

create function add_one_to_inputs(x number(10, 0), y number(10, 0))
returns number(10, 0)
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_one_to_inputs'
as $$
import pandas

def add_one_to_inputs(df):
  return df[0] + df[1] + 1

add_one_to_inputs._sf_vectorized_input = pandas.DataFrame
add_one_to_inputs._sf_max_batch_size = 100
$$;
Copy

DataFrameエンコーディング

UDFへの引数のバッチは、入力パンダPandas DataFramesの配列としてエンコードされ、各DataFrameの行数は異なる場合があります。詳細については、 ターゲットバッチサイズの設定 をご参照ください。引数は、インデックスによってDataFrameでアクセスできます。つまり、最初の引数のインデックスは0、2番目の引数のインデックスは1というようになります。UDFハンドラーが返すPandas配列またはシリーズは、入力DataFrameと同じ長さである必要があります。

説明のために、ベクトル化されたPython UDF を次のように定義するとします。

create or replace function add_inputs(x int, y float)
returns float
language python
runtime_version = 3.8
packages = ('pandas')
handler = 'add_inputs'
as $$
import pandas
from _snowflake import vectorized

@vectorized(input=pandas.DataFrame)
def add_inputs(df):
  return df[0] + df[1]
$$;
Copy

このUDFは、 df[0] を使用して最初の引数のPandas配列にアクセスし、 df[1] を使用して2番目の引数にアクセスします。 df[0] + df[1] は、2つの配列からの対応する要素のペアワイズ合計を持つPandas配列になります。UDFを作成した後、いくつかの入力行を使用して呼び出すことができます。

select add_inputs(x, y)
from (
  select 1 as x, 3.14::float as y union all
  select 2, 1.59 union all
  select 3, -0.5
);
+------------------+
| ADD_INPUTS(X, Y) |
|------------------|
|             4.14 |
|             3.59 |
|             2.5  |
+------------------+
Copy

ここで、 add_inputs Python関数は、次のPythonコードで作成されたものに類似したDataFrameを受け取ります。

>>> import pandas
>>> df = pandas.DataFrame({0: pandas.array([1, 2, 3]), 1: pandas.array([3.14, 1.59, -0.5])})
>>> df
   0     1
0  1  3.14
1  2  1.59
2  3 -0.50
Copy

ハンドラー関数の行 return df[0] + df[1] は、次のPythonコードのような配列になります。

>>> df[0] + df[1]
0    4.14
1    3.59
2    2.50
dtype: float64
Copy

型サポート

ベクトル化された UDFs は、引数と戻り値に対して次の SQL 型 をサポートします。この表は、各SQL引数が特定の dtype のPandas配列としてどのようにエンコードされるかを反映しています。

SQL 型

パンダdtype

メモ

NUMBER

すべてが64ビットまたはより小さい整数型に適合するスケール0を持つ NUMBER 引数のための Int16Int32、 または Int64 引数がnull許容でない場合は、代わりに int16int32、または int64 が使用されます。(UDTFs については、 Int16Int32、または Int64 が常に使用されます。) . . object 0以外のスケールの引数、または64ビット整数内に収まらない引数の場合、配列要素は decimal.Decimal 値としてエンコードされます。 . . 16ビットのdtypeを確保するには、 NUMBER の最大精度4を使用します。32ビットのdtypeを確保するには、 NUMBER の最大精度9を使用します。64ビットのdtypeを確保するには、 NUMBER の最大精度18を使用します。

UDFへの入力引数がnull許容ではないと解釈されるようにするには、 NOT NULL 列制約を使用して作成されたテーブルから列を渡すか、引数に IFNULL などの関数を使用します。

FLOAT

float64

NULL値はNaN値としてエンコードされます。出力では、NaN値はNULLsとして解釈されます。

BOOLEAN

null許容引数の場合は boolean またはnull許容でない引数の場合は bool

VARCHAR

string

Snowflake SQLとPandasはどちらも、UTF-8エンコーディングを使用して文字列を表します。

BINARY

bytes

DATE

datetime64

各値は、時間コンポーネントなしで datetime64 としてエンコードされます。NULL値は numpy.timedelta('NaT') としてエンコードされます。

VARIANT

object . . 各値は dictlistintfloatstr、または bool としてエンコードされます。

各バリアント行は、引数の場合は動的にPython型に変換され、戻り値の場合はその逆に変換されます。次のタイプは、ネイティブPython型ではなく文字列に変換されます: decimalbinarydatetimetimestamp_ltztimestamp_ntztimestamp_tz

OBJECT

object . . 各要素はdictとしてエンコードされます。

ARRAY

object . . 各要素はリストとしてエンコードされます。

TIME

timedelta64

各値は、真夜中からのオフセットとしてエンコードされます。NULL値は numpy.timedelta64('NaT') としてエンコードされます。戻り型として使用する場合、出力の要素は [00:00:00, 23:59:59.999999999] の範囲の numpy.timedelta64 または datetime.time の値になります。

TIMESTAMP_LTZ

datetime64

ローカルタイムゾーンを使用して、各値をUTC Unixエポックを基準にしたナノ秒スケールの numpy.datetime64 としてエンコードします。NULL値は numpy.datetime64('NaT') としてエンコードされます。戻り値の型として使用する場合、出力の要素は numpy.datetime64 またはタイムゾーンのナイーブな datetime.datetime または pandas.Timestamp の値になります。

TIMESTAMP_NTZ

datetime64

各値をナノ秒スケールの numpy.datetime64 としてエンコードします。NULL値は numpy.datetime64('NaT') としてエンコードされます。戻り値の型として使用する場合、出力の要素は numpy.datetime64 またはタイムゾーンのナイーブな datetime.datetime または pandas.Timestamp の値になります。

TIMESTAMP_TZ

object

各値をナノ秒スケールの pandas.Timestamp としてエンコードします。NULL値は pandas.NA としてエンコードされます。戻り値の型として使用する場合、出力の要素はタイムゾーン対応の datetime.datetime または pandas.Timestamp 値である可能性があります。

GEOGRAPHY

object

各値をGeoJSONとしてフォーマットしてから、Python dict に変換します。

次の型が出力として受け入れられます: Pandas Series または array、NumPy array、通常のPython list、および 型サポート で説明されている予想されるタイプを含む反復可能なシーケンス。Pandas Seriesarray とNumPy array を使用すると効率的です。コンテンツを memoryviews として公開するため、ここでのdtypeは boolbooleanint16int32int64Int16Int32Int64、または float64 です。これは、各値を順番に読み取るのではなく、コンテンツをコピーできることを意味します。