UDTFs Python vectorisées¶
Cette rubrique présente les UDTFs Python vectorisées.
Dans ce chapitre :
Vue d’ensemble¶
Les UDTFs de Python (fonctions de table définies par l’utilisateur) permettent d’opérer sur des lignes par lots.
Snowflake prend en charge deux types d’UDTFs vectorisées :
Les UDTFs avec une méthode vectorisée
end_partition
Les UDTFs avec une méthode vectorisée
process
Vous devez choisir un type, car une UDTF ne peut pas avoir à la fois une méthode process
vectorisée et une méthode end_partition
vectorisée.
Les UDTFs avec une méthode end_partition vectorisée¶
UDTFs with a vectorized end_partition
method enable seamless partition-by-partition processing by operating on
partitions as pandas DataFrames
and returning results as
pandas DataFrames
or lists of pandas arrays
or pandas Series.
This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.
Utilisez une méthode end_partition
vectorisée pour les tâches suivantes :
Traiter vos données partition par partition plutôt que ligne par ligne.
Renvoyer plusieurs lignes ou colonnes pour chaque partition.
Utiliser des bibliothèques qui opèrent sur des DataFrames pandas pour l’analyse des données.
UDTFs à l’aide d’une méthode de processus vectorisé¶
Les UDTFs avec une méthode process
vectorisée permettent d’opérer sur des lignes par lots, en supposant que l’opération effectue un mappage de 1 à 1. En d’autres termes, la méthode renvoie une ligne de sortie pour chaque ligne d’entrée. Le nombre de colonnes n’est pas limité.
Utilisez une méthode process
vectorisée pour les tâches suivantes :
Appliquer une transformation 1 pour 1 avec un résultat multi-colonnes par lots.
Utiliser une bibliothèque qui nécessite
pandas.DataFrame
.Traiter des lignes par lots, sans partitionnement explicite.
Tirer parti de la fonction to_pandas() API pour transformer le résultat de la requête directement en DataFrame pandas.
Conditions préalables¶
La bibliothèque Snowpark pour Python version 1.14.0 ou ultérieure est requise.
Créer une UDTF avec une méthode end_partition vectorisée¶
Si vous le souhaitez, définissez une classe de gestionnaire avec une méthode
__init__
qui sera invoquée avant le traitement de chaque partition.Remarque : ne définissez pas de méthode
process
.Définissez une méthode
end_partition
qui prend en argument DataFrame et renvoie ou produit unpandas.DataFrame
ou un tuple depandas.Series
oupandas.arrays
où chaque tableau est une colonne.Les types de colonne du résultat doivent correspondre aux types de colonne de la définition d’une UDTF.
Pour marquer la méthode
end_partition
comme étant vectorisée, utilisez le décorateur@vectorized
ou l’attribut de fonction_sf_vectorized_input
.Pour plus d’informations, consultez UDFs vectorisées Python. Le décorateur
@vectorized
ne peut être utilisé que lorsque l’UDTF Python est exécutée dans Snowflake, par exemple, lors de l’utilisation d’une feuille de calcul SQL. Lorsque l’exécution se fait à l’aide du client ou d’une feuille de calcul Python, vous devez utiliser l’attribut function.
Note
Les noms de colonne par défaut pour le DataFrame d’entrée dans une UDTF avec une end_partition
vectorisée correspondent à la signature de la fonction SQL. Les noms des colonnes suivent les exigences de l’identificateur SQL. À savoir, si un identificateur n’est pas entre guillemets, il sera mis en majuscules, et s’il est entre guillemets, il sera conservé tel quel.
Le bloc de code suivant est un exemple de création d’une UDTF avec une méthode end_partition
vectorisée, à l’aide du décorateur @vectorized
:
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Le bloc de code suivant est un exemple de création d’une UDTF avec une méthode end_partition
vectorisée, à l’aide de l’attribut de fonction :
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
Note
Une UDTF avec une méthode end_partition
vectorisée doit être appelée avec une clause PARTITION BY pour construire les partitions.
Pour appeler l’UDTF avec toutes les données dans la même partition :
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Pour appeler l’UDTF avec les données partitionnées par la colonne x :
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Exemple : collecte de lignes à l’aide d’une UDTF classique ou d’une UDTF avec une méthode end_partition vectorisée¶
Collecte de lignes à l’aide d’une UDTF :
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
Collecte de lignes à l’aide d’une UDTF avec une méthode end_partition
vectorisée :
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Exemple : calculez la statistique récapitulative pour chaque colonne de la partition¶
Voici un exemple de calcul de la statistique récapitulative pour chaque colonne de la partition à l’aide de la méthode describe()
pandas.
Créez une table et générez trois partitions de cinq lignes chacune :
create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float); -- generate 3 partitions of 5 rows each insert into test_values select 'x', uniform(1.5,1000.5,random(1))::float col1, uniform(1.5,1000.5,random(2))::float col2, uniform(1.5,1000.5,random(3))::float col3, uniform(1.5,1000.5,random(4))::float col4, uniform(1.5,1000.5,random(5))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'y', uniform(1.5,1000.5,random(10))::float col1, uniform(1.5,1000.5,random(20))::float col2, uniform(1.5,1000.5,random(30))::float col3, uniform(1.5,1000.5,random(40))::float col4, uniform(1.5,1000.5,random(50))::float col5 from table(generator(rowcount => 5)); insert into test_values select 'z', uniform(1.5,1000.5,random(100))::float col1, uniform(1.5,1000.5,random(200))::float col2, uniform(1.5,1000.5,random(300))::float col3, uniform(1.5,1000.5,random(400))::float col4, uniform(1.5,1000.5,random(500))::float col5 from table(generator(rowcount => 5));
Jetez un coup d’œil aux données :
select * from test_values; ----------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" | ----------------------------------------------------- |x |8.0 |99.4 |714.6 |168.7 |397.2 | |x |106.4 |237.1 |971.7 |828.4 |988.2 | |x |741.3 |207.9 |32.6 |640.6 |63.2 | |x |541.3 |828.6 |844.9 |77.3 |403.1 | |x |4.3 |723.3 |924.3 |282.5 |158.1 | |y |976.1 |562.4 |968.7 |934.3 |977.3 | |y |390.0 |244.3 |952.6 |101.7 |24.9 | |y |599.7 |191.8 |90.2 |788.2 |761.2 | |y |589.5 |201.0 |863.4 |415.1 |696.1 | |y |46.7 |659.7 |571.1 |938.0 |513.7 | |z |313.9 |188.5 |964.6 |435.4 |519.6 | |z |328.3 |643.1 |766.4 |148.1 |596.4 | |z |929.0 |255.4 |915.9 |857.2 |425.5 | |z |612.8 |816.4 |220.2 |879.5 |331.4 | |z |487.1 |704.5 |471.5 |378.9 |481.2 | -----------------------------------------------------
Créez la fonction :
create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float) returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float) language python RUNTIME_VERSION = 3.9 packages=('pandas') handler='handler' as $$ from _snowflake import vectorized import pandas class handler: @vectorized(input=pandas.DataFrame) def end_partition(self, df): # using describe function to get the summary statistics result = df.describe().transpose() # add a column at the beginning for column ids result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5']) return result $$;
Effectuez au choix une de ces étapes :
Appelez la fonction et partitionnez par
id
:-- partition by id select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by id)) order by id, column_name; -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 | |x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 | |x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 | |x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 | |x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 | |y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 | |y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 | |y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 | |y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 | |y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 | |z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 | |z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 | |z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 | |z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 | |z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 | --------------------------------------------------------------------------------------------------------------------------------------------------------------------
Appelez la fonction et traitez l’ensemble de la table comme une seule partition :
-- treat the whole table as one partition select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5) over (partition by 1)) order by id, column_name; --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 | |NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 | |NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 | |NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 | |NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Créer une UDTF à l’aide d’une méthode de processus vectorisé¶
Définissez une classe de gestionnaire (handler), similaire aux UDTFs classiques, avec des méthodes
__init__
etend_partition
facultatives.Définissez une méthode
process
qui prend un argument DataFrame et renvoie unpandas.DataFrame
ou un tuple depandas.Series
oupandas.arrays
où chaque tableau est une colonne.Les types de colonne du résultat doivent correspondre aux types de colonne de la définition d’une UDTF. Le résultat renvoyé doit être exactement un DataFrame ou un tuple. Cela diffère d’une méthode
end_partition
vectorisée où vous pouvez céder ou renvoyer une liste.Pour marquer la méthode
process
comme étant vectorisée, utilisez le décorateur@vectorized
ou l’attribut de fonction_sf_vectorized_input
.Pour plus d’informations, consultez UDFs vectorisées Python. Le décorateur
@vectorized
ne peut être utilisé que lorsque l’UDTF Python est exécutée dans Snowflake, par exemple, lors de l’utilisation d’une feuille de calcul SQL. Lorsque l’exécution se fait à l’aide du client ou d’une feuille de calcul Python, vous devez utiliser l’attribut function.En option : si votre fonction de gestionnaire Python dépasse la limite de temps d’exécution, définissez une taille de lot cible.
Note
Les noms de colonne par défaut pour le DataFrame d’entrée dans une UDTF avec une process
vectorisée correspondent à la signature de la fonction SQL. Les noms des colonnes suivent les exigences de l’identificateur SQL. À savoir, si un identificateur n’est pas entre guillemets, il sera mis en majuscules, et s’il est entre guillemets, il sera conservé tel quel.
Le gestionnaire (handler) d’une UDTF avec une méthode process
vectorisée peut être mis en œuvre pour traiter des lots en tenant compte de la partition ou pour les traiter simplement lot par lot. Pour plus d’informations, voir Traitement avec et sans état.
Exemple : utiliser une UDTF avec une méthode de traitement vectorisée pour appliquer un codage à chaud¶
Utilisez une UDTF avec une méthode process
vectorisée pour appliquer un codage à chaud sur une table de dix catégories :
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame
class one_hot_encode:
def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
return pd.get_dummies(df)
process._sf_vectorized_input = pd.DataFrame
one_hot_encode_udtf = session.udtf.register(
one_hot_encode,
output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
input_names=['"categ"']
)
df_table = session.table("categories")
df_table.show()
Exemple de résultat :
-----------
|"CATEG" |
-----------
|categ1 |
|categ6 |
|categ8 |
|categ5 |
|categ7 |
|categ5 |
|categ1 |
|categ2 |
|categ2 |
|categ4 |
-----------
Préparez l’impression de la table :
res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Exemple de résultat :
CATEG CATEG0 CATEG1 CATEG2 CATEG3 CATEG4 CATEG5 CATEG6 CATEG7 CATEG8 CATEG9
0 categ0 1 0 0 0 0 0 0 0 0 0
1 categ0 1 0 0 0 0 0 0 0 0 0
2 categ5 0 0 0 0 0 1 0 0 0 0
3 categ3 0 0 0 1 0 0 0 0 0 0
4 categ8 0 0 0 0 0 0 0 0 1 0
Vous pouvez aussi obtenir le même résultat avec une UDF vectorisée, bien que cela soit moins pratique. Vous devez empaqueter les résultats dans une colonne, puis dépaqueter la colonne pour restaurer les résultats dans un DataFrame pandas utilisable.
Exemples d’utilisation d’une UDF vectorisée :
def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
return pd.get_dummies(df).to_dict('records')
one_hot_encode._sf_vectorized_input = pd.DataFrame
one_hot_encode_udf = session.udf.register(
one_hot_encode,
output_schema=["encoding"],
)
df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0 {\n "categ0": false,\n "categ1": false,\n "...
1 {\n "categ0": false,\n "categ1": true,\n "c...
2 {\n "categ0": false,\n "categ1": false,\n "...
3 {\n "categ0": false,\n "categ1": false,\n "...
4 {\n "categ0": true,\n "categ1": false,\n "c...
Prise en charge du type¶
Les UDTFs vectorisées prennent en charge les mêmes types SQL que les UDFs vectorisées. Cependant, pour les UDTFs vectorisées, les arguments SQL NUMBER
avec une échelle de 0 qui tiennent tous dans un type d’entier de 64 bits ou plus petit seront toujours mappés avec Int16
, Int32
, ou Int64
. En d’autres termes, contrairement aux UDFs scalaires, si l’argument d’une UDTF ne peut pas devenir null, il ne sera pas converti en int16
, int32
ou int64
.
Pour afficher une table montrant comment les types SQL sont mappés aux types Pandas, consultez le tableau de prise en charge des types dans la rubrique sur les UDFs vectorisées Python.
Meilleures pratiques¶
Si un scalaire doit être renvoyé avec chaque ligne, créez une liste de valeurs répétées au lieu de décompresser le tableau
numpy
pour créer des tuples. Par exemple, pour un résultat de deux colonnes, au lieu de :return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
Utilisez ceci :
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
Pour améliorer les performances, décompressez les données semi-structurées en colonnes.
Par exemple, si vous avez une colonne de variante,
obj
, avec les élémentsx(int)
,y(float)
etz(string)
, alors au lieu de définir une UDTF avec une signature comme celle-ci, appelez-la avecvec_udtf(obj)
:create function vec_udtf(variant obj)
Définissez l’UDTF avec une signature comme celle-ci, et appelez-la en utilisant
vec_udtf(obj:x, obj:y, obj:z)
:create function vec_udtf(int, float, string)
Par défaut, Snowflake code les entrées dans des types pandas qui prennent en charge les valeurs NULL (par exemple, Int64). Si vous utilisez une bibliothèque qui nécessite un type primitif (tel que
numpy
) et que votre entrée n’a pas de valeurs NULL, vous devez convertir la colonne en type primitif avant d’utiliser la bibliothèque. Par exemple :input_df['y'] = input_df['y'].astype("int64")
Pour plus d’informations, voir Prise en charge du type.
Lors de l’utilisation d’UDTFs avec une méthode
end_partition
vectorisée, pour améliorer les performances et éviter les délais d’attente, évitez d’utiliserpandas.concat
pour accumuler des résultats partiels. Au lieu de cela, donnez le résultat partiel chaque fois que vous êtes prêt.Par exemple, au lieu de :
results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
Faites ceci :
while(...): partial_result = pd.DataFrame(...) yield partial_result