Création de fonctions définies par l’utilisateur (UDAFs) pour DataFrames dans Python

Vous pouvez utiliser des APIs Snowpark Python pour créer et appeler des fonctions agrégées définies par l’utilisateur (UDAFs). Une UDAF prend une ou plusieurs lignes en entrée et produit une seule ligne en sortie. Elle agit sur les valeurs de lignes pour effectuer des calculs mathématiques tels que la somme, la moyenne, le comptage, les valeurs minimale/maximale, l’écart type et l’estimation, ainsi que d’autres opérations non mathématiques.

Pour créer et enregistrer une UDAF avec Snowpark, vous devez :

  • Implémenter un gestionnaire UDAF.

    Le gestionnaire contient la logique de l’UDAF. Un gestionnaire UDAF doit mettre en œuvre des fonctions que Snowflake invoquera au moment de l’exécution lorsque l’UDAF sera appelée. Pour plus d’informations, voir Implémentation d’un gestionnaire (handler).

  • Enregistrez l’UDAF et son gestionnaire dans la base de données Snowflake.

    Une fois que vous avez enregistré l’UDAF, vous pouvez l’appeler depuis SQL ou en utilisant l’API Snowpark. Vous pouvez utiliser l’API Snowpark pour enregistrer l’UDAF et son gestionnaire. Pour plus d’informations sur l’enregistrement, voir Enregistrement d’une UDAF.

Vous pouvez également créer vos propres UDAFs en utilisant le SQL comme décrit dans Fonctions agrégées définies par l’utilisateur Python.

Implémentation d’un gestionnaire (handler)

Comme décrit en détail dans Interface pour le gestionnaire (handler) de la fonction d’agrégation, une classe de gestionnaire (handler) UDAF doit implémenter des méthodes que Snowflake appelle lorsque l’UDAF est appelée. Vous pouvez utiliser la classe que vous écrivez comme gestionnaire (handler), que vous enregistriez l’UDAF avec l’API Snowpark ou que vous la créiez avec SQL en utilisant l’instruction CREATE FUNCTION.

Votre classe de gestionnaire (handler) UDAF implémente les méthodes listées dans le tableau suivant, que Snowflake appelle au moment de l’exécution. Voir des exemples dans ce chapitre.

Méthode

Exigence

Description

__init__

Obligatoire

Initialise l’état interne d’un agrégat.

aggregate_state

Obligatoire

Renvoie l’état interne d’un agrégat.

accumulate

Obligatoire

Accumule l’état de l’agrégat sur la base de la nouvelle ligne d’entrée.

merge

Obligatoire

Combine deux états agrégés intermédiaires.

finish

Obligatoire

Produit le résultat final sur la base de l’état agrégé.

Enregistrement d’une UDAF

Une fois que vous avez implémenté un gestionnaire UDAF, vous pouvez utiliser l’API Snowpark pour enregistrer l’UDAF dans la base de données Snowflake. L’enregistrement de l’UDAF crée l’UDAF afin qu’elle puisse être appelée.

Vous pouvez enregistrer l’UDAF comme une fonction nommée ou anonyme, comme vous le faites pour une UDF scalaire. Pour des informations connexes sur l’enregistrement d’une UDF scalaire, voir Création d’une UDF anonyme et Création et enregistrement d’une UDF nommée. Lorsque vous enregistrez une UDAF, vous spécifiez les valeurs des paramètres dont Snowflake a besoin pour créer l’UDAF.

Vous pouvez enregistrer la fonction à l’aide des fonctions et méthodes suivantes :

Exemples

Créer une UDAF avec une valeur de retour et un seul paramètre

Le code Python de l’exemple de gestionnaire (handler) suivant prend en charge une sum_int UDAF qui reçoit un seul argument entier, ajoute la valeur d’une ligne à l’autre et renvoie le résultat.

Enregistrer la fonction

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
  def __init__(self):
    # This aggregate state is a primitive Python data type.
    self._partial_sum = 0

  @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value):
    self._partial_sum += input_value

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Appelez la fonction

Le code Python de l’exemple suivant appelle l”sum_int UDAF avec un DataFrame.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
Copy

Créer une UDAF avec une valeur de retour et deux paramètres

Enregistrer la fonction

Le code Python de l’exemple de gestionnaire (handler) suivant prend en charge une sum_int UDAF qui reçoit deux arguments entiers, additionne les valeurs des arguments sur plusieurs lignes et renvoie le résultat.

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
  class PythonSumUDAF:
    def __init__(self):
      self._partial_sum = 0

    @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value, input_value2):
    self._partial_sum += input_value + input_value2

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Copy

Appelez la fonction

Le code Python de l’exemple suivant appelle l”sum_int UDAF avec un DataFrame.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())
Copy