Création de fonctions de table définies par l’utilisateur (UDTFs) pour DataFrames en Python

L’API Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction de table définie par l’utilisateur avec un gestionnaire écrit en Python. Cette rubrique explique comment créer ces types de fonctions.

Dans ce chapitre :

Introduction

Vous pouvez créer une fonction de table définie par l’utilisateur (UDTF) en utilisant l’API Snowpark.

Vous procédez de la même manière que pour la création d’une fonction scalaire définie par l’utilisateur (UDF) avec les API, comme décrit dans Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Python. Les principales différences concernent les exigences du gestionnaire UDF et les valeurs des paramètres requis lors de l’enregistrement de l’UDTF.

Pour créer et enregistrer une UDTF avec Snowpark, vous devez :

  • Implémenter un gestionnaire UDTF.

    Le gestionnaire contient la logique de l’UDTF. Un gestionnaire UDTF doit mettre en œuvre des fonctions que Snowflake invoquera au moment de l’exécution lorsque l’UDTF sera appelée. Pour plus d’informations, voir Implémentation d’un gestionnaire UDTF.

  • Enregistrez l’UDTF et son gestionnaire dans la base de données Snowflake.

    Vous pouvez utiliser l’API Snowpark pour enregistrer l’UDTF et son gestionnaire. Une fois que vous avez enregistré l’UDTF, vous pouvez l’appeler depuis SQL ou en utilisant l’API Snowpark. Pour plus d’informations sur l’enregistrement, voir Enregistrement d’une UDTF.

Pour plus d’informations sur l’appel d’une UDTF, voir Appels de fonctions de table définies par l’utilisateur (UDTFs).

Implémentation d’un gestionnaire UDTF

Comme décrit en détail dans Écriture d’une UDTF en Python, une classe de gestionnaire UDTF doit implémenter des méthodes que Snowflake invoque lorsque l’UDTF est appelée. Vous pouvez utiliser la classe que vous écrivez comme gestionnaire, que vous enregistriez l’UDTF avec l’API Snowpark ou que vous la créiez avec SQL en utilisant l’instruction CREATE FUNCTION.

Les méthodes d’une classe de gestionnaire sont conçues pour traiter les lignes et les partitions reçues par l’UDTF.

Une classe de gestionnaire UDTF implémente ce qui suit, que Snowflake invoque au moment de l’exécution :

  • Une méthode __init__. En option. Appelée pour initialiser le traitement avec état des partitions d’entrée.

  • Une méthode process. Requis. Appelée pour chaque ligne d’entrée. La méthode renvoie une valeur tabulaire sous forme de tuples.

  • Une méthode end_partition. En option. Appelée pour finaliser le traitement des partitions d’entrée.

    Bien que Snowflake prenne en charge les grandes partitions avec des délais d’expiration définis pour les traiter avec succès, les partitions particulièrement grandes peuvent entraîner des expirations (par exemple lorsque end_partition prend trop de temps à se terminer). Veuillez contacter le support Snowflake si vous avez besoin d’ajuster le seuil d’expiration pour des scénarios d’utilisation spécifiques.

Pour des détails et des exemples de gestionnaires, voir Écriture d’une UDTF en Python.

Enregistrement d’une UDTF

Une fois que vous avez implémenté un gestionnaire UDTF, vous pouvez utiliser l’API Snowpark pour enregistrer l’UDTF dans la base de données Snowflake. L’enregistrement de l’UDTF crée l’UDTF afin qu’elle puisse être appelée.

Vous pouvez enregistrer l’UDTF comme une fonction nommée ou anonyme, comme vous le faites pour une UDF scalaire. Pour des informations connexes sur l’enregistrement d’une UDF scalaire, voir Création d’une UDF anonyme et Création et enregistrement d’une UDF nommée.

Lorsque vous enregistrez une UDTF, vous spécifiez les valeurs des paramètres dont Snowflake a besoin pour créer l’UDTF. (Beaucoup de ces paramètres correspondent fonctionnellement à des clauses de l’instruction CREATE FUNCTION dans SQL. Pour plus d’informations, voir CREATE FUNCTION.)

La plupart de ces paramètres sont les mêmes que ceux que vous spécifiez lorsque vous créez une UDF scalaire (pour plus d’informations, voir Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Python). Les principales différences sont dues au fait qu’une UDTF renvoie une valeur tabulaire et au fait que son gestionnaire est une classe, plutôt qu’une fonction. Pour une liste complète des paramètres, voir la documentation des APIs liée ci-dessous.

Pour enregistrer une UDTF avec Snowpark, utilisez l’une des méthodes suivantes, en spécifiant les valeurs des paramètres nécessaires pour créer l’UDTF dans la base de données. Pour des informations qui différencient ces options, voir UDFRegistration, qui décrit des options similaires pour l’enregistrement d’une UDF scalaire.

Définition des types d’entrée et du schéma de sortie d’une UDTF

Lorsque vous enregistrez une UDTF, vous spécifiez les détails concernant les paramètres et la valeur de sortie de la fonction. Vous faites cela pour que la fonction elle-même déclare des types qui correspondent exactement à ceux du gestionnaire sous-jacent de la fonction.

Pour des exemples, voir Exemples dans cette rubrique et dans la référence snowflake.snowpark.udtf.UDTFRegistration.

Lors de l’enregistrement de l’UDTF, vous spécifiez les éléments suivants :

  • Types de ses paramètres d’entrée comme valeur du paramètre input_types de la fonction d’enregistrement. Le paramètre input_types est facultatif si vous fournissez des indications de type dans la déclaration de la méthode process.

    Spécifiez cette valeur comme une liste de types basée sur snowflake.snowpark.types.DataType. Par exemple, vous pourriez spécifier input_types=[StringType(), IntegerType()].

  • Schéma de sa sortie tabulaire comme valeur du paramètre output_schema de la fonction d’enregistrement.

    La valeur output_schema peut être l’une des valeurs suivantes :

    • Une liste de noms de colonnes dans la valeur de retour de l’UDTF.

      La liste ne comprendra que les noms de colonnes, vous devez donc également fournir des indications de type dans la déclaration de la méthode process.

    • Un StructType qui représente les noms et les types des colonnes de la table de sortie.

      Le code de l’exemple suivant attribue un schéma comme valeur à une variable output puis utilise la variable lors de l’enregistrement de l’UDTF.

      >>> from snowflake.snowpark.types import StructField, StructType, StringType, IntegerType, FloatType
      >>> from snowflake.snowpark.functions import udtf, table_function
      >>> schema = StructType([
      ...     StructField("symbol", StringType())
      ...     StructField("cost", IntegerType()),
      ... ])
      >>> @udtf(output_schema=schema,input_types=[StringType(), IntegerType(), FloatType()],stage_location="straut_udf",is_permanent=True,name="test_udtf",replace=True)
      ... class StockSale:
      ...     def process(self, symbol, quantity, price):
      ...         cost = quantity * price
      ...         yield (symbol, cost)
      
      Copy

Exemples

Voici une brève liste d’exemples. Pour plus d’exemples, voir snowflake.snowpark.udtf.UDTFRegistration.

Enregistrement d’une UDTF avec la fonction udtf

Enregistrez la fonction.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
Copy

Appelez la fonction.

>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy

Enregistrement d’une UDTF avec la fonction d’enregistrement

Enregistrez la fonction.

>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
...     def __init__(self):
...         self._total_per_partition = 0
...
...     def process(self, s1: str) -> Iterable[Tuple[str, int]]:
...         words = s1.split()
...         self._total_per_partition = len(words)
...         counter = Counter(words)
...         yield from counter.items()
...
...     def end_partition(self):
...         yield ("partition_total", self._total_per_partition)
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
...     MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
Copy

Appelez la fonction.

>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
Copy

Enregistrement d’une UDTF avec la fonction register_from_file

Enregistrez la fonction.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="@mystage/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
Copy

Appelez la fonction.

>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy