Schreiben von UDTF in Python

Unter diesem Thema:

Sie können eine benutzerdefinierte Tabellenfunktion (UDTF) als Handler in Python implementieren. Der Handler-Code wird ausgeführt, wenn die UDTF aufgerufen wird. Unter diesem Thema wird beschrieben, wie ein Handler in Python implementiert und die UDTF erstellt wird.

Eine UDTF ist eine benutzerdefinierte Funktion (UDF), die tabellarische Ergebnisse liefert. Weitere Informationen zu in Python implementierten UDF-Handlern finden Sie unter Erstellen von Python-UDFs. Allgemeine Informationen zu UDFs finden Sie unter Übersicht zu benutzerdefinierten Funktionen.

In einem Handler für eine UDTF können Sie Eingabezeilen verarbeiten (siehe Verarbeiten von Zeilen unter diesem Thema). Sie können auch Verarbeitungslogik verwenden, die für jede Eingabepartition ausgeführt wird (siehe Verarbeiten von Partitionen unter diesem Thema).

Wenn Sie eine Python-UDTF erstellen, gehen Sie wie folgt vor:

  1. Implementieren Sie eine Klasse mit Methoden, die Snowflake aufruft, wenn die UDTF aufgerufen wird.

    Weitere Details dazu finden Sie unter Implementieren eines Handlers (unter diesem Thema).

  2. Erstellen Sie die UDTF mit dem SQL-Befehl CREATE FUNCTION, und geben Sie Ihre Klasse als Handler an. Wenn Sie die UDTF erstellen, geben Sie Folgendes an:

    • Datentypen der UDTF-Eingabeparameter

    • Datentypen der von der UDTF zurückgegebenen Spalten

    • Code, der als Handler ausgeführt wird, wenn die UDTF aufgerufen wird

    • Sprache, in der der Handler implementiert ist

    Weitere Informationen zur Syntax finden Sie unter Erstellen der UDTF mit CREATE FUNCTION (unter diesem Thema).

Sie können eine UDF oder UDTF aufrufen, wie unter Aufrufen einer UDF beschrieben.

Bemerkung

Tabellarische Funktionen (UDTFs) haben einen Grenzwert von 500 Eingabeargumenten und 500 Ausgabespalten.

Snowflake unterstützt derzeit das Schreiben von UDTFs mit den folgenden Python-Versionen:

  • 3.8

  • 3.9

  • 3.10

  • 3.11

Setzen Sie in Ihrer CREATE FUNCTION-Anweisung runtime_version auf die gewünschte Version.

Implementieren eines Handlers

Sie implementieren eine Handler-Klasse, um UDTF-Argumentwerte zu tabellarischen Ergebnissen zu verarbeiten und partitionierte Eingaben zu behandeln. Ein Beispiel für eine Handler-Klasse finden Sie unter Beispiel für eine Handler-Klasse (unter diesem Thema).

Wenn Sie die UDTF mit CREATE FUNCTION erstellen, geben Sie diese Klasse als Handler für die UDTF an. Weitere Informationen zu zur Erstellung der Funktion mit SQL finden Sie unter Erstellen der UDTF mit CREATE FUNCTION (in diesem Thema).

Eine Handler-Klasse implementiert Methoden, die Snowflake aufruft, wenn die UDTF aufgerufen wird. Diese Klasse enthält die Verarbeitungslogik der UDTF.

Methode

Anforderung

Beschreibung

__init__

Optional

Initialisiert den Zustand für die zustandsabhängige Verarbeitung von Eingabepartitionen. Weitere Informationen dazu finden Sie unter Initialisierung des Handlers (unter diesem Thema).

process

Erforderlich

Verarbeitet jede Eingabezeile und gibt einen tabellarischen Wert als Tupel zurück. Snowflake ruft diese Methode auf und übergibt Eingaben aus den Argumenten der UDTF. Weitere Informationen dazu finden Sie unter Definieren einer process-Methode (unter diesem Thema).

end_partition

Optional

Schließt die Verarbeitung der Eingabepartitionen ab und gibt einen tabellarischen Wert in Form von Tupeln zurück. Weitere Informationen dazu finden Sie unter Abschließen der Partitionsverarbeitung (unter diesem Thema).

Beachten Sie, dass das Auslösen einer Ausnahme durch eine beliebige Methode der Handler-Klasse zum Abbruch der Verarbeitung führt. Die Abfrage, mit der die UDTF aufgerufen wurde, schlägt mit einer Fehlermeldung fehl.

Bemerkung

Wenn Ihr Code die hier beschriebenen Anforderungen nicht erfüllt, kann die Erstellung oder Ausführung der UDTF fehlschlagen. Snowflake erkennt mögliche Verstöße beim Ausführen der CREATE FUNCTION-Anweisung.

Beispiel für eine Handler-Klasse

Der Code im folgenden Beispiel erstellt einen UDTF, deren Handler-Klasse Zeilen in einer Partition verarbeitet. Die Methode process verarbeitet jede Eingabezeile und gibt eine Zeile mit den Gesamtkosten eines Aktienverkaufs zurück. Nach der Verarbeitung der Zeilen in der Partition gibt der Prozess (mit ihrer Methode end_partition) die Gesamtsumme aller in der Partition enthaltenen Verkäufe zurück.

create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      cost = quantity * price
      self._cost_total += cost
      yield (symbol, cost)

    def end_partition(self):
      yield (self._symbol, self._cost_total)
$$;
Copy

Im folgenden Beispiel wird die vorherige UDF aufgerufen, wobei Werte aus den Spalten symbol, quantity und price der Tabelle stocks_table übergeben werden. Weitere Informationen zum Aufrufen einer UDTF finden Sie unter Aufrufen einer UDF.

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

Initialisierung des Handlers

Sie können in Ihrer Handler-Klasse optional eine __init__-Methode implementieren, die Snowflake aufruft, bevor der Handler mit der Verarbeitung von Zeilen begonnen hat. Sie können diese Methode zum Beispiel verwenden, um einen partitionsbezogenen Zustand für den Handler einzurichten. Ihre __init__-Methode erzeugt möglicherweise keine Ausgabezeilen.

Die Signatur der Methode muss die folgende Form haben:

def __init__(self):
Copy

Sie könnte beispielweise wie folgt vorgehen:

  • Initialisieren des Zustands einer Partition, dann Verwendung dieses Zustands in den Methoden process und end_partition.

  • Ausführen einer langwierigen Initialisierung, die nur einmal pro Partition und nicht einmal pro Zeile erfolgen muss.

Bemerkung

Sie können auch die Verarbeitungslogik einmal vor Beginn der Partitionsverarbeitung ausführen, indem Sie diesen Code außerhalb der Handler-Klasse einfügen, beispielsweise vor der Klassendeklaration.

Weitere Informationen zur Verarbeitung von Partitionen finden Sie unter Verarbeiten von Partitionen (unter diesem Thema).

Bei Verwendung einer __init__-Methode müssen Sie daran denken, dass für __init__ Folgendes gilt:

  • Kann nur self als Argument annehmen.

  • Kann keine Ausgabezeilen erzeugen. Verwenden Sie dafür Ihre process-Methodenimplementierung.

  • Wird für jede Partition einmal aufgerufen, und zwar bevor die process-Methode aufgerufen wird.

Verarbeiten von Zeilen

Implementieren Sie eine process-Methode, die Snowflake für jede Eingabezeile aufruft.

Definieren einer process-Methode

Definieren Sie eine process-Methode, die als Werte die aus SQL-Typen konvertierten UDTF-Argumente erhält und Daten zurückgibt, die Snowflake zum Erstellen des tabellarischen Rückgabewerts der UDTF verwendet.

Die Signatur der Methode muss die folgende Form haben:

def process(self, *args):
Copy

Für Ihre process-Methode muss Folgendes gelten:

  • Hat einen self-Parameter.

  • Deklariert Methodenparameter, die den UDTF-Parametern entsprechen.

    Die Namen der Methodenparameter müssen nicht mit den Namen der UDTF-Parameter übereinstimmen, aber die Methodenparameter müssen in der gleichen Reihenfolge deklariert werden, wie die UDTF-Parameter deklariert sind.

    Wenn Sie UDTF-Argumentwerte an Ihre Methode übergeben, konvertiert Snowflake die SQL-Typen der Werte in die Python-Typen, die Sie in der Methode verwenden. Weitere Informationen zur Zuordnung von SQL-Datentypen zu Python-Datentypen in Snowflake finden Sie unter Zuordnung von Datentypen zwischen SQL und Python.

  • Gibt ein oder mehrere Tupel aus (oder ein iterierendes Objekt mit Tupeln), wobei die Reihenfolge der Tupel der Reihenfolge der UDTF-Spalten für die Rückgabewerte entspricht.

    Die Tupel-Elemente müssen in der gleichen Reihenfolge erscheinen, wie die UDTF-Spalten für die Rückgabewerte deklariert sind. Weitere Informationen dazu finden Sie unter Zurückgeben eines Wertes (unter diesem Thema).

    Snowflake konvertiert dann die Python-Datentypen der Werte in die SQL-Datentypen, die von der UDTF-Deklaration benötigt werden. Weitere Informationen zur Zuordnung von SQL-Datentypen zu Python-Datentypen in Snowflake finden Sie unter Zuordnung von Datentypen zwischen SQL und Python.

Wenn eine Methode in der Handler-Klasse eine Ausnahme auslöst, wird die Verarbeitung angehalten. Die Abfrage, mit der die UDTF aufgerufen wurde, schlägt fehl und gibt eine Fehlermeldung aus. Wenn die process-Methode den Wert None zurückgibt, wird die Verarbeitung angehalten. (Die end_partition-Methode wird auch dann aufgerufen, wenn die process-Methode den Wert None zurückgibt).

Beispiel für „process“-Methode

Der Code im folgenden Beispiel zeigt eine Handler-Klasse StockSale mit einer process-Methode, die drei UDTF-Argumente (symbol, quantity und price) verarbeitet und eine einzelne Zeile mit zwei Spalten (symbol und total) zurückgibt. Beachten Sie, dass die Parameter der process-Methode in der gleichen Reihenfolge deklariert werden wie die stock_sale-Parameter. Die Argumente in der yield-Anweisung der process-Methode weisen dieselbe Reihenfolge auf wie die Spalten, die in der stock_sale RETURNS TABLE-Klausel deklariert sind.

create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
$$;
Copy

Im folgenden Beispiel wird die vorherige UDF aufgerufen, wobei Werte aus den Spalten symbol, quantity und price der Tabelle stocks_table übergeben werden.

select stock_sale.symbol, total
  from stocks_table, table(stock_sale(symbol, quantity, price) over (partition by symbol));
Copy

Zurückgeben eines Wertes

Bei der Rückgabe von Ausgabezeilen können Sie entweder yield oder return (aber nicht beide) verwenden, um Tupel mit dem Tabellenwert zurückzugeben. Wenn die Methode den Wert None zurückgibt oder ergibt, wird die Verarbeitung für die aktuelle Zeile angehalten.

  • Wenn Sie yield verwenden, führen Sie für jede Ausgabezeile eine separate yield-Anweisung aus. Dies ist die beste Vorgehensweise, da die mit yield verbundene Auswertung im Lazy-Modus eine effizientere Verarbeitung ermöglicht und Timeouts vermeiden hilft.

    Jedes Element des Tupels wird zu einem Spaltenwert in dem von der UDTF zurückgegebenen Ergebnis. Die Reihenfolge der yield-Argumente muss mit der Reihenfolge der Spalten übereinstimmen, die in der RETURNS TABLE-Klausel der CREATE FUNCTION-Anweisung für den Rückgabewert deklariert sind.

    Der Code im folgenden Beispiel gibt Werte zurück, die zwei Zeilen darstellen.

    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
      yield (symbol, cost)
    
    Copy

    Da das „yield“-Argument ein Tupel ist, müssen Sie bei Übergabe eines einzelnen Werts ein nachstehendes Komma einfügen, wie im folgenden Beispiel gezeigt:

    yield (cost,)
    
    Copy
  • Bei Verwendung von return wird ein iterierendes Objekt mit Tupeln zurückgegeben.

    Jeder Wert in einem Tupel wird zu einem Spaltenwert in dem von der UDTF zurückgegebenen Ergebnis. Die Reihenfolge der Spaltenwerte in einem Tupel muss mit der Reihenfolge der Spalten übereinstimmen, die in der RETURNS TABLE-Klausel der CREATE FUNCTION-Anweisung für den Rückgabewert deklariert sind.

    Der Code im folgenden Beispiel gibt zwei Zeilen mit jeweils zwei Spalten, Symbol und Summe, zurück:

    def process(self, symbol, quantity, price):
      cost = quantity * price
      return [(symbol, cost), (symbol, cost)]
    
    Copy

Überspringen von Zeilen

Um eine Eingabezeile zu überspringen und die nächste Zeile zu verarbeiten (z. B. wenn die Eingabezeilen validiert werden), kann die process-Methode eine der folgenden Optionen zurückgeben:

  • Bei Verwendung von return kann None, eine Liste mit None oder eine leere Liste zurückgegeben werden, um die Zeile zu überspringen.

  • Bei Verwendung von yield kann None zurückgegeben werden, um eine Zeile zu überspringen.

    Beachten Sie, dass bei mehreren Aufrufen von yield alle Aufrufe nach einem Aufruf, der None zurückgibt, von Snowflake ignoriert werden.

Der Code im folgenden Beispiel gibt nur die Zeilen zurück, für die number eine positive ganze Zahl ist. Wenn number nicht positiv ist, gibt die Methode None zurück, um die aktuelle Zeile zu überspringen und mit der Verarbeitung der nächsten Zeile fortzufahren.

def process(self, number):
  if number < 1:
    yield None
  else:
    yield (number)
Copy

Zustandsabhängige und zustandslose Verarbeitung

Sie können bei der Implementierung des Handlers entscheiden, ob dieser Zeilen unter Beachtung von Partitionen verarbeitet oder einfach nur Zeile für Zeile verarbeitet.

  • Bei der partitionsspezifischen Verarbeitung enthält der Handler Code zur Verwaltung des partitionsbezogenen Zustands. Dazu gehört eine __init__-Methode, die zu Beginn der Partitionsverarbeitung ausgeführt wird, und eine end_partition-Methode, die Snowflake nach der Verarbeitung der letzten Zeile der Partition aufruft. Weitere Informationen dazu finden Sie unter Verarbeiten von Partitionen (unter diesem Thema).

  • Bei der partitionsunabhängigen Verarbeitung wird der Handler zustandslos ausgeführt und ignoriert Partitionsgrenzen.

    Damit der Handler auf diese Weise ausgeführt wird, dürfen Sie keine __init__- oder end_partition-Methode einschließen.

Verarbeiten von Partitionen

Sie können Partitionen in der Eingabe mit Code verarbeiten, der pro Partition ausgeführt wird (z. B. um den Zustand zu verwalten), sowie mit Code, der für jede Zeile in der Partition ausgeführt wird.

Bemerkung

Weitere Informationen zum Angeben von Partitionen beim Aufrufen einer UDTF finden Sie unter Tabellenfunktionen und Partitionen.

Wenn eine Abfrage Partitionen enthält, werden Zeilen anhand eines bestimmten Werts, z. B. des Werts einer Spalte, aggregiert. Die aggregierten Zeilen, die Ihr Handler erhält, werden nach diesem Wert partitioniert. Ihr Code kann die Partitionen und deren Zeilen so verarbeiten, dass die Verarbeitung für jede Partition einen partitionsbezogenen Zustand umfasst.

Der Code im folgenden SQL-Beispiel fragt Informationen über den Verkauf von Aktien ab. Er führt einen UDTF namens stock_sale_sum aus, deren Eingabe durch den Wert der Spalte symbol partitioniert ist.

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

Denken Sie daran, dass Ihr Code auch dann, wenn die eingehenden Zeilen partitioniert sind, die Partitionierung ignorieren und die Zeilen nacheinander, ohne Berücksichtigung der Partitionen verarbeiten kann. Sie können zum Beispiel Code zur Verarbeitung von partitionsbezogenen Zuständen weglassen, wie z. B. die Methoden __init__ und end_partition der Handler-Klasse, und nur die Methode process implementieren. Weitere Informationen dazu finden Sie unter Zustandsabhängige und zustandslose Verarbeitung (unter diesem Thema).

Um jede Partition als eine Einheit zu verarbeiten, gehen Sie wie folgt vor:

  • Implementieren Sie eine Methode der Handler-Klasse __init__, um die Verarbeitung für die Partition zu initialisieren.

    Weitere Informationen dazu finden Sie unter Initialisierung des Handlers (unter diesem Thema).

  • Fügen Sie bei der Verarbeitung jeder Zeile mit der process-Methode partitionsspezifischen Code ein.

    Weitere Informationen zur Verarbeitung von Zeilen finden Sie unter Verarbeiten von Zeilen (unter diesem Thema).

  • Implementieren Sie eine end_partition-Methode, um die Partitionsverarbeitung abzuschließen.

    Weitere Informationen dazu finden Sie unter Abschließen der Partitionsverarbeitung (unter diesem Thema).

Im Folgenden wird die Sequenz der Aufrufe Ihres Handlers beschrieben, wenn Sie Code zur Ausführung pro Partition eingefügt haben.

  1. Bei Verarbeitungsstart einer Partition verwendet Snowflake noch vor Verarbeitung der ersten Zeile die Methode __init__ Ihrer Handler-Klasse, um eine Instanz der Klasse zu erstellen.

    Hier kann ein partitionsbezogener Zustand eingerichtet werden. Sie können zum Beispiel eine Instanzenvariable initialisieren, die einen aus den Zeilen der Partition berechneten Wert aufnimmt.

  2. Für jede Zeile der Partition ruft Snowflake die process-Methode auf.

    Jedes Mal, wenn die Methode ausgeführt wird, kann sie Änderungen an den Zustandswerten vornehmen. So könnte die process-Methode beispielsweise den Wert der Instanzenvariablen aktualisieren.

  3. Nachdem Ihr Code die letzte Zeile der Partition verarbeitet hat, ruft Snowflake Ihre end_partition-Methode auf.

    Mit dieser Methode können Sie Ausgabezeilen zurückgeben, die auf Partitionsebene einen Wert enthalten, den Sie zurückgeben möchten. Sie könnten zum Beispiel den Wert der Instanzenvariablen zurückgeben, die Sie bei der Verarbeitung der Zeilen in der Partition aktualisiert haben.

    Ihre end_partition-Methode erhält von Snowflake keine Argumente. Snowflake ruft einfach die Methode auf, nachdem die letzte Zeile in der Partition verarbeitet wurde.

Abschließen der Partitionsverarbeitung

Sie können in Ihrer Handler-Klasse optional eine end_partition-Methode implementieren, die von Snowflake aufgerufen wird, nachdem alle Zeilen einer Partition verarbeitet wurden. Mit dieser Methode können Sie Code für eine Partition ausführen, nachdem alle Zeilen dieser Partition verarbeitet wurden. Ihre end_partition-Methode kann Ausgabezeilen erzeugen, um z. B. die Ergebnisse einer partitionsbezogenen Berechnung zurückzugeben. Weitere Informationen dazu finden Sie unter Verarbeiten von Partitionen (unter diesem Thema).

Die Signatur der Methode muss die folgende Form haben:

def end_partition(self):
Copy

Snowflake erwartet folgendes von der Implementierung Ihrer end_partition-Methode:

  • Die Methode darf nicht statisch sein.

  • Die Methode darf nur den Parameter self haben.

  • Alternativ zur Rückgabe eines Tabellenwerts kann die Methode auch eine leere Liste oder None erzeugen.

Bemerkung

Snowflake unterstützt zwar große Partitionen mit Timeouts, die so eingestellt sind, dass sie erfolgreich verarbeitet werden können, aber bei besonders großen Partitionen kann es zu Zeitüberschreitungen kommen (z. B. wenn end_partition zu lange für den Abschluss braucht). Wenden Sie sich an den Snowflake-Support, wenn Sie den Timeout-Schwellenwert für bestimmte Nutzungsszenarios anpassen möchten.

Beispiel für die Verarbeitung von Partitionen

Der Code im folgenden Beispiel berechnet die Gesamtkosten, die für Käufe einer Aktie gezahlt wurden, indem zunächst die Kosten pro Kauf berechnet und die Käufe addiert werden (mit der Methode process). Der Code gibt die Summe in der Methode end_partition zurück.

Ein Beispiel für eine UDTF, die diesen Handler enthält, und den Aufruf der UDTF finden Sie unter Beispiel für eine Handler-Klasse.

class StockSaleSum:
  def __init__(self):
    self._cost_total = 0
    self._symbol = ""

  def process(self, symbol, quantity, price):
    self._symbol = symbol
    cost = quantity * price
    self._cost_total += cost
    yield (symbol, cost)

  def end_partition(self):
    yield (self._symbol, self._cost_total)
Copy

Bei der Verarbeitung von Partitionen ist Folgendes zu beachten:

  • Ihr Code kann Partitionen verarbeiten, die nicht explizit in einem Aufruf der UDTF angegeben sind. Selbst wenn ein Aufruf der UDTF keine PARTITION BY-Klausel enthält, partitioniert Snowflake die Daten implizit.

  • Ihre process-Methode empfängt die Zeilendaten in der Reihenfolge, die, falls vorhanden, in der ORDER BY-Klausel der Partition angegeben ist.

Beispiele

Verwenden eines importierten Pakets

Sie können Python-Pakete verwenden, die in einer kuratierten Liste von Drittanbieter-Paketen von Anaconda enthalten sind, die in Snowflake verfügbar sind. Um diese Pakete als Abhängigkeiten in der UDTF anzugeben, verwenden Sie in der CREATE FUNCTION-Anweisung die PACKAGES-Klausel.

Sie können die enthaltenen Pakete ermitteln, indem Sie in Snowflake den folgenden SQL-Befehl ausführen:

select * from information_schema.packages where language = 'python';
Copy

Weitere Informationen dazu finden Sie unter Verwenden von Drittanbieter-Paketen und Erstellen von Python-UDFs.

Der Code im folgenden Beispiel verwendet eine Funktion des Pakets NumPy (Numerical Python), um den Durchschnittspreis pro Aktie aus einer Reihe von Aktienkäufen zu berechnen, die jeweils einen anderen Preis pro Aktie haben.

create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np

class StockSaleAverage:
    def __init__(self):
      self._price_array = []
      self._quantity_total = 0
      self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      self._price_array.append(float(price))
      cost = quantity * price
      yield (symbol, cost)

    def end_partition(self):
      np_array = np.array(self._price_array)
      avg = np.average(np_array)
      yield (self._symbol, avg)
$$;
Copy

Im folgenden Beispiel wird die vorherige UDF aufgerufen, wobei Werte aus den Spalten symbol, quantity und price der Tabelle stocks_table übergeben werden. Weitere Informationen zum Aufrufen einer UDTF finden Sie unter Aufrufen einer UDF.

select stock_sale_average.symbol, total
  from stocks_table,
  table(stock_sale_average(symbol, quantity, price)
    over (partition by symbol));
Copy

Ausführen von gleichzeitigen Aufgaben mit Worker-Prozessen

Sie können gleichzeitige Aufgaben mithilfe von Python-Worker-Prozessen ausführen. Dies kann nützlich sein, wenn Sie parallele Aufgaben ausführen müssen, die die Vorteile mehrerer CPU-Kerne auf den Warehouse-Knoten ausnutzen.

Bemerkung

Snowflake empfiehlt, das integrierte Python-Multiprocessing-Modul nicht zu verwenden.

Um Fälle zu umgehen, in denen die Python Global Interpreter Lock verhindert, dass ein Multitasking-Ansatz über alle CPU-Kerne skaliert, können Sie gleichzeitige Aufgaben über separate Worker-Prozesse statt über Threads ausführen.

Sie können dies bei Snowflake-Warehouses tun, indem Sie die Klasse Parallel der Bibliothek joblib verwenden, wie in dem folgenden Beispiel.

CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
  RETURNS TABLE (result INT)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'JoblibMultiprocessing'
  PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt

class JoblibMultiprocessing:
  def process(self, i):
    pass

  def end_partition(self):
    result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
    for r in result:
      yield (r, )
$$;
Copy

Bemerkung

Das für joblib.Parallel verwendete Standard-Backend unterscheidet zwischen Standard- und Snowpark-optimierten Snowflake-Warehouses.

  • Standardeinstellung für Standard-Warehouses: threading

  • Standardeinstellung für Snowpark-optimierte Warehouses: loky (Multiprocessing)

Sie können die Standardeinstellung des Backends außer Kraft setzen, indem Sie die Funktion joblib.parallel_backend aufrufen, wie im folgenden Beispiel.

import joblib
joblib.parallel_backend('loky')
Copy

Erstellen der UDTF mit CREATE FUNCTION

Sie erstellen einen UDTF in SQL mit dem Befehl CREATE FUNCTION und geben den von Ihnen geschriebenen Code als Handler an. Weitere Informationen zu diesem Befehl finden Sie unter CREATE FUNCTION.

Verwenden Sie bei der Erstellung einer UDTF die folgende Syntax:

CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
  RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
  LANGUAGE PYTHON
  [ IMPORTS = ( '<imports>' ) ]
  RUNTIME_VERSION = 3.8
  [ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
  [ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
  HANDLER = '<handler_class>'
  [ AS '<python_code>' ]
Copy

Um den von Ihnen geschriebenen Handler-Code mit der UDTF zu verknüpfen, müssen Sie beim Ausführen der CREATE FUNCTION Folgendes tun:

  • Geben Sie unter RETURNS TABLE die Ausgabespalten als Paare aus Spaltenname und Spaltentyp an.

  • Setzen Sie LANGUAGE auf PYTHON.

  • Setzen Sie den Wert der IMPORTS-Klausel auf den Pfad und den Namen der Handler-Klasse, wenn sich die Klasse an einem externen Speicherort wie z. B. einem Stagingbereich befindet.

    Weitere Informationen dazu finden Sie unter Erstellen von Python-UDFs.

  • Setzen Sie RUNTIME_VERSION auf die Version der Python-Laufzeitumgebung, die von Ihrem Code benötigt wird. Folgende Versionen von Python werden unterstützt:

    • 3.8

    • 3.9

    • 3.10

    • 3.11

  • Setzen Sie den Wert der PACKAGES-Klausel auf den Namen eines oder mehrerer Pakete, die, falls vorhanden, von der Handler-Klasse benötigt werden.

    Weitere Informationen dazu finden Sie unter Verwenden von Drittanbieter-Paketen und Erstellen von Python-UDFs.

  • Setzen Sie den Wert der HANDLER-Klausel auf den Namen der Handler-Klasse.

    Wenn Sie den Python-Handler-Code mit einer UDTF verknüpfen, können Sie den Code entweder in die Zeile einfügen oder an einem Speicherort des Snowflake-Stagingbereichs darauf verweisen. Der HANDLER-Wert unterscheidet Groß- und Kleinschreibung und muss mit dem Namen der Python-Klasse übereinstimmen.

    Weitere Informationen dazu finden Sie unter UDFs mit Inline-Code vs. UDFs mit aus einem Stagingbereich hochgeladenem Code.

    Wichtig

    Bei einer skalaren Python-UDF enthält der Wert der HANDLER-Klausel den Namen der Methode.

    Bei einer Python-UDTF enthält der Wert der HANDLER-Klausel den Klassennamen, aber keinen Methodennamen.

    Der Grund für den Unterschied liegt darin, dass bei einer skalaren Python-UDF der Name der Handler-Methode vom Benutzer gewählt wird und daher Snowflake nicht im Voraus bekannt ist, während bei einer Python-UDTF die Namen der Methoden (wie die end_partition-Methode) bekannt sind, da sie mit den von Snowflake angegebenen Namen übereinstimmen müssen.

  • Die Klausel AS '<Pythoncode>' ist erforderlich, wenn der Handler-Code inline mit CREATE FUNCTION angegeben wird.