Création de fonctions de table définies par l’utilisateur (UDTFs) pour DataFrames en Python¶
L’API Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction de table définie par l’utilisateur avec un gestionnaire écrit en Python. Cette rubrique explique comment créer ces types de fonctions.
Dans ce chapitre :
Introduction¶
Vous pouvez créer une fonction de table définie par l’utilisateur (UDTF) en utilisant l’API Snowpark.
Vous procédez de la même manière que pour la création d’une fonction scalaire définie par l’utilisateur (UDF) avec les API, comme décrit dans Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Python. Les principales différences concernent les exigences du gestionnaire UDF et les valeurs des paramètres requis lors de l’enregistrement de l’UDTF.
Pour créer et enregistrer une UDTF avec Snowpark, vous devez :
Implémenter un gestionnaire UDTF.
Le gestionnaire contient la logique de l’UDTF. Un gestionnaire UDTF doit mettre en œuvre des fonctions que Snowflake invoquera au moment de l’exécution lorsque l’UDTF sera appelée. Pour plus d’informations, voir Implémentation d’un gestionnaire UDTF.
Enregistrez l’UDTF et son gestionnaire dans la base de données Snowflake.
Vous pouvez utiliser l’API Snowpark pour enregistrer l’UDTF et son gestionnaire. Une fois que vous avez enregistré l’UDTF, vous pouvez l’appeler depuis SQL ou en utilisant l’API Snowpark. Pour plus d’informations sur l’enregistrement, voir Enregistrement d’une UDTF.
Pour plus d’informations sur l’appel d’une UDTF, voir Appels de fonctions de table définies par l’utilisateur (UDTFs).
Implémentation d’un gestionnaire UDTF¶
Comme décrit en détail dans Écriture d’une UDTF en Python, une classe de gestionnaire UDTF doit implémenter des méthodes que Snowflake invoque lorsque l’UDTF est appelée. Vous pouvez utiliser la classe que vous écrivez comme gestionnaire, que vous enregistriez l’UDTF avec l’API Snowpark ou que vous la créiez avec SQL en utilisant l’instruction CREATE FUNCTION.
Les méthodes d’une classe de gestionnaire sont conçues pour traiter les lignes et les partitions reçues par l’UDTF.
Une classe de gestionnaire UDTF implémente ce qui suit, que Snowflake invoque au moment de l’exécution :
Une méthode
__init__
. En option. Appelée pour initialiser le traitement avec état des partitions d’entrée.Une méthode
process
. Requis. Appelée pour chaque ligne d’entrée. La méthode renvoie une valeur tabulaire sous forme de tuples.Une méthode
end_partition
. En option. Appelée pour finaliser le traitement des partitions d’entrée.Bien que Snowflake prenne en charge les grandes partitions avec des délais d’expiration définis pour les traiter avec succès, les partitions particulièrement grandes peuvent entraîner des expirations (par exemple lorsque
end_partition
prend trop de temps à se terminer). Veuillez contacter le support Snowflake si vous avez besoin d’ajuster le seuil d’expiration pour des scénarios d’utilisation spécifiques.
Pour des détails et des exemples de gestionnaires, voir Écriture d’une UDTF en Python.
Enregistrement d’une UDTF¶
Une fois que vous avez implémenté un gestionnaire UDTF, vous pouvez utiliser l’API Snowpark pour enregistrer l’UDTF dans la base de données Snowflake. L’enregistrement de l’UDTF crée l’UDTF afin qu’elle puisse être appelée.
Vous pouvez enregistrer l’UDTF comme une fonction nommée ou anonyme, comme vous le faites pour une UDF scalaire. Pour des informations connexes sur l’enregistrement d’une UDF scalaire, voir Création d’une UDF anonyme et Création et enregistrement d’une UDF nommée.
Lorsque vous enregistrez une UDTF, vous spécifiez les valeurs des paramètres dont Snowflake a besoin pour créer l’UDTF. (Beaucoup de ces paramètres correspondent fonctionnellement à des clauses de l’instruction CREATE FUNCTION dans SQL. Pour plus d’informations, voir CREATE FUNCTION.)
La plupart de ces paramètres sont les mêmes que ceux que vous spécifiez lorsque vous créez une UDF scalaire (pour plus d’informations, voir Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Python). Les principales différences sont dues au fait qu’une UDTF renvoie une valeur tabulaire et au fait que son gestionnaire est une classe, plutôt qu’une fonction. Pour une liste complète des paramètres, voir la documentation des APIs liée ci-dessous.
Pour enregistrer une UDTF avec Snowpark, utilisez l’une des méthodes suivantes, en spécifiant les valeurs des paramètres nécessaires pour créer l’UDTF dans la base de données. Pour des informations qui différencient ces options, voir UDFRegistration, qui décrit des options similaires pour l’enregistrement d’une UDF scalaire.
Utilisez la fonction
register
ouudtf
qui pointe vers une fonction Python d’exécution. Vous pouvez également utiliser la fonctionudtf
comme décorateur sur la classe du gestionnaire.Pour des références sur ces fonctions, voir :
Utilisez la fonction
register_from_file
en pointant vers un fichier Python ou un fichier zip contenant le code source Python.Pour la référence de la fonction, voir snowflake.snowpark.udtf.UDTFRegistration.register_from_file.
Définition des types d’entrée et du schéma de sortie d’une UDTF¶
Lorsque vous enregistrez une UDTF, vous spécifiez les détails concernant les paramètres et la valeur de sortie de la fonction. Vous faites cela pour que la fonction elle-même déclare des types qui correspondent exactement à ceux du gestionnaire sous-jacent de la fonction.
Pour des exemples, voir Exemples dans cette rubrique et dans la référence snowflake.snowpark.udtf.UDTFRegistration.
Lors de l’enregistrement de l’UDTF, vous spécifiez les éléments suivants :
Types de ses paramètres d’entrée comme valeur du paramètre
input_types
de la fonction d’enregistrement. Le paramètreinput_types
est facultatif si vous fournissez des indications de type dans la déclaration de la méthodeprocess
.Spécifiez cette valeur comme une liste de types basée sur snowflake.snowpark.types.DataType. Par exemple, vous pourriez spécifier
input_types=[StringType(), IntegerType()]
.Schéma de sa sortie tabulaire comme valeur du paramètre
output_schema
de la fonction d’enregistrement.La valeur
output_schema
peut être l’une des valeurs suivantes :Une liste de noms de colonnes dans la valeur de retour de l’UDTF.
La liste ne comprendra que les noms de colonnes, vous devez donc également fournir des indications de type dans la déclaration de la méthode
process
.Un StructType qui représente les noms et les types des colonnes de la table de sortie.
Le code de l’exemple suivant attribue un schéma comme valeur à une variable
output
puis utilise la variable lors de l’enregistrement de l’UDTF.>>> from snowflake.snowpark.types import StructField, StructType, StringType, IntegerType, FloatType >>> from snowflake.snowpark.functions import udtf, table_function >>> schema = StructType([ ... StructField("symbol", StringType()) ... StructField("cost", IntegerType()), ... ]) >>> @udtf(output_schema=schema,input_types=[StringType(), IntegerType(), FloatType()],stage_location="straut_udf",is_permanent=True,name="test_udtf",replace=True) ... class StockSale: ... def process(self, symbol, quantity, price): ... cost = quantity * price ... yield (symbol, cost)
Exemples¶
Voici une brève liste d’exemples. Pour plus d’exemples, voir snowflake.snowpark.udtf.UDTFRegistration.
Enregistrement d’une UDTF avec la fonction udtf
Enregistrez la fonction.
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
... def process(self, n):
... for i in range(n):
... yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
Appelez la fonction.
>>> session.table_function(generator_udtf(lit(3))).collect() # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect() # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Enregistrement d’une UDTF avec la fonction d’enregistrement
Enregistrez la fonction.
>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
... def __init__(self):
... self._total_per_partition = 0
...
... def process(self, s1: str) -> Iterable[Tuple[str, int]]:
... words = s1.split()
... self._total_per_partition = len(words)
... counter = Counter(words)
... yield from counter.items()
...
... def end_partition(self):
... yield ("partition_total", self._total_per_partition)
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
... MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
Appelez la fonction.
>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD" |"COUNT" |
-----------------------------
|w1 |1 |
|w2 |2 |
|w3 |3 |
|partition_total |6 |
-----------------------------
Enregistrement d’une UDTF avec la fonction register_from_file
Enregistrez la fonction.
>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
... file_path="@mystage/test_udtf_file.py",
... handler_name="GeneratorUDTF",
... output_schema=StructType([StructField("number", IntegerType())]),
... input_types=[IntegerType()]
... )
Appelez la fonction.
>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]