UDTFs Python vectorisées

Cette rubrique présente les UDTFs Python vectorisées.

Dans ce chapitre :

Vue d’ensemble

Les UDTFs de Python (fonctions de table définies par l’utilisateur) permettent d’opérer sur des lignes par lots.

Snowflake prend en charge deux types d’UDTFs vectorisées :

  • Les UDTFs avec une méthode vectorisée end_partition

  • Les UDTFs avec une méthode vectorisée process

Vous devez choisir un type, car une UDTF ne peut pas avoir à la fois une méthode process vectorisée et une méthode end_partition vectorisée.

Les UDTFs avec une méthode end_partition vectorisée

UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.

Utilisez une méthode end_partition vectorisée pour les tâches suivantes :

  • Traiter vos données partition par partition plutôt que ligne par ligne.

  • Renvoyer plusieurs lignes ou colonnes pour chaque partition.

  • Utiliser des bibliothèques qui opèrent sur des DataFrames pandas pour l’analyse des données.

UDTFs à l’aide d’une méthode de processus vectorisé

Les UDTFs avec une méthode process vectorisée permettent d’opérer sur des lignes par lots, en supposant que l’opération effectue un mappage de 1 à 1. En d’autres termes, la méthode renvoie une ligne de sortie pour chaque ligne d’entrée. Le nombre de colonnes n’est pas limité.

Utilisez une méthode process vectorisée pour les tâches suivantes :

  • Appliquer une transformation 1 pour 1 avec un résultat multi-colonnes par lots.

  • Utiliser une bibliothèque qui nécessite pandas.DataFrame.

  • Traiter des lignes par lots, sans partitionnement explicite.

  • Tirer parti de la fonction to_pandas() API pour transformer le résultat de la requête directement en DataFrame pandas.

Conditions préalables

La bibliothèque Snowpark pour Python version 1.14.0 ou ultérieure est requise.

Créer une UDTF avec une méthode end_partition vectorisée

  1. Si vous le souhaitez, définissez une classe de gestionnaire avec une méthode __init__ qui sera invoquée avant le traitement de chaque partition.

    Remarque : ne définissez pas de méthode process.

  2. Définissez une méthode end_partition qui prend en argument DataFrame et renvoie ou produit un pandas.DataFrame ou un tuple de pandas.Series ou pandas.arrays où chaque tableau est une colonne.

    Les types de colonne du résultat doivent correspondre aux types de colonne de la définition d’une UDTF.

  3. Pour marquer la méthode end_partition comme étant vectorisée, utilisez le décorateur @vectorized ou l’attribut de fonction _sf_vectorized_input.

    Pour plus d’informations, consultez UDFs vectorisées Python. Le décorateur @vectorized ne peut être utilisé que lorsque l’UDTF Python est exécutée dans Snowflake, par exemple, lors de l’utilisation d’une feuille de calcul SQL. Lorsque l’exécution se fait à l’aide du client ou d’une feuille de calcul Python, vous devez utiliser l’attribut function.

Note

Les noms de colonne par défaut pour le DataFrame d’entrée dans une UDTF avec une end_partition vectorisée correspondent à la signature de la fonction SQL. Les noms des colonnes suivent les exigences de l’identificateur SQL. À savoir, si un identificateur n’est pas entre guillemets, il sera mis en majuscules, et s’il est entre guillemets, il sera conservé tel quel.

Le bloc de code suivant est un exemple de création d’une UDTF avec une méthode end_partition vectorisée, à l’aide du décorateur @vectorized :

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

Le bloc de code suivant est un exemple de création d’une UDTF avec une méthode end_partition vectorisée, à l’aide de l’attribut de fonction :

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

Note

Une UDTF avec une méthode end_partition vectorisée doit être appelée avec une clause PARTITION BY pour construire les partitions.

Pour appeler l’UDTF avec toutes les données dans la même partition :

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

Pour appeler l’UDTF avec les données partitionnées par la colonne x :

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

Exemple : collecte de lignes à l’aide d’une UDTF classique ou d’une UDTF avec une méthode end_partition vectorisée

Collecte de lignes à l’aide d’une UDTF :

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

Collecte de lignes à l’aide d’une UDTF avec une méthode end_partition vectorisée :

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

Exemple : calculez la statistique récapitulative pour chaque colonne de la partition

Voici un exemple de calcul de la statistique récapitulative pour chaque colonne de la partition à l’aide de la méthode describe() pandas.

  1. Créez une table et générez trois partitions de cinq lignes chacune :

    create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
    
    -- generate 3 partitions of 5 rows each
    insert into test_values
    select 'x',
    uniform(1.5,1000.5,random(1))::float col1,
    uniform(1.5,1000.5,random(2))::float col2,
    uniform(1.5,1000.5,random(3))::float col3,
    uniform(1.5,1000.5,random(4))::float col4,
    uniform(1.5,1000.5,random(5))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'y',
    uniform(1.5,1000.5,random(10))::float col1,
    uniform(1.5,1000.5,random(20))::float col2,
    uniform(1.5,1000.5,random(30))::float col3,
    uniform(1.5,1000.5,random(40))::float col4,
    uniform(1.5,1000.5,random(50))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'z',
    uniform(1.5,1000.5,random(100))::float col1,
    uniform(1.5,1000.5,random(200))::float col2,
    uniform(1.5,1000.5,random(300))::float col3,
    uniform(1.5,1000.5,random(400))::float col4,
    uniform(1.5,1000.5,random(500))::float col5
    from table(generator(rowcount => 5));
    
    Copy
  2. Jetez un coup d’œil aux données :

    select * from test_values;
    
    -----------------------------------------------------
    |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
    -----------------------------------------------------
    |x     |8.0     |99.4    |714.6   |168.7   |397.2   |
    |x     |106.4   |237.1   |971.7   |828.4   |988.2   |
    |x     |741.3   |207.9   |32.6    |640.6   |63.2    |
    |x     |541.3   |828.6   |844.9   |77.3    |403.1   |
    |x     |4.3     |723.3   |924.3   |282.5   |158.1   |
    |y     |976.1   |562.4   |968.7   |934.3   |977.3   |
    |y     |390.0   |244.3   |952.6   |101.7   |24.9    |
    |y     |599.7   |191.8   |90.2    |788.2   |761.2   |
    |y     |589.5   |201.0   |863.4   |415.1   |696.1   |
    |y     |46.7    |659.7   |571.1   |938.0   |513.7   |
    |z     |313.9   |188.5   |964.6   |435.4   |519.6   |
    |z     |328.3   |643.1   |766.4   |148.1   |596.4   |
    |z     |929.0   |255.4   |915.9   |857.2   |425.5   |
    |z     |612.8   |816.4   |220.2   |879.5   |331.4   |
    |z     |487.1   |704.5   |471.5   |378.9   |481.2   |
    -----------------------------------------------------
    
    Copy
  3. Créez la fonction :

    create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
    returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
    language python
    RUNTIME_VERSION = 3.9
    packages=('pandas')
    handler='handler'
    as $$
    from _snowflake import vectorized
    import pandas
    
    class handler:
        @vectorized(input=pandas.DataFrame)
        def end_partition(self, df):
          # using describe function to get the summary statistics
          result = df.describe().transpose()
          # add a column at the beginning for column ids
          result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
          return result
    $$;
    
    Copy
  4. Effectuez au choix une de ces étapes :

    • Appelez la fonction et partitionnez par id :

      -- partition by id
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by id))
      order by id, column_name;
      
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy
    • Appelez la fonction et traitez l’ensemble de la table comme une seule partition :

      -- treat the whole table as one partition
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by 1))
      order by id, column_name;
      
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy

Créer une UDTF à l’aide d’une méthode de processus vectorisé

  1. Définissez une classe de gestionnaire (handler), similaire aux UDTFs classiques, avec des méthodes __init__ et end_partition facultatives.

  2. Définissez une méthode process qui prend un argument DataFrame et renvoie un pandas.DataFrame ou un tuple de pandas.Series ou pandas.arrays où chaque tableau est une colonne.

    Les types de colonne du résultat doivent correspondre aux types de colonne de la définition d’une UDTF. Le résultat renvoyé doit être exactement un DataFrame ou un tuple. Cela diffère d’une méthode end_partition vectorisée où vous pouvez céder ou renvoyer une liste.

  3. Pour marquer la méthode process comme étant vectorisée, utilisez le décorateur @vectorized ou l’attribut de fonction _sf_vectorized_input.

    Pour plus d’informations, consultez UDFs vectorisées Python. Le décorateur @vectorized ne peut être utilisé que lorsque l’UDTF Python est exécutée dans Snowflake, par exemple, lors de l’utilisation d’une feuille de calcul SQL. Lorsque l’exécution se fait à l’aide du client ou d’une feuille de calcul Python, vous devez utiliser l’attribut function.

  4. En option : si votre fonction de gestionnaire Python dépasse la limite de temps d’exécution, définissez une taille de lot cible.

Note

Les noms de colonne par défaut pour le DataFrame d’entrée dans une UDTF avec une process vectorisée correspondent à la signature de la fonction SQL. Les noms des colonnes suivent les exigences de l’identificateur SQL. À savoir, si un identificateur n’est pas entre guillemets, il sera mis en majuscules, et s’il est entre guillemets, il sera conservé tel quel.

Le gestionnaire (handler) d’une UDTF avec une méthode process vectorisée peut être mis en œuvre pour traiter des lots en tenant compte de la partition ou pour les traiter simplement lot par lot. Pour plus d’informations, voir Traitement avec et sans état.

Exemple : utiliser une UDTF avec une méthode de traitement vectorisée pour appliquer un codage à chaud

Utilisez une UDTF avec une méthode process vectorisée pour appliquer un codage à chaud sur une table de dix catégories :

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

class one_hot_encode:
  def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
      return pd.get_dummies(df)
  process._sf_vectorized_input = pd.DataFrame


one_hot_encode_udtf = session.udtf.register(
  one_hot_encode,
  output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
  input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Copy

Exemple de résultat :

-----------
|"CATEG"  |
-----------
|categ1   |
|categ6   |
|categ8   |
|categ5   |
|categ7   |
|categ5   |
|categ1   |
|categ2   |
|categ2   |
|categ4   |
-----------

Préparez l’impression de la table :

res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Copy

Exemple de résultat :

    CATEG  CATEG0  CATEG1  CATEG2  CATEG3  CATEG4  CATEG5  CATEG6  CATEG7  CATEG8  CATEG9
0  categ0       1       0       0       0       0       0       0       0       0       0
1  categ0       1       0       0       0       0       0       0       0       0       0
2  categ5       0       0       0       0       0       1       0       0       0       0
3  categ3       0       0       0       1       0       0       0       0       0       0
4  categ8       0       0       0       0       0       0       0       0       1       0

Vous pouvez aussi obtenir le même résultat avec une UDF vectorisée, bien que cela soit moins pratique. Vous devez empaqueter les résultats dans une colonne, puis dépaqueter la colonne pour restaurer les résultats dans un DataFrame pandas utilisable.

Exemples d’utilisation d’une UDF vectorisée :

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
  return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
  one_hot_encode,
  output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0  {\n  "categ0": false,\n  "categ1": false,\n  "...
1  {\n  "categ0": false,\n  "categ1": true,\n  "c...
2  {\n  "categ0": false,\n  "categ1": false,\n  "...
3  {\n  "categ0": false,\n  "categ1": false,\n  "...
4  {\n  "categ0": true,\n  "categ1": false,\n  "c...
Copy

Prise en charge du type

Les UDTFs vectorisées prennent en charge les mêmes types SQL que les UDFs vectorisées. Cependant, pour les UDTFs vectorisées, les arguments SQL NUMBER avec une échelle de 0 qui tiennent tous dans un type d’entier de 64 bits ou plus petit seront toujours mappés avec Int16, Int32, ou Int64. En d’autres termes, contrairement aux UDFs scalaires, si l’argument d’une UDTF ne peut pas devenir null, il ne sera pas converti en int16, int32 ou int64.

Pour afficher une table montrant comment les types SQL sont mappés aux types Pandas, consultez le tableau de prise en charge des types dans la rubrique sur les UDFs vectorisées Python.

Meilleures pratiques

  • Si un scalaire doit être renvoyé avec chaque ligne, créez une liste de valeurs répétées au lieu de décompresser le tableau numpy pour créer des tuples. Par exemple, pour un résultat de deux colonnes, au lieu de :

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    Utilisez ceci :

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  • Pour améliorer les performances, décompressez les données semi-structurées en colonnes.

    Par exemple, si vous avez une colonne de variante, obj, avec les éléments x(int), y(float) et z(string), alors au lieu de définir une UDTF avec une signature comme celle-ci, appelez-la avec vec_udtf(obj) :

    create function vec_udtf(variant obj)
    
    Copy

    Définissez l’UDTF avec une signature comme celle-ci, et appelez-la en utilisant vec_udtf(obj:x, obj:y, obj:z) :

    create function vec_udtf(int, float, string)
    
    Copy
  • Par défaut, Snowflake code les entrées dans des types pandas qui prennent en charge les valeurs NULL (par exemple, Int64). Si vous utilisez une bibliothèque qui nécessite un type primitif (tel que numpy) et que votre entrée n’a pas de valeurs NULL, vous devez convertir la colonne en type primitif avant d’utiliser la bibliothèque. Par exemple :

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    Pour plus d’informations, voir Prise en charge du type.

  • Lors de l’utilisation d’UDTFs avec une méthode end_partition vectorisée, pour améliorer les performances et éviter les délais d’attente, évitez d’utiliser pandas.concat pour accumuler des résultats partiels. Au lieu de cela, donnez le résultat partiel chaque fois que vous êtes prêt.

    Par exemple, au lieu de :

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    Faites ceci :

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy