UDTFs Python vectorisées¶
Cette rubrique présente les UDTFs Python vectorisées.
Dans ce chapitre :
Vue d’ensemble¶
Vectorized Python UDTFs (user-defined table functions), which are UDTFs with a vectorized end_partition, enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.
À utiliser lorsque :
Vous devez traiter vos données partition par partition plutôt que ligne par ligne.
Vous devez renvoyer plusieurs lignes ou colonnes pour chaque partition.
Vous souhaitez utiliser des bibliothèques qui opèrent sur des DataFramespandas pour l’analyse des données.
Conditions préalables¶
La bibliothèque Snowpark pour Python version 1.6.1 ou ultérieure est requise.
Prise en main¶
Pour créer une UDTF avec un end_partition vectorisée :
Si vous le souhaitez, définissez une classe de gestionnaire avec une méthode
__init__
qui sera invoquée avant le traitement de chaque partition.Ne définissez pas de méthode
process
.Définissez une méthode
end_partition
qui prend en argument DataFrame et renvoie ou produit unpandas.DataFrame
ou un tuple depandas.Series
oupandas.arrays
où chaque tableau est une colonne. Les types de colonne du résultat doivent correspondre aux types de colonne de la définition d’une UDTF.Marquez la méthode
end_partition
comme étant vectorisée en utilisant le décorateur@vectorized
ou l’attribut de fonction_sf_vectorized_input
. Pour plus d’informations, reportez-vous à UDFs Python vectorisées. Le décorateur@vectorized
ne peut être utilisé que lorsque l’UDTF Python est exécutée dans Snowflake, par exemple, lors de l’utilisation d’une feuille de calcul SQL. Lorsque l’exécution se fait à l’aide du client ou d’une feuille de calcul Python, vous devez utiliser l’attribut function.
Note
Les noms de colonne par défaut pour le DataFrame d’entrée dans une UDTF avec une end_partition vectorisée correspondent à la signature de la fonction SQL. Les noms des colonnes suivront les exigences de l’identificateur SQL. À savoir, si un identificateur n’est pas entre guillemets, il sera mis en majuscules, et s’il est entre guillemets, il sera conservé tel quel.
Voici un exemple de création d’une UDTF avec une end_partition vectorisée, à l’aide du décorateur @vectorized
.
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Voici un exemple de création d’une UDTF avec une end_partition vectorisée à l’aide de l’attribut function.
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
Note
Une UDTF avec une end_partition vectorisée doit être appelée avec la clause PARTITION BY pour construire les partitions.
Pour appeler l’UDTF avec toutes les données dans la même partition :
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Pour appeler l’UDTF avec les données partitionnées par la colonne x :
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Prise en charge du type¶
Les UDTFs avec une end_partition vectorisée prennent en charge les mêmes types SQL que les UDFs Python vectorisées pour les arguments et les valeurs de retour. Cependant, pour les UDTFs avec une end_partition vectorisée, les arguments SQL NUMBER
avec une échelle de 0 qui tiennent tous dans un type d’entier de 64 bits ou plus petit seront toujours mappés avec Int16
, Int32
, ou Int64
. En d’autres termes, contrairement aux UDFs scalaires, si l’argument d’une UDTF ne peut pas devenir null, il ne sera pas converti en int16
, int32
ou int64
.
Pour afficher une table montrant comment les types SQL sont mappés aux types Pandas, consultez le tableau de prise en charge des types dans la rubrique sur les UDFs vectorisées Python.
Exemple : collecte de lignes à l’aide d’une UDTF classique ou d’une UDTF avec une end_partition vectorisée¶
Voici un exemple de collecte de lignes à l’aide d’une UDTF classique.
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
Voici un exemple de collecte de lignes à l’aide d’une UDTF avec une end_partition vectorisée.
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Exemple : calculez la statistique récapitulative pour chaque colonne de la partition¶
Voici un exemple de calcul de la statistique récapitulative pour chaque colonne de la partition à l’aide de la méthode describe()
pandas.
Tout d’abord, créez une table et générez 3 partitions de 5 lignes chacune.
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));
insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Jetez un coup d’œil aux données.
select * from test_values;
-----------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |
-----------------------------------------------------
|x |8.0 |99.4 |714.6 |168.7 |397.2 |
|x |106.4 |237.1 |971.7 |828.4 |988.2 |
|x |741.3 |207.9 |32.6 |640.6 |63.2 |
|x |541.3 |828.6 |844.9 |77.3 |403.1 |
|x |4.3 |723.3 |924.3 |282.5 |158.1 |
|y |976.1 |562.4 |968.7 |934.3 |977.3 |
|y |390.0 |244.3 |952.6 |101.7 |24.9 |
|y |599.7 |191.8 |90.2 |788.2 |761.2 |
|y |589.5 |201.0 |863.4 |415.1 |696.1 |
|y |46.7 |659.7 |571.1 |938.0 |513.7 |
|z |313.9 |188.5 |964.6 |435.4 |519.6 |
|z |328.3 |643.1 |766.4 |148.1 |596.4 |
|z |929.0 |255.4 |915.9 |857.2 |425.5 |
|z |612.8 |816.4 |220.2 |879.5 |331.4 |
|z |487.1 |704.5 |471.5 |378.9 |481.2 |
-----------------------------------------------------
Ensuite, créez la fonction.
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas
class handler:
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# using describe function to get the summary statistics
result = df.describe().transpose()
# add a column at the beginning for column ids
result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
return result
$$;
Appelez la fonction et partitionnez par id
.
-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 |
|x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 |
|x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 |
|x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 |
|x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 |
|y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 |
|y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 |
|y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 |
|y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 |
|y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 |
|z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 |
|z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 |
|z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 |
|z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 |
|z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Vous pouvez également appeler la fonction et traiter l’ensemble de la table comme une seule partition.
-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 |
|NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 |
|NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 |
|NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 |
|NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Meilleures pratiques¶
Cette section décrit les meilleures pratiques.
Pour améliorer les performances et éviter les délais d’attente, évitez d’utiliser
pandas.concat
pour accumuler des résultats partiels. Au lieu de cela, donnez le résultat partiel chaque fois que vous êtes prêt. Par exemple, au lieu de :results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
Faites ceci :
while(...): partial_result = pd.DataFrame(...) yield partial_result
Si un scalaire doit être renvoyé avec chaque ligne, créez une liste de valeurs répétées au lieu de décompresser le tableau
numpy
pour créer des tuples. Par exemple, pour un résultat de 2 colonnes, au lieu de :return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
Faites ceci :
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
Pour améliorer les performances, décompressez les données semi-structurées en colonnes. Par exemple, si vous avez une colonne de variante,
obj
, avec les élémentsx(int)
,y(float)
etz(string)
, alors au lieu de définir une UDTF avec une signature comme celle-ci :create function vec_udtf(variant obj)
Et l’appeler en utilisant
vec_udtf(obj)
, vous devez définir l’UDTF avec la signature :create function vec_udtf(int, float, string)
Et l’appeler en utilisant
vec_udtf(obj:x, obj:y, obj:z)
.Par défaut, Snowflake code les entrées dans des types pandas qui prennent en charge les valeurs NULL (par exemple, Int64). Si vous utilisez une bibliothèque qui nécessite un type primitif (tel que
numpy
) et que votre entrée n’a pas de valeurs NULL, vous devez convertir la colonne en type primitif avant d’utiliser la bibliothèque. Par exemple :input_df['y'] = input_df['y'].astype("int64")
Pour plus d’informations, voir Prise en charge du type.