Erstellen von benutzerdefinierten Tabellenfunktionen (UDTFs) für DataFrames in Python

Die Snowpark-API bietet Methoden, mit denen Sie eine benutzerdefinierte Tabellenfunktion mit einem in Python geschriebenen Handler erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.

Unter diesem Thema:

Einführung

Sie können eine benutzerdefinierte Tabellenfunktion (UDTF) mit dem Snowpark-API erstellen.

Dies geschieht auf ähnliche Weise wie das Erstellen einer skalaren benutzerdefinierten Funktion (UDF) mit der API, wie unter Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Python beschrieben. Zu den wichtigsten Unterschieden gehören die Anforderungen an den UDF-Handler und die bei der Registrierung des UDTF erforderlichen Parameterwerte.

So erstellen und registrieren Sie eine UDTF in Snowpark:

  • Implementieren Sie einen UDTF-Handler.

    Der Handler enthält die Logik der UDTF. Ein UDTF-Handler muss Funktionen implementieren, die Snowflake zur Laufzeit aufruft, wenn die UDTF aufgerufen wird. Weitere Informationen dazu finden Sie unter Implementieren eines UDTF-Handlers.

  • Registrieren Sie die UDTF und deren Handler in der Snowflake-Datenbank.

    Sie können die Snowpark-API verwenden, um die UDTF und deren Handler zu registrieren. Nachdem die UDTF registriert ist, kann diese von SQL oder über den Snowpark-API aufgerufen werden. Weitere Informationen zur Registrierung finden Sie unter Registrieren einer UDTF.

Weitere Informationen zum Aufrufen von UDTFs finden Sie unter Aufrufen benutzerdefinierter Tabellenfunktionen (UDTFs).

Implementieren eines UDTF-Handlers

Wie unter Schreiben von UDTF in Python ausführlich beschrieben, muss eine UDTF-Handler-Klasse Methoden implementieren, die Snowflake aufruft, wenn die UDTF aufgerufen wird. Sie können die von Ihnen geschriebene Klasse als Handler verwenden, unabhängig davon, ob Sie die UDTF mit der Snowpark-API registrieren oder ob Sie diese mit SQL unter Verwendung der CREATE FUNCTION-Anweisung erstellen.

Die Methoden einer Handler-Klasse sind für die Verarbeitung der von der UDTF empfangenen Zeilen und Partitionen vorgesehen.

Eine UDTF-Handler-Klasse implementiert Folgendes, das Snowflake zur Laufzeit aufruft:

  • Eine __init__-Methode. Optional. Wird aufgerufen, um die zustandsabhängige Verarbeitung von Eingabepartitionen zu initialisieren.

  • Eine process-Methode. Erforderlich. Wird für jede Eingabezeile aufgerufen. Die Methode gibt einen tabellarischen Wert in Form von Tupeln zurück.

  • Eine end_partition-Methode. Optional. Wird aufgerufen, um die Verarbeitung der Eingabepartitionen abzuschließen.

    Snowflake unterstützt zwar große Partitionen mit Timeouts, die so eingestellt sind, dass sie erfolgreich verarbeitet werden können, aber bei besonders großen Partitionen kann es zu Zeitüberschreitungen kommen (z. B. wenn end_partition zu lange für den Abschluss braucht). Wenden Sie sich an den Snowflake-Support, wenn Sie den Timeout-Schwellenwert für bestimmte Nutzungsszenarios anpassen möchten.

Weitere Details und Beispiele zu Handlern finden Sie unter Schreiben von UDTF in Python.

Registrieren einer UDTF

Sobald Sie einen UDTF-Handler implementiert haben, können Sie die Snowpark-API verwenden, um die UDTF in der Snowflake-Datenbank zu registrieren. Durch das Registrieren der UDTF wird die UDTF erstellt, sodass sie aufgerufen werden kann.

Sie können die UDTF als benannte oder anonyme Funktion registrieren, wie Sie es für eine skalare UDF tun können. Weitere Informationen zum Registrieren einer skalaren UDF finden Sie unter Erstellen einer anonymen UDF und Erstellen und Registrieren einer benannten UDF.

Wenn Sie eine UDTF registrieren, geben Sie die Parameterwerte an, die Snowflake zum Erstellen der UDTF benötigt. (Viele dieser Parameter entsprechen funktional den Klauseln der CREATE FUNCTION-Anweisung in SQL. Weitere Informationen dazu finden Sie unter CREATE FUNCTION.)

Die meisten dieser Parameter sind die gleichen, die Sie auch beim Erstellen einer skalaren UDF angeben (weitere Informationen dazu finden Sie unter Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Python). Die Hauptunterschiede ergeben sich aus der Tatsache, dass eine UDTF einen Tabellenwert zurückgibt, und der Tatsache, dass der Handler eine Klasse und keine Funktion ist. Eine vollständige Liste der Parameter finden Sie in der unten verlinkten Dokumentation für die APIs.

Um eine UDTF bei Snowpark zu registrieren, verwenden Sie eine der folgenden Möglichkeiten, wobei Sie die Parameterwerte angeben, die für die Erstellung der UDTF in der Datenbank erforderlich sind. Informationen zur Unterscheidung dieser Optionen finden Sie unter UDFRegistration, wo ähnliche Optionen für die Registrierung einer skalaren UDF beschrieben werden.

Definieren der Eingabetypen und des Ausgabeschemas einer UDTF

Wenn Sie eine UDTF registrieren, geben Sie Details zu den Parametern und zum Ausgabewert der Funktion an. Sie tun dies, damit die Funktion selbst Typen deklariert, die genau denjenigen des zugrunde liegenden Handlers der Funktion entsprechen.

Beispiele dazu finden Sie unter Beispiele (unter diesem Thema) und in der Referenz snowflake.snowpark.udtf.UDTFRegistration.

Beim Registrieren der UDTF geben Sie Folgendes an:

  • Typen der Eingabeparameter als Wert des input_types-Parameters der registrierenden Funktion. Der Parameter input_types ist optional, wenn Sie in der Deklaration der process-Methode Informationen zum Typ bereitstellen.

    Geben Sie diesen Wert als eine Liste von Typen auf Grundlage von snowflake.snowpark.types.DataType an. So könnten Sie beispielsweise Folgendes angeben: input_types=[StringType(), IntegerType()]

  • Schema der tabellarischen Ausgabe als Wert des output_schema-Parameters der registrierenden Funktion.

    output_schema kann einen der folgenden Werte aufweisen:

    • Eine Liste der Namen der Spalten im Rückgabewert der UDTF.

      Die Liste enthält nur Spaltennamen, sodass Sie in der Deklaration der process-Methode auch Informationen zum Typ bereitstellen müssen.

    • Ein Objekt StructType, das die Spaltennamen und Typen der Ausgabetabelle repräsentiert.

      Der Code im folgenden Beispiel weist einer output-Variablen ein Schema als Wert zu und verwendet die Variable dann bei der Registrierung der UDTF.

      >>> from snowflake.snowpark.types import StructField, StructType, StringType, IntegerType, FloatType
      >>> from snowflake.snowpark.functions import udtf, table_function
      >>> schema = StructType([
      ...     StructField("symbol", StringType())
      ...     StructField("cost", IntegerType()),
      ... ])
      >>> @udtf(output_schema=schema,input_types=[StringType(), IntegerType(), FloatType()],stage_location="straut_udf",is_permanent=True,name="test_udtf",replace=True)
      ... class StockSale:
      ...     def process(self, symbol, quantity, price):
      ...         cost = quantity * price
      ...         yield (symbol, cost)
      
      Copy

Beispiele

Im Folgenden finden Sie eine kurze Liste von Beispielen. Weitere Beispiele finden Sie unter snowflake.snowpark.udtf.UDTFRegistration.

Registrieren einer UDTF mit der Funktion udtf

Registrieren Sie die Funktion.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
Copy

Rufen Sie die Funktion auf.

>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy

Registrieren einer UDTF mit der Funktion register

Registrieren Sie die Funktion.

>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
...     def __init__(self):
...         self._total_per_partition = 0
...
...     def process(self, s1: str) -> Iterable[Tuple[str, int]]:
...         words = s1.split()
...         self._total_per_partition = len(words)
...         counter = Counter(words)
...         yield from counter.items()
...
...     def end_partition(self):
...         yield ("partition_total", self._total_per_partition)
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
...     MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
Copy

Rufen Sie die Funktion auf.

>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
Copy

Registrieren einer UDTF mit der Funktion register_from_file

Registrieren Sie die Funktion.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="@mystage/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
Copy

Rufen Sie die Funktion auf.

>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy