Vektorisierte Python-UDTFs¶
Dieses Thema bietet eine Einführung in vektorisierte Python-UDTFs.
Unter diesem Thema:
Übersicht¶
Vectorized Python UDTFs (user-defined table functions), which are UDTFs with a vectorized end_partition, enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.
Verwenden Sie solche Funktionen in folgenden Fällen:
Sie müssen Ihre Daten Partition für Partition und nicht Zeile für Zeile verarbeiten.
Sie müssen mehrere Zeilen oder Spalten für jede Partition zurückgeben.
Sie möchten Bibliotheken verwenden, die für die Datenanalyse Pandas-DataFrames verwenden.
Voraussetzungen¶
Die Snowpark-Bibliothek für Python Version 1.6.1 oder höher ist erforderlich.
Erste Schritte¶
So erstellen Sie eine UDTF mit einer vektorisierten „end_partition“:
Optional können Sie eine Handler-Klasse mit einer
__init__
-Methode definieren, die vor jedem Verarbeiten einer Partition aufgerufen wird.Definieren Sie keine
process
-Methode.Definieren Sie eine
end_partition
-Methode, die ein DataFrame-Argument entgegennimmt und einpandas.DataFrame
oder ein Tupel vonpandas.Series
oderpandas.arrays
zurückgibt oder ergibt, wobei jedes Array eine Spalte ist. Die Spaltentypen des Ergebnisses müssen mit den Spaltentypen der UDTF-Definition übereinstimmen.Markieren Sie die Methode
end_partition
als vektorisiert, indem Sie das Decorator-Element@vectorized
oder das Funktionsattribut_sf_vectorized_input
verwenden. Weitere Informationen dazu finden Sie unter Vektorisierte Python-UDFs. Das Decorator-Element@vectorized
kann nur verwendet werden, wenn die Python-UDTF innerhalb von Snowflake ausgeführt wird, z. B. bei Verwendung eines SQL-Arbeitsblatts. Wenn Sie zum Ausführen den Client oder ein Python-Arbeitsblatt verwenden, müssen Sie das Funktionsattribut verwenden.
Bemerkung
Die Standard-Spaltennamen für den Eingabe-DataFrame einer UDTF mit vektorisierter „end_partition“ entsprechen der Signatur der SQL-Funktion. Die Spaltennamen werden den Anforderungen für SQL-Bezeichner entsprechen. Das bedeutet, wenn ein Bezeichner nicht in Anführungszeichen steht, wird er in Großbuchstaben geschrieben, und wenn er in doppelten Anführungszeichen steht, wird er so beibehalten, wie er ist.
Im folgenden Beispiel wird eine UDTF mit vektorisierter „end_partition“ unter Verwendung des Decorator-Elements @vectorized
erstellt.
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Im folgenden Beispiel wird eine UDTF mit vektorisierter „end_partition“ unter Verwendung des Funktionsattributs erstellt.
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
Bemerkung
Eine UDTF mit vektorisierter „end_partition“ muss mit der PARTITION BY-Klausel aufgerufen werden, damit die Partitionen erstellt werden.
So rufen Sie die UDTF mit allen Daten in derselben Partition auf:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
So rufen Sie die UDTF mit den durch Spalte x partitionierten Daten auf:
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Typunterstützung¶
UDTFs mit vektorisierter „end_partition“ unterstützen bei Argumenten und Rückgabewerten die gleichen SQL-Typen wie vektorisierte Python-UDFs. Bei UDTFs mit vektorisierter „end_partition“ werden SQL-NUMBER
-Argumente mit einer Dezimalstellenzahl von 0, die alle in einen 64-Bit-Ganzzahl-Typ oder kleiner passen, immer Int16
, Int32
oder Int64
zugeordnet. Das heißt, dass anders als bei skalaren UDFs das Argument einer skalaren UDTF, das nicht nullwertfähig ist, nicht in int16
, int32
oder int64
umgewandelt.
Eine umfassende Übersicht zur Zuordnung von SQL-Typen zu Pandas-dtypes finden Sie in der Typunterstützungstabelle unter dem Thema zu vektorisierten Python-UDFs.
Beispiel: Zeilensammlung unter Verwendung einer regulären UDTF vs. einer UDTF mit vektorisierter „end_partition“¶
Das folgende Beispiel zeigt, wie die Zeilensammlung mit einer regulären UDTF vorgenommen wird.
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
Das folgende Beispiel zeigt, wie die Zeilensammlung mit einer UDTF mit vektorisierter „end_partition“ vorgenommen wird.
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Beispiel: Berechnen der zusammenfassenden Statistik für jede Spalte in der Partition¶
Das folgende Beispiel zeigt, wie die Berechnung der zusammenfassenden Statistik für jede Spalte in der Partition mit der Pandas-Methode describe()
vorgenommen wird.
Erstellen Sie zunächst eine Tabelle, und generieren Sie 3 Partitionen mit jeweils 5 Zeilen.
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Werfen Sie einen Blick auf die Daten.
select * from test_values;
-----------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |
-----------------------------------------------------
|x |8.0 |99.4 |714.6 |168.7 |397.2 |
|x |106.4 |237.1 |971.7 |828.4 |988.2 |
|x |741.3 |207.9 |32.6 |640.6 |63.2 |
|x |541.3 |828.6 |844.9 |77.3 |403.1 |
|x |4.3 |723.3 |924.3 |282.5 |158.1 |
|y |976.1 |562.4 |968.7 |934.3 |977.3 |
|y |390.0 |244.3 |952.6 |101.7 |24.9 |
|y |599.7 |191.8 |90.2 |788.2 |761.2 |
|y |589.5 |201.0 |863.4 |415.1 |696.1 |
|y |46.7 |659.7 |571.1 |938.0 |513.7 |
|z |313.9 |188.5 |964.6 |435.4 |519.6 |
|z |328.3 |643.1 |766.4 |148.1 |596.4 |
|z |929.0 |255.4 |915.9 |857.2 |425.5 |
|z |612.8 |816.4 |220.2 |879.5 |331.4 |
|z |487.1 |704.5 |471.5 |378.9 |481.2 |
-----------------------------------------------------
Erstellen Sie dann die Funktion.
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas
class handler:
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# using describe function to get the summary statistics
result = df.describe().transpose()
# add a column at the beginning for column ids
result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
return result
$$;
Rufen Sie die Funktion auf, und führen Sie die Partitionierung über id
aus.
-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 |
|x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 |
|x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 |
|x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 |
|x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 |
|y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 |
|y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 |
|y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 |
|y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 |
|y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 |
|z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 |
|z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 |
|z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 |
|z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 |
|z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Alternativ können Sie die Funktion aufrufen und die gesamte Tabelle als eine Partition behandeln.
-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 |
|NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 |
|NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 |
|NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 |
|NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Best Practices¶
In diesem Abschnitt werden bewährte Verfahren beschrieben.
Um die Leistung zu verbessern und Timeouts zu vermeiden, sollten Sie
pandas.concat
nicht zum Akkumulieren von Teilergebnissen verwenden. Geben Sie stattdessen das Teilergebnis zurück, sobald es zur Verfügung steht. Verwenden Sie also nicht:results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
Sondern:
while(...): partial_result = pd.DataFrame(...) yield partial_result
Wenn mit jeder Zeile ein skalarer Wert zurückgegeben werden muss, erstellen Sie eine Liste von sich wiederholenden Werten, anstatt das
numpy
-Array zu entpacken, um Tupel zu erstellen. Verwenden Sie für ein 2-Spalten-Ergebnis nicht:return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
Sondern:
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
Um die Leistung zu verbessern, entpacken Sie semistrukturierte Daten in Spalten. Wenn Sie z. B. eine Variant-Spalte
obj
mit den Elementenx(int)
,y(float)
undz(string)
haben, dann definieren Sie stattdessen eine UDTF mit einer Signatur wie der folgenden:create function vec_udtf(variant obj)
Und wenn Sie es mit
vec_udtf(obj)
aufrufen, sollten Sie die UDTF mit folgender Signatur definieren:create function vec_udtf(int, float, string)
Und rufen Sie es mit
vec_udtf(obj:x, obj:y, obj:z)
auf.Standardmäßig kodiert Snowflake die Eingaben in Pandas-dtypes, die NULL-Werte unterstützen (z. B. Int64). Wenn Sie eine Bibliothek verwenden, die einen primitiven Typ erfordert (z. B.
numpy
), und Ihre Eingabe keine NULL-Werte enthält, sollten Sie die Spalte erst in einen primitiven Typ umwandeln, bevor Sie die Bibliothek verwenden. Beispiel:input_df['y'] = input_df['y'].astype("int64")
Weitere Informationen dazu finden Sie unter Typunterstützung.