Vektorisierte Python-UDTFs¶
Dieses Thema bietet eine Einführung in vektorisierte Python-UDTFs.
Unter diesem Thema:
Übersicht¶
Vektorisierte Python-UDTFs (benutzerdefinierte Tabellenfunktionen) bieten eine Möglichkeit, batchweise über Zeilen zu arbeiten.
Snowflake unterstützt zwei Arten von vektorisierten UDTFs:
UDTFs mit einer vektorisierten
end_partition
-MethodeUDTFs mit einer vektorisierten
process
-Methode
Sie müssen sich für eine Art entscheiden, da eine UDTF nicht sowohl eine vektorisierte process
-Methode als auch eine vektorisierte end_partition
-Methode haben kann.
UDTFs mit vektorisierter „end_partition“-Methode¶
UDTFs with a vectorized end_partition
method enable seamless partition-by-partition processing by operating on
partitions as pandas DataFrames
and returning results as
pandas DataFrames
or lists of pandas arrays
or pandas Series.
This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.
Verwenden Sie eine vektorisierte end_partition
-Methode für die folgenden Aufgaben:
Verarbeiten von Daten Partition für Partition und nicht Zeile für Zeile.
Rückgabe mehrerer Zeilen oder Spalten für jede Partition.
Verwenden von Bibliotheken, die für die Datenanalyse Pandas-DataFrames verwenden.
UDTFs mit einer vektorisierten Prozessmethode¶
UDTFs mit einer vektorisierten process
-Methode bieten eine Möglichkeit, über Zeilen in Batches zu operieren, wenn die Operation eine 1:-1-Zuordnung ausführt. Mit anderen Worten, die Methode gibt für jede Eingabezeile genaue eine Ausgabezeile zurück. Die Anzahl der Spalten ist nicht beschränkt.
Verwenden Sie eine vektorisierte process
-Methode für die folgenden Aufgaben:
Batchweises Anwenden einer 1-zu-1-Transformation mit einem mehrspaltigen Ergebnis.
Verwenden einer Bibliothek, die
pandas.DataFrame
benötigt.Verarbeiten von Zeilen in Batches ohne explizite Partitionierung.
Nutzen von to_pandas()-API, um das Abfrageergebnis direkt in einen Pandas-DataFrame umzuwandeln.
Voraussetzungen¶
Die Snowpark-Bibliothek für Python, Version 1.14.0 oder höher ist erforderlich.
Erstellen Sie eine UDTF mit einer vektorisierten end_partition-Methode¶
Optional: Definieren Sie eine Handler-Klasse mit einer
__init__
-Methode, die vor der Verarbeitung jeder Partition aufgerufen wird.Hinweis: Definieren Sie keine
process
-Methode.Definieren Sie eine
end_partition
-Methode, die ein DataFrame-Argument entgegennimmt und einpandas.DataFrame
oder ein Tupel vonpandas.Series
oderpandas.arrays
zurückgibt oder ergibt, wobei jedes Array eine Spalte ist.Die Spaltentypen des Ergebnisses müssen mit den Spaltentypen der UDTF-Definition übereinstimmen.
Um die
end_partition
-Methode als vektorisiert zu kennzeichnen, verwenden Sie das@vectorized
-Decorator-Element oder das_sf_vectorized_input
-Funktionsattribut.Weitere Informationen finden Sie unter Vektorisierte Python-UDFs. Das Decorator-Element
@vectorized
kann nur verwendet werden, wenn die Python-UDTF innerhalb von Snowflake ausgeführt wird, z. B. bei Verwendung eines SQL-Arbeitsblatts. Wenn Sie zum Ausführen den Client oder ein Python-Arbeitsblatt verwenden, müssen Sie das Funktionsattribut verwenden.
Bemerkung
Die Standard-Spaltennamen für den Eingabe-DataFrame einer UDTF mit vektorisierter end_partition
-Methode entsprechen der Signatur der SQL-Funktion. Die Spaltennamen entsprechen den Anforderungen für SQL-Bezeichner. Das heißt, wenn ein Bezeichner nicht in Anführungszeichen steht, wird er groß geschrieben, und wenn er in doppelten Anführungszeichen steht, bleibt er unverändert.
Der folgende Codeblock ist ein Beispiel für die Erstellung einer UDTF mit einer vektorisierten end_partition
-Methode unter Verwendung des @vectorized
-Decorator-Elements:
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Der folgende Codeblock ist ein Beispiel für die Erstellung einer UDTF mit einer vektorisierten end_partition
-Methode unter Verwendung des Funktionsattributs:
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
Bemerkung
Ein UDTF mit einer vektorisierten end_partition
-Methode muss mit einer PARTITION BY-Klausel aufgerufen werden, um die Partitionen zu erstellen.
So rufen Sie die UDTF mit allen Daten in derselben Partition auf:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Zum Aufruf von UDTF mit den nach Spalte x partitionierten Daten:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Beispiel: Zeilensammlung mit einer regulären UDTF im Vergleich zur Verwendung einer UDTF mit einer vektorisierten end_partition-Methode¶
Zeilensammlung mit einer regulären UDTF:
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
Zeilensammlung mit einer UDTF mit einer vektorisierten end_partition
-Methode:
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Beispiel: Berechnen der zusammenfassenden Statistik für jede Spalte in der Partition¶
Das folgende Beispiel zeigt, wie die Berechnung der zusammenfassenden Statistik für jede Spalte in der Partition mit der Pandas-Methode describe()
vorgenommen wird.
Erstellen Sie eine Tabelle und erzeugen Sie drei Partitionen mit je fünf Zeilen:
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float); -- generate 3 partitions of 5 rows each insert into test_values select 'x', uniform(1.5,1000.5,random(1))::float col1, uniform(1.5,1000.5,random(2))::float col2, uniform(1.5,1000.5,random(3))::float col3, uniform(1.5,1000.5,random(4))::float col4, uniform(1.5,1000.5,random(5))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'y', uniform(1.5,1000.5,random(10))::float col1, uniform(1.5,1000.5,random(20))::float col2, uniform(1.5,1000.5,random(30))::float col3, uniform(1.5,1000.5,random(40))::float col4, uniform(1.5,1000.5,random(50))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'z', uniform(1.5,1000.5,random(100))::float col1, uniform(1.5,1000.5,random(200))::float col2, uniform(1.5,1000.5,random(300))::float col3, uniform(1.5,1000.5,random(400))::float col4, uniform(1.5,1000.5,random(500))::float col5 from table(generator(rowcount => 5));
Sehen Sie sich die Daten an:
select * from test_values; ----------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" | ----------------------------------------------------- |x |8.0 |99.4 |714.6 |168.7 |397.2 | |x |106.4 |237.1 |971.7 |828.4 |988.2 | |x |741.3 |207.9 |32.6 |640.6 |63.2 | |x |541.3 |828.6 |844.9 |77.3 |403.1 | |x |4.3 |723.3 |924.3 |282.5 |158.1 | |y |976.1 |562.4 |968.7 |934.3 |977.3 | |y |390.0 |244.3 |952.6 |101.7 |24.9 | |y |599.7 |191.8 |90.2 |788.2 |761.2 | |y |589.5 |201.0 |863.4 |415.1 |696.1 | |y |46.7 |659.7 |571.1 |938.0 |513.7 | |z |313.9 |188.5 |964.6 |435.4 |519.6 | |z |328.3 |643.1 |766.4 |148.1 |596.4 | |z |929.0 |255.4 |915.9 |857.2 |425.5 | |z |612.8 |816.4 |220.2 |879.5 |331.4 | |z |487.1 |704.5 |471.5 |378.9 |481.2 | -----------------------------------------------------
Erstellen Sie die Funktion:
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float) returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float) language python RUNTIME_VERSION = 3.9 packages=('pandas') handler='handler' as $$ from _snowflake import vectorized import pandas class handler: @vectorized(input=pandas.DataFrame) def end_partition(self, df): # using describe function to get the summary statistics result = df.describe().transpose() # add a column at the beginning for column ids result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5']) return result $$;
Führen Sie einen der folgenden Schritte aus:
Rufen Sie die Funktion auf, und führen Sie die Partitionierung über
id
aus:-- partition by id select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by id)) order by id, column_name; -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 | |x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 | |x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 | |x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 | |x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 | |y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 | |y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 | |y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 | |y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 | |y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 | |z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 | |z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 | |z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 | |z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 | |z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 | --------------------------------------------------------------------------------------------------------------------------------------------------------------------
Rufen Sie die Funktion auf und behandeln Sie die gesamte Tabelle als eine Partition:
-- treat the whole table as one partition select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by 1)) order by id, column_name; --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 | |NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 | |NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 | |NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 | |NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Erstellen Sie eine UDTF mit einer vektorisierten Prozessmethode¶
Definieren Sie eine Handler-Klasse, ähnlich wie bei regulären UDTFs, und verwenden Sie die optionalen Methoden
__init__
undend_partition
.Definieren Sie eine
process
-Methode, die ein DataFrame-Argument entgegennimmt und entweder einenpandas.DataFrame
oder ein Tupel vonpandas.Series
oderpandas.arrays
zurückgibt, wobei jedes Array eine Spalte ist.Die Spaltentypen des Ergebnisses müssen mit den Spaltentypen der UDTF-Definition übereinstimmen. Das zurückgegebene Ergebnis muss genau ein DataFrame oder Tupel sein. Dies unterscheidet sich von einer vektorisierten
end_partition
-Methode, bei der Sie eine Liste übergeben oder zurückgeben können.Um die
process
-Methode als vektorisiert zu kennzeichnen, verwenden Sie das@vectorized
-Decorator-Element oder das_sf_vectorized_input
-Funktionsattribut.Weitere Informationen finden Sie unter Vektorisierte Python-UDFs. Das Decorator-Element
@vectorized
kann nur verwendet werden, wenn die Python-UDTF innerhalb von Snowflake ausgeführt wird, z. B. bei Verwendung eines SQL-Arbeitsblatts. Wenn Sie zum Ausführen den Client oder ein Python-Arbeitsblatt verwenden, müssen Sie das Funktionsattribut verwenden.Optional: Wenn Ihre Python-Handler-Funktion das Zeitlimit für die Ausführung überschreitet, können Sie unter eine Zielbatchgröße einstellen.
Bemerkung
Die Standard-Spaltennamen für den Eingabe-DataFrame einer UDTF mit vektorisierter process
-Methode entsprechen der Signatur der SQL-Funktion. Die Spaltennamen entsprechen den Anforderungen für SQL-Bezeichner. Wenn nämlich ein Bezeichner nicht in Anführungszeichen steht, wird er groß geschrieben, und wenn er in doppelten Anführungszeichen steht, bleibt er unverändert.
Der Handler für eine UDTF mit einer vektorisierten process
-Methode kann so implementiert werden, dass er Batches auf eine partitionierte Art und Weise verarbeitet oder sie einfach Batch für Batch verarbeitet. Weitere Informationen dazu finden Sie unter Zustandsabhängige und zustandslose Verarbeitung.
Beispiel: Verwenden einer UDTF mit einer vektorisierten Prozessmethode, um eine Hot-Codierung anzuwenden¶
Verwenden Sie eine UDTF mit einer vektorisierten process
-Methode, um eine Hot-Codierung auf eine Tabelle mit zehn Kategorien anzuwenden:
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame
class one_hot_encode:
def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
return pd.get_dummies(df)
process._sf_vectorized_input = pd.DataFrame
one_hot_encode_udtf = session.udtf.register(
one_hot_encode,
output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
input_names=['"categ"']
)
df_table = session.table("categories")
df_table.show()
Beispielergebnis:
-----------
|"CATEG" |
-----------
|categ1 |
|categ6 |
|categ8 |
|categ5 |
|categ7 |
|categ5 |
|categ1 |
|categ2 |
|categ2 |
|categ4 |
-----------
Bereiten Sie das Drucken der Tabelle vor:
res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Beispielergebnis:
CATEG CATEG0 CATEG1 CATEG2 CATEG3 CATEG4 CATEG5 CATEG6 CATEG7 CATEG8 CATEG9
0 categ0 1 0 0 0 0 0 0 0 0 0
1 categ0 1 0 0 0 0 0 0 0 0 0
2 categ5 0 0 0 0 0 1 0 0 0 0
3 categ3 0 0 0 1 0 0 0 0 0 0
4 categ8 0 0 0 0 0 0 0 0 1 0
Sie können das gleiche Ergebnis mit einem vektorisierten UDF erzielen, auch wenn dies weniger bequem ist. Sie müssen die Ergebnisse in eine einzige Spalte packen und dann die Spalte entpacken, um die Ergebnisse in einem brauchbaren pandas-DataFrame wiederherzustellen.
Beispiel für die Verwendung eines vektorisierten UDF:
def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
return pd.get_dummies(df).to_dict('records')
one_hot_encode._sf_vectorized_input = pd.DataFrame
one_hot_encode_udf = session.udf.register(
one_hot_encode,
output_schema=["encoding"],
)
df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0 {\n "categ0": false,\n "categ1": false,\n "...
1 {\n "categ0": false,\n "categ1": true,\n "c...
2 {\n "categ0": false,\n "categ1": false,\n "...
3 {\n "categ0": false,\n "categ1": false,\n "...
4 {\n "categ0": true,\n "categ1": false,\n "c...
Typunterstützung¶
Vektorisierte UDTFs unterstützen die gleichen SQL-Typen wie vektorisierte UDFs. Bei vektorisierten UDTFs werden SQL-NUMBER
-Argumente mit einer Dezimalstellenzahl von 0, die alle in einen 64-Bit-Ganzzahl-Typ oder kleiner passen, immer Int16
, Int32
oder Int64
zugeordnet. Anders als bei skalaren UDFs wird das Argument einer skalaren UDTF, das nicht nullwertfähig ist, nicht in int16
, int32
oder int64
umgewandelt.
Eine umfassende Übersicht zur Zuordnung von SQL-Typen zu pandas-dtypes finden Sie in der Typunterstützungstabelle unter dem Thema zu vektorisierten Python-UDFs.
Best Practices¶
Wenn mit jeder Zeile ein skalarer Wert zurückgegeben werden muss, erstellen Sie eine Liste von sich wiederholenden Werten, anstatt das
numpy
-Array zu entpacken, um Tupel zu erstellen. Zum Beispiel für ein zweispaltiges Ergebnis, anstatt:return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
Verwenden Sie dies:
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
Um die Leistung zu verbessern, entpacken Sie semistrukturierte Daten in Spalten.
Wenn Sie z. B. eine Varianten-Spalte
obj
mit den Elementenx(int)
,y(float)
undz(string)
haben, können Sie stattdessen eine UDTF mit einer Signatur wie dieser definieren und sie mitvec_udtf(obj)
aufrufen:create function vec_udtf(variant obj)
Definieren Sie die UDTF mit einer Signatur wie dieser und rufen Sie sie mit
vec_udtf(obj:x, obj:y, obj:z)
auf:create function vec_udtf(int, float, string)
Standardmäßig kodiert Snowflake die Eingaben in Pandas-dtypes, die NULL-Werte unterstützen (z. B. Int64). Wenn Sie eine Bibliothek verwenden, die einen primitiven Typ erfordert (z. B.
numpy
), und Ihre Eingabe keine NULL-Werte enthält, sollten Sie die Spalte erst in einen primitiven Typ umwandeln, bevor Sie die Bibliothek verwenden. Beispiel:input_df['y'] = input_df['y'].astype("int64")
Weitere Informationen dazu finden Sie unter Typunterstützung.
Um bei Verwendung von UDTFs mit einer vektorisierten
end_partition
-Methode die Leistung zu verbessern und Timeouts zu vermeiden, sollten Siepandas.concat
nicht zum Akkumulieren von Teilergebnissen verwenden. Geben Sie stattdessen das Teilergebnis zurück, sobald es zur Verfügung steht.Verwenden Sie also nicht:
results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
Sondern:
while(...): partial_result = pd.DataFrame(...) yield partial_result