Pythonにおける DataFrames 用ユーザー定義関数(UDAFs)の作成

Snowpark Python APIs を使用して、ユーザー定義の集計関数(UDAFs)を作成し、呼び出すことができます。UDAF は、1つ以上の行を入力として取得し、単一の出力行を生成します。これは複数行全体の値を操作して、合計、平均、カウント、最小値/最大値の探索、標準偏差、推定などの数学的計算に加え、一部の非数学的な演算も実行します。

Snowparkで UDAF を作成して登録するには、次を実行する必要があります。

  • UDAFハンドラーを実装します。

    ハンドラーにはUDAFのロジックが含まれています。UDAFハンドラーは、UDAFが呼び出されたときにSnowflakeが実行時に呼び出す関数を実装する必要があります。詳細については、 ハンドラーの実装 をご参照ください。

  • UDAFとそのハンドラーをSnowflakeデータベースに登録します。

    UDAFを登録すると、SQLから、またはSnowparkAPIを使用して呼び出すことができます。Snowpark APIを使用して、UDAFとそのハンドラーを登録できます。登録の詳細については、 UDAFの登録 をご参照ください。

また、 Pythonユーザー定義集計関数 で説明されているように、 SQL を使用して独自の UDAFs を作成することもできます。

ハンドラーの実装

集計関数ハンドラーのインターフェイス で説明されているように、 UDAF ハンドラークラスは、 UDAF が呼び出されたときにSnowflakeが呼び出すメソッドを実装する必要があります。UDAF をSnowpark API に登録する場合でも、 CREATE FUNCTION ステートメントを使用して SQL で作成 する場合でも、作成したクラスをハンドラーとして使用できます。

UDAF ハンドラークラスは以下のテーブルに示すメソッドを実装し、Snowflakeは実行時にこれらのメソッドを呼び出します。 このトピック内の例 をご参照ください。

メソッド

要件

説明

__init__

必須

集計の内部状態を初期化します。

aggregate_state

必須

集計の内部状態を返します。

  • メソッドには、 @property デコレーター が必要です。

  • 集計状態オブジェクトは、 Python pickleライブラリ でシリアル化が可能な任意のPythonデータ型にすることができます。

  • 単純な集計状態の場合は、Pythonのプリミティブデータ型を使用します。より複雑な集計状態の場合は、 Python データクラス を使用します。

accumulate

必須

新しい入力行に基づいて集計状態を累積します。

merge

必須

2つの中間集計状態を組み合わせます。

finish

必須

集計状態に基づいて最終結果を生成します。

UDAFの登録

UDAFハンドラーを実装したら、Snowpark APIを使用して、SnowflakeデータベースにUDAFを登録できます。UDAFを登録すると、UDAFが作成されて呼び出せるようになります。

スカラーUDFの場合と同様に、UDAFを名前付き関数または匿名関数として登録できます。スカラーUDFの登録に関する関連情報については、 匿名 UDF の作成 および 名前付き UDF の作成と登録 をご参照ください。UDAFを登録するときは、SnowflakeがUDAFを作成するために必要なパラメーター値を指定します。

以下の関数とメソッドを使用して関数を登録できます。

戻り値と1つのパラメーターを持つ UDAF を作成します。

以下のハンドラーの例にあるPythonコードは、 sum_int UDAF をサポートしています。このコードは、単一の整数引数を受け取り、行をまたいで値を加算し、結果を返します。

関数を登録する

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
  def __init__(self):
    # This aggregate state is a primitive Python data type.
    self._partial_sum = 0

  @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value):
    self._partial_sum += input_value

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Copy

関数を呼び出す

次の例にあるPythonコードは、 sum_int UDAF を DataFrame で呼び出します。

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
Copy

戻り値と2つのパラメーターを持つ UDAF を作成する

関数を登録する

次のハンドラーの例にあるPythonコードは、2つの整数引数を受け取り、行をまたいで引数値を加算して結果を返す sum_int UDAF をサポートしています。

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
  class PythonSumUDAF:
    def __init__(self):
      self._partial_sum = 0

    @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value, input_value2):
    self._partial_sum += input_value + input_value2

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Copy

関数を呼び出す

次の例にあるPythonコードは、 sum_int UDAF を DataFrame で呼び出します。

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())
Copy