Erstellen von benutzerdefinierten Aggregatfunktionen (UDAFs) für DataFrames in Python

Sie können Snowpark Python-APIs verwenden, um benutzerdefinierte Aggregatfunktionen (UDAFs) zu erstellen und aufzurufen. Eine UDAF nimmt eine oder mehrere Zeilen als Eingabe entgegen und erzeugt eine einzelne Zeile als Ausgabe. Sie operiert mit Werten über mehrere Zeilen hinweg, um mathematische Berechnungen wie Summe, Durchschnitt, Zählung, Ermitteln von Minimum- oder Maximumwert, Standardabweichung und Schätzung sowie andere nicht mathematische Operationen auszuführen.

So erstellen und registrieren Sie eine UDAF in Snowpark:

  • Implementieren Sie einen UDAF-Handler.

    Der Handler enthält die Logik der UDAF. Ein UDAF-Handler muss Funktionen implementieren, die Snowflake zur Laufzeit aufruft, wenn die UDAF aufgerufen wird. Weitere Informationen dazu finden Sie unter Implementieren eines Handlers.

  • Registrieren Sie die UDAF und deren Handler in der Snowflake-Datenbank.

    Nachdem die UDAF registriert ist, kann diese von SQL oder über den Snowpark-API aufgerufen werden. Sie können die Snowpark-API verwenden, um die UDAF und deren Handler zu registrieren. Weitere Informationen zur Registrierung finden Sie unter Registrieren einer UDAF.

Sie können auch Ihre eigene UDAFs mit SQL erstellen, wie in Benutzerdefinierte Python-Aggregatfunktionen beschrieben.

Implementieren eines Handlers

Wie unter Schnittstelle für Aggregatfunktionshandler beschrieben, muss eine UDAF-Handler-Klasse Methoden implementieren, die Snowflake aufruft, wenn die UDAF aufgerufen wird. Sie können die von Ihnen geschriebene Klasse als Handler verwenden, unabhängig davon, ob Sie die UDAF mit der Snowpark-API registrieren oder ob Sie diese mit SQL unter Verwendung der CREATE FUNCTION-Anweisung erstellen.

Ihre UDAF-Handler-Klasse implementiert die in der folgenden Tabelle aufgeführten Methoden, die Snowflake zur Laufzeit aufruft. Siehe Beispiele unter diesem Thema.

Methode

Anforderung

Beschreibung

__init__

Erforderlich

Initialisiert den internen Status eines Aggregats.

aggregate_state

Erforderlich

Gibt den internen Status eines Aggregats zurück.

  • Die Methode muss einen @property-Decorator haben.

  • Ein Aggregatstatusobjekt kann ein beliebiger Python-Datentyp sein, der von der Python Pickle-Bibliothek serialisiert werden kann.

  • Für einfache Aggregatstatus verwenden Sie einen primitiven Python-Datentyp. Für komplexere Aggregatstatus verwenden Sie Python-Datenklassen.

accumulate

Erforderlich

Akkumuliert den Status des Aggregats auf der Grundlage der neuen Eingabezeile.

merge

Erforderlich

Kombiniert zwei Aggregatzwischenstatus.

finish

Erforderlich

Erzeugt das Endergebnis auf der Grundlage des aggregierten Status.

Registrieren einer UDAF

Sobald Sie einen UDAF-Handler implementiert haben, können Sie die Snowpark-API verwenden, um die UDAF in der Snowflake-Datenbank zu registrieren. Durch das Registrieren der UDAF wird die UDAF erstellt, sodass sie aufgerufen werden kann.

Sie können die UDAF als benannte oder anonyme Funktion registrieren, wie Sie es für eine skalare UDF tun können. Weitere Informationen zum Registrieren einer skalaren UDF finden Sie unter Erstellen einer anonymen UDF und Erstellen und Registrieren einer benannten UDF. Wenn Sie eine UDAF registrieren, geben Sie die Parameterwerte an, die Snowflake zum Erstellen der UDAF benötigt.

Sie können die Funktion mithilfe der folgenden Funktionen und Methoden registrieren:

Beispiele

UDAF mit einem Rückgabewert und einem einzelnen Parameter erstellen

Der Python-Code im folgenden Handler-Beispiel unterstützt eine UDAF sum_int, die ein einzelnes Integer-Argument erhält, den Wert zeilenübergreifend addiert und das Ergebnis zurückgibt.

Funktion registrieren

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
  def __init__(self):
    # This aggregate state is a primitive Python data type.
    self._partial_sum = 0

  @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value):
    self._partial_sum += input_value

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Funktion aufrufen

Der Python-Code im folgenden Beispiel ruft die UDAF sum_int mit einem DataFrame auf.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
Copy

UDAF mit einem Rückgabewert und zwei Parametern erstellen

Funktion registrieren

Der Python-Code im folgenden Handler-Beispiel unterstützt eine UDAF sum_int, die zwei Integer-Argumente erhält, die Argumentwerte zeilenübergreifend addiert und das Ergebnis zurückgibt.

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
  class PythonSumUDAF:
    def __init__(self):
      self._partial_sum = 0

    @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value, input_value2):
    self._partial_sum += input_value + input_value2

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Copy

Funktion aufrufen

Der Python-Code im folgenden Beispiel ruft die UDAF sum_int mit einem DataFrame auf.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())
Copy