Création de fonctions définies par l’utilisateur (UDAFs) pour DataFrames dans Python¶
Vous pouvez utiliser des APIs Snowpark Python pour créer et appeler des fonctions agrégées définies par l’utilisateur (UDAFs). Une UDAF prend une ou plusieurs lignes en entrée et produit une seule ligne en sortie. Elle agit sur les valeurs de lignes pour effectuer des calculs mathématiques tels que la somme, la moyenne, le comptage, les valeurs minimale/maximale, l’écart type et l’estimation, ainsi que d’autres opérations non mathématiques.
Pour créer et enregistrer une UDAF avec Snowpark, vous devez :
Implémenter un gestionnaire UDAF.
Le gestionnaire contient la logique de l’UDAF. Un gestionnaire UDAF doit mettre en œuvre des fonctions que Snowflake invoquera au moment de l’exécution lorsque l’UDAF sera appelée. Pour plus d’informations, voir Implémentation d’un gestionnaire (handler).
Enregistrez l’UDAF et son gestionnaire dans la base de données Snowflake.
Une fois que vous avez enregistré l’UDAF, vous pouvez l’appeler depuis SQL ou en utilisant l’API Snowpark. Vous pouvez utiliser l’API Snowpark pour enregistrer l’UDAF et son gestionnaire. Pour plus d’informations sur l’enregistrement, voir Enregistrement d’une UDAF.
Vous pouvez également créer vos propres UDAFs en utilisant le SQL comme décrit dans Fonctions agrégées définies par l’utilisateur Python.
Implémentation d’un gestionnaire (handler)¶
Comme décrit en détail dans Interface pour le gestionnaire (handler) de la fonction d’agrégation, une classe de gestionnaire (handler) UDAF doit implémenter des méthodes que Snowflake appelle lorsque l’UDAF est appelée. Vous pouvez utiliser la classe que vous écrivez comme gestionnaire (handler), que vous enregistriez l’UDAF avec l’API Snowpark ou que vous la créiez avec SQL en utilisant l’instruction CREATE FUNCTION.
Votre classe de gestionnaire (handler) UDAF implémente les méthodes listées dans le tableau suivant, que Snowflake appelle au moment de l’exécution. Voir des exemples dans ce chapitre.
Méthode |
Exigence |
Description |
---|---|---|
|
Obligatoire |
Initialise l’état interne d’un agrégat. |
|
Obligatoire |
Renvoie l’état interne d’un agrégat.
|
|
Obligatoire |
Accumule l’état de l’agrégat sur la base de la nouvelle ligne d’entrée. |
|
Obligatoire |
Combine deux états agrégés intermédiaires. |
|
Obligatoire |
Produit le résultat final sur la base de l’état agrégé. |
Enregistrement d’une UDAF¶
Une fois que vous avez implémenté un gestionnaire UDAF, vous pouvez utiliser l’API Snowpark pour enregistrer l’UDAF dans la base de données Snowflake. L’enregistrement de l’UDAF crée l’UDAF afin qu’elle puisse être appelée.
Vous pouvez enregistrer l’UDAF comme une fonction nommée ou anonyme, comme vous le faites pour une UDF scalaire. Pour des informations connexes sur l’enregistrement d’une UDF scalaire, voir Création d’une UDF anonyme et Création et enregistrement d’une UDF nommée. Lorsque vous enregistrez une UDAF, vous spécifiez les valeurs des paramètres dont Snowflake a besoin pour créer l’UDAF.
Vous pouvez enregistrer la fonction à l’aide des fonctions et méthodes suivantes :
Utilisez la méthode
register
ou la fonctionudaf
, en spécifiant le nom de votre classe de gestionnaire (handler), ainsi que les arguments pour définir la fonction. Vous pouvez également utiliserudaf
comme décorateur@udaf
sur la classe du gestionnaire (handler).Pour obtenir des informations de référence à ce sujet, consultez les pages suivantes :
Utilisez la fonction
register_from_file
en pointant vers un fichier Python ou un fichier zip contenant le code source Python.Pour la référence de la fonction, voir snowflake.snowpark.udtf.UDAFRegistration.register_from_file.
Exemples¶
Créer une UDAF avec une valeur de retour et un seul paramètre¶
Le code Python de l’exemple de gestionnaire (handler) suivant prend en charge une sum_int
UDAF qui reçoit un seul argument entier, ajoute la valeur d’une ligne à l’autre et renvoie le résultat.
Enregistrer la fonction¶
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
# This aggregate state is a primitive Python data type.
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value):
self._partial_sum += input_value
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Appelez la fonction¶
Le code Python de l’exemple suivant appelle l”sum_int
UDAF avec un DataFrame.
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
Créer une UDAF avec une valeur de retour et deux paramètres¶
Enregistrer la fonction¶
Le code Python de l’exemple de gestionnaire (handler) suivant prend en charge une sum_int
UDAF qui reçoit deux arguments entiers, additionne les valeurs des arguments sur plusieurs lignes et renvoie le résultat.
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value, input_value2):
self._partial_sum += input_value + input_value2
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Appelez la fonction¶
Le code Python de l’exemple suivant appelle l”sum_int
UDAF avec un DataFrame.
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())