Vektorisierte Python-UDTFs

Dieses Thema bietet eine Einführung in vektorisierte Python-UDTFs.

Unter diesem Thema:

Übersicht

Vektorisierte Python-UDTFs (benutzerdefinierte Tabellenfunktionen) bieten eine Möglichkeit, batchweise über Zeilen zu arbeiten.

Snowflake unterstützt zwei Arten von vektorisierten UDTFs:

  • UDTFs mit einer vektorisierten end_partition-Methode

  • UDTFs mit einer vektorisierten process-Methode

Sie müssen sich für eine Art entscheiden, da eine UDTF nicht sowohl eine vektorisierte process-Methode als auch eine vektorisierte end_partition-Methode haben kann.

UDTFs mit vektorisierter „end_partition“-Methode

UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.

Verwenden Sie eine vektorisierte end_partition-Methode für die folgenden Aufgaben:

  • Verarbeiten von Daten Partition für Partition und nicht Zeile für Zeile.

  • Rückgabe mehrerer Zeilen oder Spalten für jede Partition.

  • Verwenden von Bibliotheken, die für die Datenanalyse Pandas-DataFrames verwenden.

UDTFs mit einer vektorisierten Prozessmethode

UDTFs mit einer vektorisierten process-Methode bieten eine Möglichkeit, über Zeilen in Batches zu operieren, wenn die Operation eine 1:-1-Zuordnung ausführt. Mit anderen Worten, die Methode gibt für jede Eingabezeile genaue eine Ausgabezeile zurück. Die Anzahl der Spalten ist nicht beschränkt.

Verwenden Sie eine vektorisierte process-Methode für die folgenden Aufgaben:

  • Batchweises Anwenden einer 1-zu-1-Transformation mit einem mehrspaltigen Ergebnis.

  • Verwenden einer Bibliothek, die pandas.DataFrame benötigt.

  • Verarbeiten von Zeilen in Batches ohne explizite Partitionierung.

  • Nutzen von to_pandas()-API, um das Abfrageergebnis direkt in einen Pandas-DataFrame umzuwandeln.

Voraussetzungen

Die Snowpark-Bibliothek für Python, Version 1.14.0 oder höher ist erforderlich.

Erstellen Sie eine UDTF mit einer vektorisierten end_partition-Methode

  1. Optional: Definieren Sie eine Handler-Klasse mit einer __init__-Methode, die vor der Verarbeitung jeder Partition aufgerufen wird.

    Hinweis: Definieren Sie keine process-Methode.

  2. Definieren Sie eine end_partition -Methode, die ein DataFrame-Argument entgegennimmt und ein pandas.DataFrame oder ein Tupel von pandas.Series oder pandas.arrays zurückgibt oder ergibt, wobei jedes Array eine Spalte ist.

    Die Spaltentypen des Ergebnisses müssen mit den Spaltentypen der UDTF-Definition übereinstimmen.

  3. Um die end_partition-Methode als vektorisiert zu kennzeichnen, verwenden Sie das @vectorized-Decorator-Element oder das _sf_vectorized_input-Funktionsattribut.

    Weitere Informationen finden Sie unter Vektorisierte Python-UDFs. Das Decorator-Element @vectorized kann nur verwendet werden, wenn die Python-UDTF innerhalb von Snowflake ausgeführt wird, z. B. bei Verwendung eines SQL-Arbeitsblatts. Wenn Sie zum Ausführen den Client oder ein Python-Arbeitsblatt verwenden, müssen Sie das Funktionsattribut verwenden.

Bemerkung

Die Standard-Spaltennamen für den Eingabe-DataFrame einer UDTF mit vektorisierter end_partition-Methode entsprechen der Signatur der SQL-Funktion. Die Spaltennamen entsprechen den Anforderungen für SQL-Bezeichner. Das heißt, wenn ein Bezeichner nicht in Anführungszeichen steht, wird er groß geschrieben, und wenn er in doppelten Anführungszeichen steht, bleibt er unverändert.

Der folgende Codeblock ist ein Beispiel für die Erstellung einer UDTF mit einer vektorisierten end_partition-Methode unter Verwendung des @vectorized-Decorator-Elements:

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

Der folgende Codeblock ist ein Beispiel für die Erstellung einer UDTF mit einer vektorisierten end_partition-Methode unter Verwendung des Funktionsattributs:

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

Bemerkung

Ein UDTF mit einer vektorisierten end_partition-Methode muss mit einer PARTITION BY-Klausel aufgerufen werden, um die Partitionen zu erstellen.

So rufen Sie die UDTF mit allen Daten in derselben Partition auf:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

Zum Aufruf von UDTF mit den nach Spalte x partitionierten Daten:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

Beispiel: Zeilensammlung mit einer regulären UDTF im Vergleich zur Verwendung einer UDTF mit einer vektorisierten end_partition-Methode

Zeilensammlung mit einer regulären UDTF:

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

Zeilensammlung mit einer UDTF mit einer vektorisierten end_partition-Methode:

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

Beispiel: Berechnen der zusammenfassenden Statistik für jede Spalte in der Partition

Das folgende Beispiel zeigt, wie die Berechnung der zusammenfassenden Statistik für jede Spalte in der Partition mit der Pandas-Methode describe() vorgenommen wird.

  1. Erstellen Sie eine Tabelle und erzeugen Sie drei Partitionen mit je fünf Zeilen:

    create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
    
    -- generate 3 partitions of 5 rows each
    insert into test_values
    select 'x',
    uniform(1.5,1000.5,random(1))::float col1,
    uniform(1.5,1000.5,random(2))::float col2,
    uniform(1.5,1000.5,random(3))::float col3,
    uniform(1.5,1000.5,random(4))::float col4,
    uniform(1.5,1000.5,random(5))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'y',
    uniform(1.5,1000.5,random(10))::float col1,
    uniform(1.5,1000.5,random(20))::float col2,
    uniform(1.5,1000.5,random(30))::float col3,
    uniform(1.5,1000.5,random(40))::float col4,
    uniform(1.5,1000.5,random(50))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'z',
    uniform(1.5,1000.5,random(100))::float col1,
    uniform(1.5,1000.5,random(200))::float col2,
    uniform(1.5,1000.5,random(300))::float col3,
    uniform(1.5,1000.5,random(400))::float col4,
    uniform(1.5,1000.5,random(500))::float col5
    from table(generator(rowcount => 5));
    
    Copy
  2. Sehen Sie sich die Daten an:

    select * from test_values;
    
    -----------------------------------------------------
    |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
    -----------------------------------------------------
    |x     |8.0     |99.4    |714.6   |168.7   |397.2   |
    |x     |106.4   |237.1   |971.7   |828.4   |988.2   |
    |x     |741.3   |207.9   |32.6    |640.6   |63.2    |
    |x     |541.3   |828.6   |844.9   |77.3    |403.1   |
    |x     |4.3     |723.3   |924.3   |282.5   |158.1   |
    |y     |976.1   |562.4   |968.7   |934.3   |977.3   |
    |y     |390.0   |244.3   |952.6   |101.7   |24.9    |
    |y     |599.7   |191.8   |90.2    |788.2   |761.2   |
    |y     |589.5   |201.0   |863.4   |415.1   |696.1   |
    |y     |46.7    |659.7   |571.1   |938.0   |513.7   |
    |z     |313.9   |188.5   |964.6   |435.4   |519.6   |
    |z     |328.3   |643.1   |766.4   |148.1   |596.4   |
    |z     |929.0   |255.4   |915.9   |857.2   |425.5   |
    |z     |612.8   |816.4   |220.2   |879.5   |331.4   |
    |z     |487.1   |704.5   |471.5   |378.9   |481.2   |
    -----------------------------------------------------
    
    Copy
  3. Erstellen Sie die Funktion:

    create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
    returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
    language python
    RUNTIME_VERSION = 3.9
    packages=('pandas')
    handler='handler'
    as $$
    from _snowflake import vectorized
    import pandas
    
    class handler:
        @vectorized(input=pandas.DataFrame)
        def end_partition(self, df):
          # using describe function to get the summary statistics
          result = df.describe().transpose()
          # add a column at the beginning for column ids
          result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
          return result
    $$;
    
    Copy
  4. Führen Sie einen der folgenden Schritte aus:

    • Rufen Sie die Funktion auf, und führen Sie die Partitionierung über id aus:

      -- partition by id
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by id))
      order by id, column_name;
      
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy
    • Rufen Sie die Funktion auf und behandeln Sie die gesamte Tabelle als eine Partition:

      -- treat the whole table as one partition
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by 1))
      order by id, column_name;
      
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy

Erstellen Sie eine UDTF mit einer vektorisierten Prozessmethode

  1. Definieren Sie eine Handler-Klasse, ähnlich wie bei regulären UDTFs, und verwenden Sie die optionalen Methoden __init__ und end_partition.

  2. Definieren Sie eine process-Methode, die ein DataFrame-Argument entgegennimmt und entweder einen pandas.DataFrame oder ein Tupel von pandas.Series oder pandas.arrays zurückgibt, wobei jedes Array eine Spalte ist.

    Die Spaltentypen des Ergebnisses müssen mit den Spaltentypen der UDTF-Definition übereinstimmen. Das zurückgegebene Ergebnis muss genau ein DataFrame oder Tupel sein. Dies unterscheidet sich von einer vektorisierten end_partition-Methode, bei der Sie eine Liste übergeben oder zurückgeben können.

  3. Um die process-Methode als vektorisiert zu kennzeichnen, verwenden Sie das @vectorized-Decorator-Element oder das _sf_vectorized_input-Funktionsattribut.

    Weitere Informationen finden Sie unter Vektorisierte Python-UDFs. Das Decorator-Element @vectorized kann nur verwendet werden, wenn die Python-UDTF innerhalb von Snowflake ausgeführt wird, z. B. bei Verwendung eines SQL-Arbeitsblatts. Wenn Sie zum Ausführen den Client oder ein Python-Arbeitsblatt verwenden, müssen Sie das Funktionsattribut verwenden.

  4. Optional: Wenn Ihre Python-Handler-Funktion das Zeitlimit für die Ausführung überschreitet, können Sie unter eine Zielbatchgröße einstellen.

Bemerkung

Die Standard-Spaltennamen für den Eingabe-DataFrame einer UDTF mit vektorisierter process-Methode entsprechen der Signatur der SQL-Funktion. Die Spaltennamen entsprechen den Anforderungen für SQL-Bezeichner. Wenn nämlich ein Bezeichner nicht in Anführungszeichen steht, wird er groß geschrieben, und wenn er in doppelten Anführungszeichen steht, bleibt er unverändert.

Der Handler für eine UDTF mit einer vektorisierten process-Methode kann so implementiert werden, dass er Batches auf eine partitionierte Art und Weise verarbeitet oder sie einfach Batch für Batch verarbeitet. Weitere Informationen dazu finden Sie unter Zustandsabhängige und zustandslose Verarbeitung.

Beispiel: Verwenden einer UDTF mit einer vektorisierten Prozessmethode, um eine Hot-Codierung anzuwenden

Verwenden Sie eine UDTF mit einer vektorisierten process-Methode, um eine Hot-Codierung auf eine Tabelle mit zehn Kategorien anzuwenden:

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

class one_hot_encode:
  def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
      return pd.get_dummies(df)
  process._sf_vectorized_input = pd.DataFrame


one_hot_encode_udtf = session.udtf.register(
  one_hot_encode,
  output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
  input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Copy

Beispielergebnis:

-----------
|"CATEG"  |
-----------
|categ1   |
|categ6   |
|categ8   |
|categ5   |
|categ7   |
|categ5   |
|categ1   |
|categ2   |
|categ2   |
|categ4   |
-----------

Bereiten Sie das Drucken der Tabelle vor:

res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Copy

Beispielergebnis:

    CATEG  CATEG0  CATEG1  CATEG2  CATEG3  CATEG4  CATEG5  CATEG6  CATEG7  CATEG8  CATEG9
0  categ0       1       0       0       0       0       0       0       0       0       0
1  categ0       1       0       0       0       0       0       0       0       0       0
2  categ5       0       0       0       0       0       1       0       0       0       0
3  categ3       0       0       0       1       0       0       0       0       0       0
4  categ8       0       0       0       0       0       0       0       0       1       0

Sie können das gleiche Ergebnis mit einem vektorisierten UDF erzielen, auch wenn dies weniger bequem ist. Sie müssen die Ergebnisse in eine einzige Spalte packen und dann die Spalte entpacken, um die Ergebnisse in einem brauchbaren pandas-DataFrame wiederherzustellen.

Beispiel für die Verwendung eines vektorisierten UDF:

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
  return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
  one_hot_encode,
  output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0  {\n  "categ0": false,\n  "categ1": false,\n  "...
1  {\n  "categ0": false,\n  "categ1": true,\n  "c...
2  {\n  "categ0": false,\n  "categ1": false,\n  "...
3  {\n  "categ0": false,\n  "categ1": false,\n  "...
4  {\n  "categ0": true,\n  "categ1": false,\n  "c...
Copy

Typunterstützung

Vektorisierte UDTFs unterstützen die gleichen SQL-Typen wie vektorisierte UDFs. Bei vektorisierten UDTFs werden SQL-NUMBER-Argumente mit einer Dezimalstellenzahl von 0, die alle in einen 64-Bit-Ganzzahl-Typ oder kleiner passen, immer Int16, Int32 oder Int64 zugeordnet. Anders als bei skalaren UDFs wird das Argument einer skalaren UDTF, das nicht nullwertfähig ist, nicht in int16, int32 oder int64 umgewandelt.

Eine umfassende Übersicht zur Zuordnung von SQL-Typen zu pandas-dtypes finden Sie in der Typunterstützungstabelle unter dem Thema zu vektorisierten Python-UDFs.

Best Practices

  • Wenn mit jeder Zeile ein skalarer Wert zurückgegeben werden muss, erstellen Sie eine Liste von sich wiederholenden Werten, anstatt das numpy-Array zu entpacken, um Tupel zu erstellen. Zum Beispiel für ein zweispaltiges Ergebnis, anstatt:

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    Verwenden Sie dies:

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  • Um die Leistung zu verbessern, entpacken Sie semistrukturierte Daten in Spalten.

    Wenn Sie z. B. eine Varianten-Spalte obj mit den Elementen x(int), y(float) und z(string) haben, können Sie stattdessen eine UDTF mit einer Signatur wie dieser definieren und sie mit vec_udtf(obj) aufrufen:

    create function vec_udtf(variant obj)
    
    Copy

    Definieren Sie die UDTF mit einer Signatur wie dieser und rufen Sie sie mit vec_udtf(obj:x, obj:y, obj:z) auf:

    create function vec_udtf(int, float, string)
    
    Copy
  • Standardmäßig kodiert Snowflake die Eingaben in Pandas-dtypes, die NULL-Werte unterstützen (z. B. Int64). Wenn Sie eine Bibliothek verwenden, die einen primitiven Typ erfordert (z. B. numpy), und Ihre Eingabe keine NULL-Werte enthält, sollten Sie die Spalte erst in einen primitiven Typ umwandeln, bevor Sie die Bibliothek verwenden. Beispiel:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    Weitere Informationen dazu finden Sie unter Typunterstützung.

  • Um bei Verwendung von UDTFs mit einer vektorisierten end_partition-Methode die Leistung zu verbessern und Timeouts zu vermeiden, sollten Sie pandas.concat nicht zum Akkumulieren von Teilergebnissen verwenden. Geben Sie stattdessen das Teilergebnis zurück, sobald es zur Verfügung steht.

    Verwenden Sie also nicht:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    Sondern:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy