Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Python¶
L’API de Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction définie par l’utilisateur à partir d’une lambda ou d’une fonction Python. Cette rubrique explique comment créer ces types de fonctions.
Dans ce chapitre :
Introduction¶
Avec Snowpark, vous pouvez créer des fonctions définies par l’utilisateur (UDFs) pour vos Lambdas et fonctions personnalisées, et vous pouvez appeler ces UDFs pour traiter les données dans votre DataFrame.
Lorsque vous utilisez l’API de Snowpark pour créer une UDF, la bibliothèque Snowpark charge le code de votre fonction dans une zone de préparation interne. Lorsque vous appelez l’UDF, la bibliothèque Snowpark exécute votre fonction sur le serveur, où se trouvent les données. Par conséquent, les données ne doivent pas être transférées au client pour que la fonction puisse les traiter.
Dans votre code personnalisé, vous pouvez également importer des modules à partir de fichiers Python ou de paquets tiers.
Vous pouvez créer une UDF pour votre code personnalisé de deux façons :
Vous pouvez créer une UDF anonyme et affecter la fonction à une variable. Tant que cette variable est dans le scope, vous pouvez l’utiliser pour appeler l’UDF.
Vous pouvez créer une UDF nommée et appeler l’UDF par son nom. Vous pouvez l’utiliser si, par exemple, vous devez appeler une UDF par son nom ou utiliser l’UDF dans une session ultérieure.
Les sections suivantes expliquent comment créer ces UDFs à l’aide d’un environnement de développement local ou à l’aide d’une feuille de calcul Python.
Notez que si vous avez défini une UDF en exécutant la commande CREATE FUNCTION
, vous pouvez appeler cette UDF dans Snowpark. Pour plus de détails, voir Appel de fonctions définies par l’utilisateur (UDFs).
Note
Les UDFs Python vectorisées vous permettent de définir des fonctions Python qui reçoivent des lots de lignes d’entrée comme DataFrames Pandas. Cela permet d’obtenir de bien meilleures performances avec les scénarios d’inférence de machine learning. Pour plus d’informations, voir Utilisation des UDFs vectorisées.
Note
Si vous travaillez avec une feuille de calcul Python, utilisez ces exemples dans la fonction de gestionnaire :
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
Si les exemples renvoient autre chose qu’un DataFrame, comme une list
d’objets Row
, modifiez le type de retour pour qu’il corresponde au type de retour de l’exemple.
Après avoir exécuté un exemple de code, utilisez l’onglet Results pour afficher toute sortie renvoyée. Reportez-vous à Exécution de feuilles de calcul Python pour plus de détails.
Spécifier des dépendances pour une UDF¶
Pour définir une UDF à l’aide de l’API Snowpark, vous devez importer les fichiers contenant tous les modules dont dépend votre UDF, tels que les fichiers Python, les fichiers zip, les fichiers de ressources, etc.
Pour réaliser cette opération à l’aide de feuilles de calcul Python, reportez-vous à Ajouter un fichier Python d’une zone de préparation à une feuille de calcul.
Pour réaliser cette opération à l’aide de votre environnement de développement local, vous devez appeler
Session.add_import()
dans votre code.
Vous pouvez également spécifier un répertoire et la bibliothèque Snowpark le compresse automatiquement et le charge sous forme de fichier zip. (Pour plus de détails sur la lecture des ressources d’une UDF, voir Lecture de fichiers à partir d’une UDF).
Lorsque vous appelez Session.add_import()
, la bibliothèque Snowpark charge les fichiers spécifiés dans une zone de préparation interne et importe les fichiers lors de l’exécution de votre UDF.
L’exemple suivant montre comment ajouter un fichier zip dans une zone de préparation en tant que dépendance à votre code :
>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")
Les exemples suivants montrent comment ajouter un fichier Python à partir de votre machine locale :
>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")
>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
Les exemples suivants montrent comment ajouter d’autres types de dépendances :
>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")
>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")
Note
La bibliothèque Python Snowpark n’est pas chargée automatiquement.
Vous n’avez pas besoin de spécifier les dépendances suivantes :
Vos bibliothèques intégrées Python.
Ces bibliothèques sont déjà disponibles dans l’environnement d’exécution sur le serveur où vos UDFs sont exécutées.
Utilisation de paquets tiers à partir d’Anaconda dans une UDF¶
Vous pouvez utiliser des packages tiers du canal Snowflake Anaconda dans une UDF.
Si vous créez une UDF Python dans une feuille de calcul Python, les packages Anaconda sont déjà disponibles pour votre feuille de calcul. Reportez-vous à Ajouter un fichier Python d’une zone de préparation à une feuille de calcul.
Si vous créez une UDF Python dans votre environnement de développement local, vous pouvez spécifier les packages Anaconda à installer.
Lorsque des requêtes qui appellent des UDFs Python sont exécutées dans un entrepôt Snowflake, les paquets Anaconda sont installés de manière transparente et mis en cache dans l’entrepôt virtuel en votre nom.
Pour plus d’informations sur les meilleures pratiques, sur la façon de visualiser les paquets disponibles et sur la façon de configurer un environnement de développement local, voir Utilisation de paquets tiers.
Si vous écrivez une UDF Python dans votre environnement de développement local, utilisez session.add_packages
pour ajouter des packages au niveau de la session.
Cet exemple de code montre comment importer des paquets et renvoyer leurs versions.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")
>>> @udf
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
Vous pouvez également utiliser session.add_requirements
pour spécifier des paquets avec un fichier d’exigences.
>>> session.add_requirements("mydir/requirements.txt")
Vous pouvez ajouter les paquets de niveau UDF pour remplacer les packages de niveau session que vous avez pu ajouter précédemment.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
Important
Si vous ne spécifiez pas de version de package, Snowflake utilise la dernière version lors de la résolution des dépendances. Lorsque vous déployez l’UDF en production, vous pouvez vouloir vous assurer que votre code utilise toujours les mêmes versions de dépendances. Vous pouvez le faire pour des UDFs permanentes et temporaires.
Lorsque vous créez une UDF permanente, l’UDF est créée et enregistrée une seule fois. Cela résout les dépendances une fois et la version sélectionnée est utilisée pour les charges de travail de production. Lorsque l’UDF s’exécute, il utilise toujours les mêmes versions de dépendance.
Lorsque vous créez une UDF temporaire, spécifiez les versions des dépendances dans le cadre de la spécification de la version. De cette façon, lorsque l’UDF est enregistrée, la résolution du package utilise la version spécifiée. Si vous ne spécifiez pas la version, la dépendance peut être mise à jour lorsqu’une nouvelle version est disponible.
Création d’une UDF anonyme¶
Pour créer une UDF anonyme, vous pouvez soit :
Appeler la fonction
udf
dans le modulesnowflake.snowpark.functions
et transmettre la définition de la fonction anonyme.Appeler la méthode
register
dans la classeUDFRegistration
, et transmettre la définition de la fonction anonyme.
Voici un exemple d’une UDF anonyme :
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Note
Lorsque vous écrivez du code susceptible de s’exécuter dans plusieurs sessions, utilisez la méthode register
pour enregistrer des UDFs, plutôt que d’utiliser la fonction udf
. Cela peut éviter les erreurs dans lesquelles l’objet Snowflake Session
par défaut ne peut être trouvé.
Création et enregistrement d’une UDF nommée¶
Si vous souhaitez appeler une UDF par son nom (par exemple, en utilisant la fonction call_udf
dans le module functions
), vous pouvez créer et enregistrer une UDF nommée. Pour ce faire, voici ce que vous pouvez utiliser :
La méthode
register
dans la classeUDFRegistration
avec l’argumentname
.La fonction
udf
, dans le modulesnowflake.snowpark.functions
avec l’argumentname
.
Pour accéder à un attribut ou à une méthode de la classe UDFRegistration
appelez la propriété udf
de la classe Session
.
L’appel d’un register
ou d’une udf
créera une UDF temporaire que vous pouvez utiliser dans la session en cours.
Pour créer une UDF permanente, appelez la méthode register
ou la fonction udf
et définissez l’argument is_permanent
sur True
. Lorsque vous créez une UDF permanente, vous devez également définir l’argument stage_location
à l’emplacement de la zone de préparation où le fichier Python pour l’UDF et ses dépendances sont chargées.
Voici un exemple de la manière d’enregistrer une UDF temporaire nommée :
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Voici un exemple de la façon d’enregistrer une UDF permanente nommée en définissant l’argument is_permanent
sur True
:
>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
... return x-1
Voici un exemple d’appel de ces UDFs :
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Vous pouvez également appeler l’UDF en utilisant SQL :
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Création d’une UDF à partir d’un fichier source Python¶
Si vous créez votre UDF dans votre environnement de développement local, vous pouvez également définir votre gestionnaire d’UDF dans un fichier Python, puis utiliser la méthode register_from_file
de la classe UDFRegistration
pour créer une UDF.
Note
Vous ne pouvez pas utiliser cette méthode dans une feuille de calcul Python.
Voici des exemples d’utilisation de register_from_file
.
Supposons que vous ayez un fichier Python test_udf_file.py
qui contient :
def mod5(x: int) -> int:
return x % 5
Ensuite, vous pouvez créer une UDF à partir de cette fonction du fichier test_udf_file.py
.
>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
... file_path="tests/resources/test_udf_dir/test_udf_file.py",
... func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Vous pouvez également charger le fichier vers un emplacement de zone de préparation, puis l’utiliser pour créer l’UDF.
>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
... file_path="@mystage/test_udf_file.py",
... func_name="mod5",
... return_type=IntegerType(),
... input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Lecture de fichiers à partir d’une UDF¶
Pour lire le contenu d’un fichier, votre code Python peut :
Lire un fichier spécifié de façon statique en important un fichier et en le lisant ensuite dans le répertoire personnel de l’UDF.
Lire un fichier spécifié de façon dynamique avec SnowflakeFile. Vous pouvez le faire si vous avez besoin d’accéder à un fichier pendant un calcul.
Lecture de fichiers spécifiés de façon statique¶
La bibliothèque Snowpark charge et exécute les UDFs sur le serveur. Si votre UDF doit lire des données à partir d’un fichier, vous devez vous assurer que le fichier est chargé avec l’UDF.
Note
Si vous écrivez votre UDF dans une feuille de calcul Python, l’UDF ne peut lire que les fichiers d’une zone de préparation.
Pour configurer une UDF pour lire un fichier :
Spécifiez que le fichier est une dépendance, ce qui charge le fichier sur le serveur. Pour plus d’informations, voir Spécifier des dépendances pour une UDF.
Par exemple :
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
Dans l’UDF, lisez le fichier. Dans l’exemple suivant, le fichier ne sera lu qu’une fois lors de la création de l’UDF, et ne sera pas relu lors de l’exécution de l’UDF. Ceci est réalisé grâce à une bibliothèque tierce cachetools.
>>> import sys >>> import os >>> import cachetools >>> from snowflake.snowpark.types import StringType >>> @cachetools.cached(cache={}) ... def read_file(filename): ... import_dir = sys._xoptions.get("snowflake_import_directory") ... if import_dir: ... with open(os.path.join(import_dir, filename), "r") as f: ... return f.read() >>> >>> # create a temporary text file for test >>> temp_file_name = "/tmp/temp.txt" >>> with open(temp_file_name, "w") as t: ... _ = t.write("snowpark") >>> session.add_import(temp_file_name) >>> session.add_packages("cachetools") >>> >>> def add_suffix(s): ... return f"{read_file(os.path.basename(temp_file_name))}-{s}" >>> >>> concat_file_content_with_str_udf = session.udf.register( ... add_suffix, ... return_type=StringType(), ... input_types=[StringType()] ... ) >>> >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"]) >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect() [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')] >>> os.remove(temp_file_name) >>> session.clear_imports()
Lire des fichiers spécifiés de façon dynamique avec SnowflakeFile
¶
Vous pouvez lire un fichier depuis une zone de préparation en utilisant la classe SnowflakeFile
dans le module Snowpark snowflake.snowpark.files
. La classe SnowflakeFile
offre un accès dynamique aux fichiers, ce qui vous permet de diffuser des fichiers de n’importe quelle taille. L’accès dynamique aux fichiers est également utile lorsque vous souhaitez itérer sur plusieurs fichiers. Par exemple, voir Traitement de plusieurs fichiers.
Pour plus d’informations et d’exemples sur la lecture de fichiers à l’aide de SnowflakeFile
, voir Lecture d’un fichier à l’aide de la classe SnowflakeFile dans un gestionnaire d’UDF Python.
L’exemple suivant enregistre une UDF temporaire qui lit un fichier texte à partir d’une zone de préparation utilisant SnowflakeFile
et renvoie la longueur du fichier.
Enregistrer l’UDF :
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
Appelez l’UDF :
session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Utilisation des UDFs vectorisées¶
Les UDFs Python vectorisées vous permettent de définir des fonctions Python qui reçoivent des lots de lignes d’entrée sous forme de DataFrames Pandas et renvoient des lots de résultats sous forme de tableaux ou de séries Pandas. La colonne dans le dataframe
Snowpark sera vectorisée comme une série Pandas à l’intérieur de l’UDF.
Voici un exemple d’utilisation de l’interface lots :
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)
@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
# The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
df.columns = ["col1", "col2", "col3", "col4"]
return model.predict(df)
Vous appelez des UDFs Python vectorisées de la même manière que vous appelez les autres UDFs Python. Pour plus d’informations, consultez UDFs Python vectorisées, qui explique comment créer une UDF vectorielle à l’aide d’une instruction SQL. Par exemple, vous pouvez utiliser le décorateur vectorized
lorsque vous spécifiez le code Python dans l’instruction SQL. En utilisant l’API Snowpark Python décrite dans ce document, vous n’utilisez pas une instruction SQL pour créer une UDF vectorielle. On n’utilise donc pas le décorateur vectorized
.
Il est possible de limiter le nombre de lignes par lot. Pour plus d’informations, voir Définir une taille de lot cible.
Pour plus d’explications et d’exemples sur l’utilisation de l’API Snowpark Python pour créer des UDFs vectorielles, reportez-vous à la section sur les UDFs de la référence API de Snowpark.