Erstellen von benutzerdefinierten Aggregatfunktionen (UDAFs) für DataFrames in Python¶
Sie können Snowpark Python-APIs verwenden, um benutzerdefinierte Aggregatfunktionen (UDAFs) zu erstellen und aufzurufen. Eine UDAF nimmt eine oder mehrere Zeilen als Eingabe entgegen und erzeugt eine einzelne Zeile als Ausgabe. Sie operiert mit Werten über mehrere Zeilen hinweg, um mathematische Berechnungen wie Summe, Durchschnitt, Zählung, Ermitteln von Minimum- oder Maximumwert, Standardabweichung und Schätzung sowie andere nicht mathematische Operationen auszuführen.
So erstellen und registrieren Sie eine UDAF in Snowpark:
Implementieren Sie einen UDAF-Handler.
Der Handler enthält die Logik der UDAF. Ein UDAF-Handler muss Funktionen implementieren, die Snowflake zur Laufzeit aufruft, wenn die UDAF aufgerufen wird. Weitere Informationen dazu finden Sie unter Implementieren eines Handlers.
Registrieren Sie die UDAF und deren Handler in der Snowflake-Datenbank.
Nachdem die UDAF registriert ist, kann diese von SQL oder über den Snowpark-API aufgerufen werden. Sie können die Snowpark-API verwenden, um die UDAF und deren Handler zu registrieren. Weitere Informationen zur Registrierung finden Sie unter Registrieren einer UDAF.
Sie können auch Ihre eigene UDAFs mit SQL erstellen, wie in Benutzerdefinierte Python-Aggregatfunktionen beschrieben.
Implementieren eines Handlers¶
Wie unter Schnittstelle für Aggregatfunktionshandler beschrieben, muss eine UDAF-Handler-Klasse Methoden implementieren, die Snowflake aufruft, wenn die UDAF aufgerufen wird. Sie können die von Ihnen geschriebene Klasse als Handler verwenden, unabhängig davon, ob Sie die UDAF mit der Snowpark-API registrieren oder ob Sie diese mit SQL unter Verwendung der CREATE FUNCTION-Anweisung erstellen.
Ihre UDAF-Handler-Klasse implementiert die in der folgenden Tabelle aufgeführten Methoden, die Snowflake zur Laufzeit aufruft. Siehe Beispiele unter diesem Thema.
Methode |
Anforderung |
Beschreibung |
---|---|---|
|
Erforderlich |
Initialisiert den internen Status eines Aggregats. |
|
Erforderlich |
Gibt den internen Status eines Aggregats zurück.
|
|
Erforderlich |
Akkumuliert den Status des Aggregats auf der Grundlage der neuen Eingabezeile. |
|
Erforderlich |
Kombiniert zwei Aggregatzwischenstatus. |
|
Erforderlich |
Erzeugt das Endergebnis auf der Grundlage des aggregierten Status. |
Registrieren einer UDAF¶
Sobald Sie einen UDAF-Handler implementiert haben, können Sie die Snowpark-API verwenden, um die UDAF in der Snowflake-Datenbank zu registrieren. Durch das Registrieren der UDAF wird die UDAF erstellt, sodass sie aufgerufen werden kann.
Sie können die UDAF als benannte oder anonyme Funktion registrieren, wie Sie es für eine skalare UDF tun können. Weitere Informationen zum Registrieren einer skalaren UDF finden Sie unter Erstellen einer anonymen UDF und Erstellen und Registrieren einer benannten UDF. Wenn Sie eine UDAF registrieren, geben Sie die Parameterwerte an, die Snowflake zum Erstellen der UDAF benötigt.
Sie können die Funktion mithilfe der folgenden Funktionen und Methoden registrieren:
Verwenden Sie die Methode
register
oder die Funktionudaf
und geben Sie den Namen Ihrer Handler-Klasse zusammen mit Argumenten an, um die Funktion zu definieren. Sie können die Funktionudaf
auch als@udaf
-Decorator für die Handler-Klasse verwenden.Weitere Informationen dazu finden Sie unter:
Verwenden Sie die Funktion
register_from_file
, um auf eine Python-Datei oder eine ZIP-Datei mit Python-Quellcode zu verweisen.Die Funktionsreferenz finden Sie unter snowflake.snowpark.udtf.UDAFRegistration.register_from_file.
Beispiele¶
UDAF mit einem Rückgabewert und einem einzelnen Parameter erstellen¶
Der Python-Code im folgenden Handler-Beispiel unterstützt eine UDAF sum_int
, die ein einzelnes Integer-Argument erhält, den Wert zeilenübergreifend addiert und das Ergebnis zurückgibt.
Funktion registrieren¶
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
# This aggregate state is a primitive Python data type.
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value):
self._partial_sum += input_value
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Funktion aufrufen¶
Der Python-Code im folgenden Beispiel ruft die UDAF sum_int
mit einem DataFrame auf.
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
UDAF mit einem Rückgabewert und zwei Parametern erstellen¶
Funktion registrieren¶
Der Python-Code im folgenden Handler-Beispiel unterstützt eine UDAF sum_int
, die zwei Integer-Argumente erhält, die Argumentwerte zeilenübergreifend addiert und das Ergebnis zurückgibt.
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value, input_value2):
self._partial_sum += input_value + input_value2
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Funktion aufrufen¶
Der Python-Code im folgenden Beispiel ruft die UDAF sum_int
mit einem DataFrame auf.
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())