Vektorisierte Python-UDTFs

Dieses Thema bietet eine Einführung in vektorisierte Python-UDTFs.

Unter diesem Thema:

Übersicht

Vectorized Python UDTFs (user-defined table functions), which are UDTFs with a vectorized end_partition, enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.

Verwenden Sie solche Funktionen in folgenden Fällen:

  • Sie müssen Ihre Daten Partition für Partition und nicht Zeile für Zeile verarbeiten.

  • Sie müssen mehrere Zeilen oder Spalten für jede Partition zurückgeben.

  • Sie möchten Bibliotheken verwenden, die für die Datenanalyse Pandas-DataFrames verwenden.

Voraussetzungen

Die Snowpark-Bibliothek für Python Version 1.6.1 oder höher ist erforderlich.

Erste Schritte

So erstellen Sie eine UDTF mit einer vektorisierten „end_partition“:

  • Optional können Sie eine Handler-Klasse mit einer __init__-Methode definieren, die vor jedem Verarbeiten einer Partition aufgerufen wird.

  • Definieren Sie keine process-Methode.

  • Definieren Sie eine end_partition -Methode, die ein DataFrame-Argument entgegennimmt und ein pandas.DataFrame oder ein Tupel von pandas.Series oder pandas.arrays zurückgibt oder ergibt, wobei jedes Array eine Spalte ist. Die Spaltentypen des Ergebnisses müssen mit den Spaltentypen der UDTF-Definition übereinstimmen.

  • Markieren Sie die Methode end_partition als vektorisiert, indem Sie das Decorator-Element @vectorized oder das Funktionsattribut _sf_vectorized_input verwenden. Weitere Informationen dazu finden Sie unter Vektorisierte Python-UDFs. Das Decorator-Element @vectorized kann nur verwendet werden, wenn die Python-UDTF innerhalb von Snowflake ausgeführt wird, z. B. bei Verwendung eines SQL-Arbeitsblatts. Wenn Sie zum Ausführen den Client oder ein Python-Arbeitsblatt verwenden, müssen Sie das Funktionsattribut verwenden.

Bemerkung

Die Standard-Spaltennamen für den Eingabe-DataFrame einer UDTF mit vektorisierter „end_partition“ entsprechen der Signatur der SQL-Funktion. Die Spaltennamen werden den Anforderungen für SQL-Bezeichner entsprechen. Das bedeutet, wenn ein Bezeichner nicht in Anführungszeichen steht, wird er in Großbuchstaben geschrieben, und wenn er in doppelten Anführungszeichen steht, wird er so beibehalten, wie er ist.

Im folgenden Beispiel wird eine UDTF mit vektorisierter „end_partition“ unter Verwendung des Decorator-Elements @vectorized erstellt.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

Im folgenden Beispiel wird eine UDTF mit vektorisierter „end_partition“ unter Verwendung des Funktionsattributs erstellt.

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

Bemerkung

Eine UDTF mit vektorisierter „end_partition“ muss mit der PARTITION BY-Klausel aufgerufen werden, damit die Partitionen erstellt werden.

So rufen Sie die UDTF mit allen Daten in derselben Partition auf:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

So rufen Sie die UDTF mit den durch Spalte x partitionierten Daten auf:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

Typunterstützung

UDTFs mit vektorisierter „end_partition“ unterstützen bei Argumenten und Rückgabewerten die gleichen SQL-Typen wie vektorisierte Python-UDFs. Bei UDTFs mit vektorisierter „end_partition“ werden SQL-NUMBER-Argumente mit einer Dezimalstellenzahl von 0, die alle in einen 64-Bit-Ganzzahl-Typ oder kleiner passen, immer Int16, Int32 oder Int64 zugeordnet. Das heißt, dass anders als bei skalaren UDFs das Argument einer skalaren UDTF, das nicht nullwertfähig ist, nicht in int16, int32 oder int64 umgewandelt.

Eine umfassende Übersicht zur Zuordnung von SQL-Typen zu Pandas-dtypes finden Sie in der Typunterstützungstabelle unter dem Thema zu vektorisierten Python-UDFs.

Beispiel: Zeilensammlung unter Verwendung einer regulären UDTF vs. einer UDTF mit vektorisierter „end_partition“

Das folgende Beispiel zeigt, wie die Zeilensammlung mit einer regulären UDTF vorgenommen wird.

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

Das folgende Beispiel zeigt, wie die Zeilensammlung mit einer UDTF mit vektorisierter „end_partition“ vorgenommen wird.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

Beispiel: Berechnen der zusammenfassenden Statistik für jede Spalte in der Partition

Das folgende Beispiel zeigt, wie die Berechnung der zusammenfassenden Statistik für jede Spalte in der Partition mit der Pandas-Methode describe() vorgenommen wird.

Erstellen Sie zunächst eine Tabelle, und generieren Sie 3 Partitionen mit jeweils 5 Zeilen.

create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);

-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Copy

Werfen Sie einen Blick auf die Daten.

select * from test_values;

-----------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
-----------------------------------------------------
|x     |8.0     |99.4    |714.6   |168.7   |397.2   |
|x     |106.4   |237.1   |971.7   |828.4   |988.2   |
|x     |741.3   |207.9   |32.6    |640.6   |63.2    |
|x     |541.3   |828.6   |844.9   |77.3    |403.1   |
|x     |4.3     |723.3   |924.3   |282.5   |158.1   |
|y     |976.1   |562.4   |968.7   |934.3   |977.3   |
|y     |390.0   |244.3   |952.6   |101.7   |24.9    |
|y     |599.7   |191.8   |90.2    |788.2   |761.2   |
|y     |589.5   |201.0   |863.4   |415.1   |696.1   |
|y     |46.7    |659.7   |571.1   |938.0   |513.7   |
|z     |313.9   |188.5   |964.6   |435.4   |519.6   |
|z     |328.3   |643.1   |766.4   |148.1   |596.4   |
|z     |929.0   |255.4   |915.9   |857.2   |425.5   |
|z     |612.8   |816.4   |220.2   |879.5   |331.4   |
|z     |487.1   |704.5   |471.5   |378.9   |481.2   |
-----------------------------------------------------
Copy

Erstellen Sie dann die Funktion.

create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas

class handler:
    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
      # using describe function to get the summary statistics
      result = df.describe().transpose()
      # add a column at the beginning for column ids
      result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
      return result
$$;
Copy

Rufen Sie die Funktion auf, und führen Sie die Partitionierung über id aus.

-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

Alternativ können Sie die Funktion aufrufen und die gesamte Tabelle als eine Partition behandeln.

-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

Best Practices

In diesem Abschnitt werden bewährte Verfahren beschrieben.

  1. Um die Leistung zu verbessern und Timeouts zu vermeiden, sollten Sie pandas.concat nicht zum Akkumulieren von Teilergebnissen verwenden. Geben Sie stattdessen das Teilergebnis zurück, sobald es zur Verfügung steht. Verwenden Sie also nicht:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    Sondern:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy
  2. Wenn mit jeder Zeile ein skalarer Wert zurückgegeben werden muss, erstellen Sie eine Liste von sich wiederholenden Werten, anstatt das numpy-Array zu entpacken, um Tupel zu erstellen. Verwenden Sie für ein 2-Spalten-Ergebnis nicht:

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    Sondern:

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  3. Um die Leistung zu verbessern, entpacken Sie semistrukturierte Daten in Spalten. Wenn Sie z. B. eine Variant-Spalte obj mit den Elementen x(int), y(float) und z(string) haben, dann definieren Sie stattdessen eine UDTF mit einer Signatur wie der folgenden:

    create function vec_udtf(variant obj)
    
    Copy

    Und wenn Sie es mit vec_udtf(obj) aufrufen, sollten Sie die UDTF mit folgender Signatur definieren:

    create function vec_udtf(int, float, string)
    
    Copy

    Und rufen Sie es mit vec_udtf(obj:x, obj:y, obj:z) auf.

  4. Standardmäßig kodiert Snowflake die Eingaben in Pandas-dtypes, die NULL-Werte unterstützen (z. B. Int64). Wenn Sie eine Bibliothek verwenden, die einen primitiven Typ erfordert (z. B. numpy), und Ihre Eingabe keine NULL-Werte enthält, sollten Sie die Spalte erst in einen primitiven Typ umwandeln, bevor Sie die Bibliothek verwenden. Beispiel:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    Weitere Informationen dazu finden Sie unter Typunterstützung.