Pythonにおける DataFrames 用ユーザー定義関数(UDAFs)の作成¶
Snowpark Python APIs を使用して、ユーザー定義の集計関数(UDAFs)を作成し、呼び出すことができます。UDAF は、1つ以上の行を入力として取得し、単一の出力行を生成します。これは複数行全体の値を操作して、合計、平均、カウント、最小値/最大値の探索、標準偏差、推定などの数学的計算に加え、一部の非数学的な演算も実行します。
Snowparkで UDAF を作成して登録するには、次を実行する必要があります。
UDAFハンドラーを実装します。
ハンドラーにはUDAFのロジックが含まれています。UDAFハンドラーは、UDAFが呼び出されたときにSnowflakeが実行時に呼び出す関数を実装する必要があります。詳細については、 ハンドラーの実装 をご参照ください。
UDAFとそのハンドラーをSnowflakeデータベースに登録します。
UDAFを登録すると、SQLから、またはSnowparkAPIを使用して呼び出すことができます。Snowpark APIを使用して、UDAFとそのハンドラーを登録できます。登録の詳細については、 UDAFの登録 をご参照ください。
また、 Pythonユーザー定義集計関数 で説明されているように、 SQL を使用して独自の UDAFs を作成することもできます。
ハンドラーの実装¶
集計関数ハンドラーのインターフェイス で説明されているように、 UDAF ハンドラークラスは、 UDAF が呼び出されたときにSnowflakeが呼び出すメソッドを実装する必要があります。UDAF をSnowpark API に登録する場合でも、 CREATE FUNCTION ステートメントを使用して SQL で作成 する場合でも、作成したクラスをハンドラーとして使用できます。
UDAF ハンドラークラスは以下のテーブルに示すメソッドを実装し、Snowflakeは実行時にこれらのメソッドを呼び出します。 このトピック内の例 をご参照ください。
メソッド |
要件 |
説明 |
---|---|---|
|
必須 |
集計の内部状態を初期化します。 |
|
必須 |
集計の内部状態を返します。
|
|
必須 |
新しい入力行に基づいて集計状態を累積します。 |
|
必須 |
2つの中間集計状態を組み合わせます。 |
|
必須 |
集計状態に基づいて最終結果を生成します。 |
UDAFの登録¶
UDAFハンドラーを実装したら、Snowpark APIを使用して、SnowflakeデータベースにUDAFを登録できます。UDAFを登録すると、UDAFが作成されて呼び出せるようになります。
スカラーUDFの場合と同様に、UDAFを名前付き関数または匿名関数として登録できます。スカラーUDFの登録に関する関連情報については、 匿名 UDF の作成 および 名前付き UDF の作成と登録 をご参照ください。UDAFを登録するときは、SnowflakeがUDAFを作成するために必要なパラメーター値を指定します。
以下の関数とメソッドを使用して関数を登録できます。
register
メソッドまたはudaf
関数を使い、ハンドラークラスの名前と引数を指定して関数を定義します。udaf
関数をハンドラークラスの@udaf
デコレーターとして使用することもできます。これらに関する参考情報については、以下をご参照ください。
register_from_file
関数を使用して、Pythonソースコードを含むPythonファイルまたはzipファイルをポイントします。関数参照については、 snowflake.snowpark.udaf.UDAFRegistration.register_from_file をご参照ください。
例¶
戻り値と1つのパラメーターを持つ UDAF を作成します。¶
以下のハンドラーの例にあるPythonコードは、 sum_int
UDAF をサポートしています。このコードは、単一の整数引数を受け取り、行をまたいで値を加算し、結果を返します。
関数を登録する¶
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
# This aggregate state is a primitive Python data type.
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value):
self._partial_sum += input_value
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
関数を呼び出す¶
次の例にあるPythonコードは、 sum_int
UDAF を DataFrame で呼び出します。
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
戻り値と2つのパラメーターを持つ UDAF を作成する¶
関数を登録する¶
次のハンドラーの例にあるPythonコードは、2つの整数引数を受け取り、行をまたいで引数値を加算して結果を返す sum_int
UDAF をサポートしています。
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value, input_value2):
self._partial_sum += input_value + input_value2
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
関数を呼び出す¶
次の例にあるPythonコードは、 sum_int
UDAF を DataFrame で呼び出します。
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())