UDTFs Python vectorisées

Cette rubrique présente les UDTFs Python vectorisées.

Dans ce chapitre :

Vue d’ensemble

Vectorized Python UDTFs (user-defined table functions), which are UDTFs with a vectorized end_partition, enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.

À utiliser lorsque :

  • Vous devez traiter vos données partition par partition plutôt que ligne par ligne.

  • Vous devez renvoyer plusieurs lignes ou colonnes pour chaque partition.

  • Vous souhaitez utiliser des bibliothèques qui opèrent sur des DataFramespandas pour l’analyse des données.

Conditions préalables

La bibliothèque Snowpark pour Python version 1.6.1 ou ultérieure est requise.

Prise en main

Pour créer une UDTF avec un end_partition vectorisée :

  • Si vous le souhaitez, définissez une classe de gestionnaire avec une méthode __init__ qui sera invoquée avant le traitement de chaque partition.

  • Ne définissez pas de méthode process.

  • Définissez une méthode end_partition qui prend en argument DataFrame et renvoie ou produit un pandas.DataFrame ou un tuple de pandas.Series ou pandas.arrays où chaque tableau est une colonne. Les types de colonne du résultat doivent correspondre aux types de colonne de la définition d’une UDTF.

  • Marquez la méthode end_partition comme étant vectorisée en utilisant le décorateur @vectorized ou l’attribut de fonction _sf_vectorized_input. Pour plus d’informations, reportez-vous à UDFs Python vectorisées. Le décorateur @vectorized ne peut être utilisé que lorsque l’UDTF Python est exécutée dans Snowflake, par exemple, lors de l’utilisation d’une feuille de calcul SQL. Lorsque l’exécution se fait à l’aide du client ou d’une feuille de calcul Python, vous devez utiliser l’attribut function.

Note

Les noms de colonne par défaut pour le DataFrame d’entrée dans une UDTF avec une end_partition vectorisée correspondent à la signature de la fonction SQL. Les noms des colonnes suivront les exigences de l’identificateur SQL. À savoir, si un identificateur n’est pas entre guillemets, il sera mis en majuscules, et s’il est entre guillemets, il sera conservé tel quel.

Voici un exemple de création d’une UDTF avec une end_partition vectorisée, à l’aide du décorateur @vectorized.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

Voici un exemple de création d’une UDTF avec une end_partition vectorisée à l’aide de l’attribut function.

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

Note

Une UDTF avec une end_partition vectorisée doit être appelée avec la clause PARTITION BY pour construire les partitions.

Pour appeler l’UDTF avec toutes les données dans la même partition :

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

Pour appeler l’UDTF avec les données partitionnées par la colonne x :

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

Prise en charge du type

Les UDTFs avec une end_partition vectorisée prennent en charge les mêmes types SQL que les UDFs Python vectorisées pour les arguments et les valeurs de retour. Cependant, pour les UDTFs avec une end_partition vectorisée, les arguments SQL NUMBER avec une échelle de 0 qui tiennent tous dans un type d’entier de 64 bits ou plus petit seront toujours mappés avec Int16, Int32, ou Int64. En d’autres termes, contrairement aux UDFs scalaires, si l’argument d’une UDTF ne peut pas devenir null, il ne sera pas converti en int16, int32 ou int64.

Pour afficher une table montrant comment les types SQL sont mappés aux types Pandas, consultez le tableau de prise en charge des types dans la rubrique sur les UDFs vectorisées Python.

Exemple : collecte de lignes à l’aide d’une UDTF classique ou d’une UDTF avec une end_partition vectorisée

Voici un exemple de collecte de lignes à l’aide d’une UDTF classique.

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

Voici un exemple de collecte de lignes à l’aide d’une UDTF avec une end_partition vectorisée.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

Exemple : calculez la statistique récapitulative pour chaque colonne de la partition

Voici un exemple de calcul de la statistique récapitulative pour chaque colonne de la partition à l’aide de la méthode describe() pandas.

Tout d’abord, créez une table et générez 3 partitions de 5 lignes chacune.

create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);

-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Copy

Jetez un coup d’œil aux données.

select * from test_values;

-----------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
-----------------------------------------------------
|x     |8.0     |99.4    |714.6   |168.7   |397.2   |
|x     |106.4   |237.1   |971.7   |828.4   |988.2   |
|x     |741.3   |207.9   |32.6    |640.6   |63.2    |
|x     |541.3   |828.6   |844.9   |77.3    |403.1   |
|x     |4.3     |723.3   |924.3   |282.5   |158.1   |
|y     |976.1   |562.4   |968.7   |934.3   |977.3   |
|y     |390.0   |244.3   |952.6   |101.7   |24.9    |
|y     |599.7   |191.8   |90.2    |788.2   |761.2   |
|y     |589.5   |201.0   |863.4   |415.1   |696.1   |
|y     |46.7    |659.7   |571.1   |938.0   |513.7   |
|z     |313.9   |188.5   |964.6   |435.4   |519.6   |
|z     |328.3   |643.1   |766.4   |148.1   |596.4   |
|z     |929.0   |255.4   |915.9   |857.2   |425.5   |
|z     |612.8   |816.4   |220.2   |879.5   |331.4   |
|z     |487.1   |704.5   |471.5   |378.9   |481.2   |
-----------------------------------------------------
Copy

Ensuite, créez la fonction.

create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas

class handler:
    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
      # using describe function to get the summary statistics
      result = df.describe().transpose()
      # add a column at the beginning for column ids
      result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
      return result
$$;
Copy

Appelez la fonction et partitionnez par id.

-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

Vous pouvez également appeler la fonction et traiter l’ensemble de la table comme une seule partition.

-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

Meilleures pratiques

Cette section décrit les meilleures pratiques.

  1. Pour améliorer les performances et éviter les délais d’attente, évitez d’utiliser pandas.concat pour accumuler des résultats partiels. Au lieu de cela, donnez le résultat partiel chaque fois que vous êtes prêt. Par exemple, au lieu de :

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    Faites ceci :

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy
  2. Si un scalaire doit être renvoyé avec chaque ligne, créez une liste de valeurs répétées au lieu de décompresser le tableau numpy pour créer des tuples. Par exemple, pour un résultat de 2 colonnes, au lieu de :

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    Faites ceci :

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  3. Pour améliorer les performances, décompressez les données semi-structurées en colonnes. Par exemple, si vous avez une colonne de variante, obj, avec les éléments x(int), y(float) et z(string), alors au lieu de définir une UDTF avec une signature comme celle-ci :

    create function vec_udtf(variant obj)
    
    Copy

    Et l’appeler en utilisant vec_udtf(obj), vous devez définir l’UDTF avec la signature :

    create function vec_udtf(int, float, string)
    
    Copy

    Et l’appeler en utilisant vec_udtf(obj:x, obj:y, obj:z).

  4. Par défaut, Snowflake code les entrées dans des types pandas qui prennent en charge les valeurs NULL (par exemple, Int64). Si vous utilisez une bibliothèque qui nécessite un type primitif (tel que numpy) et que votre entrée n’a pas de valeurs NULL, vous devez convertir la colonne en type primitif avant d’utiliser la bibliothèque. Par exemple :

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    Pour plus d’informations, voir Prise en charge du type.