Schreiben von UDTF in Python¶
Unter diesem Thema:
Sie können eine benutzerdefinierte Tabellenfunktion (UDTF) als Handler in Python implementieren. Der Handler-Code wird ausgeführt, wenn die UDTF aufgerufen wird. Unter diesem Thema wird beschrieben, wie ein Handler in Python implementiert und die UDTF erstellt wird.
Eine UDTF ist eine benutzerdefinierte Funktion (UDF), die tabellarische Ergebnisse liefert. Weitere Informationen zu in Python implementierten UDF-Handlern finden Sie unter Erstellen von Python-UDFs. Allgemeine Informationen zu UDFs finden Sie unter Übersicht zu benutzerdefinierten Funktionen.
In einem Handler für eine UDTF können Sie Eingabezeilen verarbeiten (siehe Verarbeiten von Zeilen unter diesem Thema). Sie können auch Verarbeitungslogik verwenden, die für jede Eingabepartition ausgeführt wird (siehe Verarbeiten von Partitionen unter diesem Thema).
Wenn Sie eine Python-UDTF erstellen, gehen Sie wie folgt vor:
Implementieren Sie eine Klasse mit Methoden, die Snowflake aufruft, wenn die UDTF aufgerufen wird.
Weitere Details dazu finden Sie unter Implementieren eines Handlers (unter diesem Thema).
Erstellen Sie die UDTF mit dem SQL-Befehl CREATE FUNCTION, und geben Sie Ihre Klasse als Handler an. Wenn Sie die UDTF erstellen, geben Sie Folgendes an:
Datentypen der UDTF-Eingabeparameter
Datentypen der von der UDTF zurückgegebenen Spalten
Code, der als Handler ausgeführt wird, wenn die UDTF aufgerufen wird
Sprache, in der der Handler implementiert ist
Weitere Informationen zur Syntax finden Sie unter Erstellen der UDTF mit CREATE FUNCTION (unter diesem Thema).
Sie können eine UDF oder UDTF aufrufen, wie unter Aufrufen einer UDF beschrieben.
Bemerkung
Tabellarische Funktionen (UDTFs) haben einen Grenzwert von 500 Eingabeargumenten und 500 Ausgabespalten.
Snowflake unterstützt derzeit das Schreiben von UDTFs mit den folgenden Python-Versionen:
3.8
3.9
3.10
3.11
Setzen Sie in Ihrer CREATE FUNCTION-Anweisung runtime_version
auf die gewünschte Version.
Implementieren eines Handlers¶
Sie implementieren eine Handler-Klasse, um UDTF-Argumentwerte zu tabellarischen Ergebnissen zu verarbeiten und partitionierte Eingaben zu behandeln. Ein Beispiel für eine Handler-Klasse finden Sie unter Beispiel für eine Handler-Klasse (unter diesem Thema).
Wenn Sie die UDTF mit CREATE FUNCTION erstellen, geben Sie diese Klasse als Handler für die UDTF an. Weitere Informationen zu zur Erstellung der Funktion mit SQL finden Sie unter Erstellen der UDTF mit CREATE FUNCTION (in diesem Thema).
Eine Handler-Klasse implementiert Methoden, die Snowflake aufruft, wenn die UDTF aufgerufen wird. Diese Klasse enthält die Verarbeitungslogik der UDTF.
Methode |
Anforderung |
Beschreibung |
---|---|---|
|
Optional |
Initialisiert den Zustand für die zustandsabhängige Verarbeitung von Eingabepartitionen. Weitere Informationen dazu finden Sie unter Initialisierung des Handlers (unter diesem Thema). |
|
Erforderlich |
Verarbeitet jede Eingabezeile und gibt einen tabellarischen Wert als Tupel zurück. Snowflake ruft diese Methode auf und übergibt Eingaben aus den Argumenten der UDTF. Weitere Informationen dazu finden Sie unter Definieren einer process-Methode (unter diesem Thema). |
|
Optional |
Schließt die Verarbeitung der Eingabepartitionen ab und gibt einen tabellarischen Wert in Form von Tupeln zurück. Weitere Informationen dazu finden Sie unter Abschließen der Partitionsverarbeitung (unter diesem Thema). |
Beachten Sie, dass das Auslösen einer Ausnahme durch eine beliebige Methode der Handler-Klasse zum Abbruch der Verarbeitung führt. Die Abfrage, mit der die UDTF aufgerufen wurde, schlägt mit einer Fehlermeldung fehl.
Bemerkung
Wenn Ihr Code die hier beschriebenen Anforderungen nicht erfüllt, kann die Erstellung oder Ausführung der UDTF fehlschlagen. Snowflake erkennt mögliche Verstöße beim Ausführen der CREATE FUNCTION-Anweisung.
Beispiel für eine Handler-Klasse¶
Der Code im folgenden Beispiel erstellt einen UDTF, deren Handler-Klasse Zeilen in einer Partition verarbeitet. Die Methode process
verarbeitet jede Eingabezeile und gibt eine Zeile mit den Gesamtkosten eines Aktienverkaufs zurück. Nach der Verarbeitung der Zeilen in der Partition gibt der Prozess (mit ihrer Methode end_partition
) die Gesamtsumme aller in der Partition enthaltenen Verkäufe zurück.
create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
$$;
Im folgenden Beispiel wird die vorherige UDF aufgerufen, wobei Werte aus den Spalten symbol
, quantity
und price
der Tabelle stocks_table
übergeben werden. Weitere Informationen zum Aufrufen einer UDTF finden Sie unter Aufrufen einer UDF.
select stock_sale_sum.symbol, total
from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Initialisierung des Handlers¶
Sie können in Ihrer Handler-Klasse optional eine __init__
-Methode implementieren, die Snowflake aufruft, bevor der Handler mit der Verarbeitung von Zeilen begonnen hat. Sie können diese Methode zum Beispiel verwenden, um einen partitionsbezogenen Zustand für den Handler einzurichten. Ihre __init__
-Methode erzeugt möglicherweise keine Ausgabezeilen.
Die Signatur der Methode muss die folgende Form haben:
def __init__(self):
Sie könnte beispielweise wie folgt vorgehen:
Initialisieren des Zustands einer Partition, dann Verwendung dieses Zustands in den Methoden
process
undend_partition
.Ausführen einer langwierigen Initialisierung, die nur einmal pro Partition und nicht einmal pro Zeile erfolgen muss.
Bemerkung
Sie können auch die Verarbeitungslogik einmal vor Beginn der Partitionsverarbeitung ausführen, indem Sie diesen Code außerhalb der Handler-Klasse einfügen, beispielsweise vor der Klassendeklaration.
Weitere Informationen zur Verarbeitung von Partitionen finden Sie unter Verarbeiten von Partitionen (unter diesem Thema).
Bei Verwendung einer __init__
-Methode müssen Sie daran denken, dass für __init__
Folgendes gilt:
Kann nur
self
als Argument annehmen.Kann keine Ausgabezeilen erzeugen. Verwenden Sie dafür Ihre
process
-Methodenimplementierung.Wird für jede Partition einmal aufgerufen, und zwar bevor die
process
-Methode aufgerufen wird.
Verarbeiten von Zeilen¶
Implementieren Sie eine process
-Methode, die Snowflake für jede Eingabezeile aufruft.
Definieren einer process
-Methode¶
Definieren Sie eine process
-Methode, die als Werte die aus SQL-Typen konvertierten UDTF-Argumente erhält und Daten zurückgibt, die Snowflake zum Erstellen des tabellarischen Rückgabewerts der UDTF verwendet.
Die Signatur der Methode muss die folgende Form haben:
def process(self, *args):
Für Ihre process
-Methode muss Folgendes gelten:
Hat einen
self
-Parameter.Deklariert Methodenparameter, die den UDTF-Parametern entsprechen.
Die Namen der Methodenparameter müssen nicht mit den Namen der UDTF-Parameter übereinstimmen, aber die Methodenparameter müssen in der gleichen Reihenfolge deklariert werden, wie die UDTF-Parameter deklariert sind.
Wenn Sie UDTF-Argumentwerte an Ihre Methode übergeben, konvertiert Snowflake die SQL-Typen der Werte in die Python-Typen, die Sie in der Methode verwenden. Weitere Informationen zur Zuordnung von SQL-Datentypen zu Python-Datentypen in Snowflake finden Sie unter Zuordnung von Datentypen zwischen SQL und Python.
Gibt ein oder mehrere Tupel aus (oder ein iterierendes Objekt mit Tupeln), wobei die Reihenfolge der Tupel der Reihenfolge der UDTF-Spalten für die Rückgabewerte entspricht.
Die Tupel-Elemente müssen in der gleichen Reihenfolge erscheinen, wie die UDTF-Spalten für die Rückgabewerte deklariert sind. Weitere Informationen dazu finden Sie unter Zurückgeben eines Wertes (unter diesem Thema).
Snowflake konvertiert dann die Python-Datentypen der Werte in die SQL-Datentypen, die von der UDTF-Deklaration benötigt werden. Weitere Informationen zur Zuordnung von SQL-Datentypen zu Python-Datentypen in Snowflake finden Sie unter Zuordnung von Datentypen zwischen SQL und Python.
Wenn eine Methode in der Handler-Klasse eine Ausnahme auslöst, wird die Verarbeitung angehalten. Die Abfrage, mit der die UDTF aufgerufen wurde, schlägt fehl und gibt eine Fehlermeldung aus. Wenn die process
-Methode den Wert None
zurückgibt, wird die Verarbeitung angehalten. (Die end_partition
-Methode wird auch dann aufgerufen, wenn die process
-Methode den Wert None
zurückgibt).
Beispiel für „process“-Methode
Der Code im folgenden Beispiel zeigt eine Handler-Klasse StockSale
mit einer process
-Methode, die drei UDTF-Argumente (symbol
, quantity
und price
) verarbeitet und eine einzelne Zeile mit zwei Spalten (symbol
und total
) zurückgibt. Beachten Sie, dass die Parameter der process
-Methode in der gleichen Reihenfolge deklariert werden wie die stock_sale
-Parameter. Die Argumente in der yield
-Anweisung der process
-Methode weisen dieselbe Reihenfolge auf wie die Spalten, die in der stock_sale
RETURNS TABLE-Klausel deklariert sind.
create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
def process(self, symbol, quantity, price):
cost = quantity * price
yield (symbol, cost)
$$;
Im folgenden Beispiel wird die vorherige UDF aufgerufen, wobei Werte aus den Spalten symbol
, quantity
und price
der Tabelle stocks_table
übergeben werden.
select stock_sale.symbol, total
from stocks_table, table(stock_sale(symbol, quantity, price) over (partition by symbol));
Zurückgeben eines Wertes¶
Bei der Rückgabe von Ausgabezeilen können Sie entweder yield
oder return
(aber nicht beide) verwenden, um Tupel mit dem Tabellenwert zurückzugeben. Wenn die Methode den Wert None
zurückgibt oder ergibt, wird die Verarbeitung für die aktuelle Zeile angehalten.
Wenn Sie
yield
verwenden, führen Sie für jede Ausgabezeile eine separateyield
-Anweisung aus. Dies ist die beste Vorgehensweise, da die mityield
verbundene Auswertung im Lazy-Modus eine effizientere Verarbeitung ermöglicht und Timeouts vermeiden hilft.Jedes Element des Tupels wird zu einem Spaltenwert in dem von der UDTF zurückgegebenen Ergebnis. Die Reihenfolge der
yield
-Argumente muss mit der Reihenfolge der Spalten übereinstimmen, die in der RETURNS TABLE-Klausel der CREATE FUNCTION-Anweisung für den Rückgabewert deklariert sind.Der Code im folgenden Beispiel gibt Werte zurück, die zwei Zeilen darstellen.
def process(self, symbol, quantity, price): cost = quantity * price yield (symbol, cost) yield (symbol, cost)
Da das „yield“-Argument ein Tupel ist, müssen Sie bei Übergabe eines einzelnen Werts ein nachstehendes Komma einfügen, wie im folgenden Beispiel gezeigt:
yield (cost,)
Bei Verwendung von
return
wird ein iterierendes Objekt mit Tupeln zurückgegeben.Jeder Wert in einem Tupel wird zu einem Spaltenwert in dem von der UDTF zurückgegebenen Ergebnis. Die Reihenfolge der Spaltenwerte in einem Tupel muss mit der Reihenfolge der Spalten übereinstimmen, die in der RETURNS TABLE-Klausel der CREATE FUNCTION-Anweisung für den Rückgabewert deklariert sind.
Der Code im folgenden Beispiel gibt zwei Zeilen mit jeweils zwei Spalten, Symbol und Summe, zurück:
def process(self, symbol, quantity, price): cost = quantity * price return [(symbol, cost), (symbol, cost)]
Überspringen von Zeilen¶
Um eine Eingabezeile zu überspringen und die nächste Zeile zu verarbeiten (z. B. wenn die Eingabezeilen validiert werden), kann die process
-Methode eine der folgenden Optionen zurückgeben:
Bei Verwendung von
return
kannNone
, eine Liste mitNone
oder eine leere Liste zurückgegeben werden, um die Zeile zu überspringen.Bei Verwendung von
yield
kannNone
zurückgegeben werden, um eine Zeile zu überspringen.Beachten Sie, dass bei mehreren Aufrufen von
yield
alle Aufrufe nach einem Aufruf, derNone
zurückgibt, von Snowflake ignoriert werden.
Der Code im folgenden Beispiel gibt nur die Zeilen zurück, für die number
eine positive ganze Zahl ist. Wenn number
nicht positiv ist, gibt die Methode None
zurück, um die aktuelle Zeile zu überspringen und mit der Verarbeitung der nächsten Zeile fortzufahren.
def process(self, number):
if number < 1:
yield None
else:
yield (number)
Zustandsabhängige und zustandslose Verarbeitung¶
Sie können bei der Implementierung des Handlers entscheiden, ob dieser Zeilen unter Beachtung von Partitionen verarbeitet oder einfach nur Zeile für Zeile verarbeitet.
Bei der partitionsspezifischen Verarbeitung enthält der Handler Code zur Verwaltung des partitionsbezogenen Zustands. Dazu gehört eine
__init__
-Methode, die zu Beginn der Partitionsverarbeitung ausgeführt wird, und eineend_partition
-Methode, die Snowflake nach der Verarbeitung der letzten Zeile der Partition aufruft. Weitere Informationen dazu finden Sie unter Verarbeiten von Partitionen (unter diesem Thema).Bei der partitionsunabhängigen Verarbeitung wird der Handler zustandslos ausgeführt und ignoriert Partitionsgrenzen.
Damit der Handler auf diese Weise ausgeführt wird, dürfen Sie keine
__init__
- oderend_partition
-Methode einschließen.
Verarbeiten von Partitionen¶
Sie können Partitionen in der Eingabe mit Code verarbeiten, der pro Partition ausgeführt wird (z. B. um den Zustand zu verwalten), sowie mit Code, der für jede Zeile in der Partition ausgeführt wird.
Bemerkung
Weitere Informationen zum Angeben von Partitionen beim Aufrufen einer UDTF finden Sie unter Tabellenfunktionen und Partitionen.
Wenn eine Abfrage Partitionen enthält, werden Zeilen anhand eines bestimmten Werts, z. B. des Werts einer Spalte, aggregiert. Die aggregierten Zeilen, die Ihr Handler erhält, werden nach diesem Wert partitioniert. Ihr Code kann die Partitionen und deren Zeilen so verarbeiten, dass die Verarbeitung für jede Partition einen partitionsbezogenen Zustand umfasst.
Der Code im folgenden SQL-Beispiel fragt Informationen über den Verkauf von Aktien ab. Er führt einen UDTF namens stock_sale_sum
aus, deren Eingabe durch den Wert der Spalte symbol
partitioniert ist.
select stock_sale_sum.symbol, total
from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Denken Sie daran, dass Ihr Code auch dann, wenn die eingehenden Zeilen partitioniert sind, die Partitionierung ignorieren und die Zeilen nacheinander, ohne Berücksichtigung der Partitionen verarbeiten kann. Sie können zum Beispiel Code zur Verarbeitung von partitionsbezogenen Zuständen weglassen, wie z. B. die Methoden __init__
und end_partition
der Handler-Klasse, und nur die Methode process
implementieren. Weitere Informationen dazu finden Sie unter Zustandsabhängige und zustandslose Verarbeitung (unter diesem Thema).
Um jede Partition als eine Einheit zu verarbeiten, gehen Sie wie folgt vor:
Implementieren Sie eine Methode der Handler-Klasse
__init__
, um die Verarbeitung für die Partition zu initialisieren.Weitere Informationen dazu finden Sie unter Initialisierung des Handlers (unter diesem Thema).
Fügen Sie bei der Verarbeitung jeder Zeile mit der
process
-Methode partitionsspezifischen Code ein.Weitere Informationen zur Verarbeitung von Zeilen finden Sie unter Verarbeiten von Zeilen (unter diesem Thema).
Implementieren Sie eine
end_partition
-Methode, um die Partitionsverarbeitung abzuschließen.Weitere Informationen dazu finden Sie unter Abschließen der Partitionsverarbeitung (unter diesem Thema).
Im Folgenden wird die Sequenz der Aufrufe Ihres Handlers beschrieben, wenn Sie Code zur Ausführung pro Partition eingefügt haben.
Bei Verarbeitungsstart einer Partition verwendet Snowflake noch vor Verarbeitung der ersten Zeile die Methode
__init__
Ihrer Handler-Klasse, um eine Instanz der Klasse zu erstellen.Hier kann ein partitionsbezogener Zustand eingerichtet werden. Sie können zum Beispiel eine Instanzenvariable initialisieren, die einen aus den Zeilen der Partition berechneten Wert aufnimmt.
Für jede Zeile der Partition ruft Snowflake die
process
-Methode auf.Jedes Mal, wenn die Methode ausgeführt wird, kann sie Änderungen an den Zustandswerten vornehmen. So könnte die
process
-Methode beispielsweise den Wert der Instanzenvariablen aktualisieren.Nachdem Ihr Code die letzte Zeile der Partition verarbeitet hat, ruft Snowflake Ihre
end_partition
-Methode auf.Mit dieser Methode können Sie Ausgabezeilen zurückgeben, die auf Partitionsebene einen Wert enthalten, den Sie zurückgeben möchten. Sie könnten zum Beispiel den Wert der Instanzenvariablen zurückgeben, die Sie bei der Verarbeitung der Zeilen in der Partition aktualisiert haben.
Ihre
end_partition
-Methode erhält von Snowflake keine Argumente. Snowflake ruft einfach die Methode auf, nachdem die letzte Zeile in der Partition verarbeitet wurde.
Abschließen der Partitionsverarbeitung¶
Sie können in Ihrer Handler-Klasse optional eine end_partition
-Methode implementieren, die von Snowflake aufgerufen wird, nachdem alle Zeilen einer Partition verarbeitet wurden. Mit dieser Methode können Sie Code für eine Partition ausführen, nachdem alle Zeilen dieser Partition verarbeitet wurden. Ihre end_partition
-Methode kann Ausgabezeilen erzeugen, um z. B. die Ergebnisse einer partitionsbezogenen Berechnung zurückzugeben. Weitere Informationen dazu finden Sie unter Verarbeiten von Partitionen (unter diesem Thema).
Die Signatur der Methode muss die folgende Form haben:
def end_partition(self):
Snowflake erwartet folgendes von der Implementierung Ihrer end_partition
-Methode:
Die Methode darf nicht statisch sein.
Die Methode darf nur den Parameter
self
haben.Alternativ zur Rückgabe eines Tabellenwerts kann die Methode auch eine leere Liste oder
None
erzeugen.
Bemerkung
Snowflake unterstützt zwar große Partitionen mit Timeouts, die so eingestellt sind, dass sie erfolgreich verarbeitet werden können, aber bei besonders großen Partitionen kann es zu Zeitüberschreitungen kommen (z. B. wenn end_partition
zu lange für den Abschluss braucht). Wenden Sie sich an den Snowflake-Support, wenn Sie den Timeout-Schwellenwert für bestimmte Nutzungsszenarios anpassen möchten.
Beispiel für die Verarbeitung von Partitionen¶
Der Code im folgenden Beispiel berechnet die Gesamtkosten, die für Käufe einer Aktie gezahlt wurden, indem zunächst die Kosten pro Kauf berechnet und die Käufe addiert werden (mit der Methode process
). Der Code gibt die Summe in der Methode end_partition
zurück.
Ein Beispiel für eine UDTF, die diesen Handler enthält, und den Aufruf der UDTF finden Sie unter Beispiel für eine Handler-Klasse.
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
Bei der Verarbeitung von Partitionen ist Folgendes zu beachten:
Ihr Code kann Partitionen verarbeiten, die nicht explizit in einem Aufruf der UDTF angegeben sind. Selbst wenn ein Aufruf der UDTF keine PARTITION BY-Klausel enthält, partitioniert Snowflake die Daten implizit.
Ihre
process
-Methode empfängt die Zeilendaten in der Reihenfolge, die, falls vorhanden, in der ORDER BY-Klausel der Partition angegeben ist.
Beispiele¶
Verwenden eines importierten Pakets¶
Sie können Python-Pakete verwenden, die in einer kuratierten Liste von Drittanbieter-Paketen von Anaconda enthalten sind, die in Snowflake verfügbar sind. Um diese Pakete als Abhängigkeiten in der UDTF anzugeben, verwenden Sie in der CREATE FUNCTION-Anweisung die PACKAGES-Klausel.
Sie können die enthaltenen Pakete ermitteln, indem Sie in Snowflake den folgenden SQL-Befehl ausführen:
select * from information_schema.packages where language = 'python';
Weitere Informationen dazu finden Sie unter Verwenden von Drittanbieter-Paketen und Erstellen von Python-UDFs.
Der Code im folgenden Beispiel verwendet eine Funktion des Pakets NumPy (Numerical Python), um den Durchschnittspreis pro Aktie aus einer Reihe von Aktienkäufen zu berechnen, die jeweils einen anderen Preis pro Aktie haben.
create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np
class StockSaleAverage:
def __init__(self):
self._price_array = []
self._quantity_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
self._price_array.append(float(price))
cost = quantity * price
yield (symbol, cost)
def end_partition(self):
np_array = np.array(self._price_array)
avg = np.average(np_array)
yield (self._symbol, avg)
$$;
Im folgenden Beispiel wird die vorherige UDF aufgerufen, wobei Werte aus den Spalten symbol
, quantity
und price
der Tabelle stocks_table
übergeben werden. Weitere Informationen zum Aufrufen einer UDTF finden Sie unter Aufrufen einer UDF.
select stock_sale_average.symbol, total
from stocks_table,
table(stock_sale_average(symbol, quantity, price)
over (partition by symbol));
Ausführen von gleichzeitigen Aufgaben mit Worker-Prozessen¶
Sie können gleichzeitige Aufgaben mithilfe von Python-Worker-Prozessen ausführen. Dies kann nützlich sein, wenn Sie parallele Aufgaben ausführen müssen, die die Vorteile mehrerer CPU-Kerne auf den Warehouse-Knoten ausnutzen.
Bemerkung
Snowflake empfiehlt, das integrierte Python-Multiprocessing-Modul nicht zu verwenden.
Um Fälle zu umgehen, in denen die Python Global Interpreter Lock verhindert, dass ein Multitasking-Ansatz über alle CPU-Kerne skaliert, können Sie gleichzeitige Aufgaben über separate Worker-Prozesse statt über Threads ausführen.
Sie können dies bei Snowflake-Warehouses tun, indem Sie die Klasse Parallel
der Bibliothek joblib
verwenden, wie in dem folgenden Beispiel.
CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
RETURNS TABLE (result INT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'JoblibMultiprocessing'
PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt
class JoblibMultiprocessing:
def process(self, i):
pass
def end_partition(self):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
for r in result:
yield (r, )
$$;
Bemerkung
Das für joblib.Parallel
verwendete Standard-Backend unterscheidet zwischen Standard- und Snowpark-optimierten Snowflake-Warehouses.
Standardeinstellung für Standard-Warehouses:
threading
Standardeinstellung für Snowpark-optimierte Warehouses:
loky
(Multiprocessing)
Sie können die Standardeinstellung des Backends außer Kraft setzen, indem Sie die Funktion joblib.parallel_backend
aufrufen, wie im folgenden Beispiel.
import joblib
joblib.parallel_backend('loky')
Erstellen der UDTF mit CREATE FUNCTION
¶
Sie erstellen einen UDTF in SQL mit dem Befehl CREATE FUNCTION und geben den von Ihnen geschriebenen Code als Handler an. Weitere Informationen zu diesem Befehl finden Sie unter CREATE FUNCTION.
Verwenden Sie bei der Erstellung einer UDTF die folgende Syntax:
CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
LANGUAGE PYTHON
[ IMPORTS = ( '<imports>' ) ]
RUNTIME_VERSION = 3.8
[ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
[ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
HANDLER = '<handler_class>'
[ AS '<python_code>' ]
Um den von Ihnen geschriebenen Handler-Code mit der UDTF zu verknüpfen, müssen Sie beim Ausführen der CREATE FUNCTION Folgendes tun:
Geben Sie unter RETURNS TABLE die Ausgabespalten als Paare aus Spaltenname und Spaltentyp an.
Setzen Sie LANGUAGE auf PYTHON.
Setzen Sie den Wert der IMPORTS-Klausel auf den Pfad und den Namen der Handler-Klasse, wenn sich die Klasse an einem externen Speicherort wie z. B. einem Stagingbereich befindet.
Weitere Informationen dazu finden Sie unter Erstellen von Python-UDFs.
Setzen Sie RUNTIME_VERSION auf die Version der Python-Laufzeitumgebung, die von Ihrem Code benötigt wird. Folgende Versionen von Python werden unterstützt:
3.8
3.9
3.10
3.11
Setzen Sie den Wert der PACKAGES-Klausel auf den Namen eines oder mehrerer Pakete, die, falls vorhanden, von der Handler-Klasse benötigt werden.
Weitere Informationen dazu finden Sie unter Verwenden von Drittanbieter-Paketen und Erstellen von Python-UDFs.
Setzen Sie den Wert der HANDLER-Klausel auf den Namen der Handler-Klasse.
Wenn Sie den Python-Handler-Code mit einer UDTF verknüpfen, können Sie den Code entweder in die Zeile einfügen oder an einem Speicherort des Snowflake-Stagingbereichs darauf verweisen. Der HANDLER-Wert unterscheidet Groß- und Kleinschreibung und muss mit dem Namen der Python-Klasse übereinstimmen.
Weitere Informationen dazu finden Sie unter UDFs mit Inline-Code vs. UDFs mit aus einem Stagingbereich hochgeladenem Code.
Wichtig
Bei einer skalaren Python-UDF enthält der Wert der HANDLER-Klausel den Namen der Methode.
Bei einer Python-UDTF enthält der Wert der HANDLER-Klausel den Klassennamen, aber keinen Methodennamen.
Der Grund für den Unterschied liegt darin, dass bei einer skalaren Python-UDF der Name der Handler-Methode vom Benutzer gewählt wird und daher Snowflake nicht im Voraus bekannt ist, während bei einer Python-UDTF die Namen der Methoden (wie die
end_partition
-Methode) bekannt sind, da sie mit den von Snowflake angegebenen Namen übereinstimmen müssen.Die Klausel
AS '<Pythoncode>'
ist erforderlich, wenn der Handler-Code inline mit CREATE FUNCTION angegeben wird.