Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Python

L’API de Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction définie par l’utilisateur à partir d’une lambda ou d’une fonction Python. Cette rubrique explique comment créer ces types de fonctions.

Dans ce chapitre :

Introduction

Avec Snowpark, vous pouvez créer des fonctions définies par l’utilisateur (UDFs) pour vos Lambdas et fonctions personnalisées, et vous pouvez appeler ces UDFs pour traiter les données dans votre DataFrame.

Lorsque vous utilisez l’API de Snowpark pour créer une UDF, la bibliothèque Snowpark télécharge le code de votre fonction dans une zone de préparation interne. Lorsque vous appelez l’UDF, la bibliothèque Snowpark exécute votre fonction sur le serveur, où se trouvent les données. Par conséquent, les données ne doivent pas être transférées au client pour que la fonction puisse les traiter.

Dans votre code personnalisé, vous pouvez également importer des modules à partir de fichiers Python ou de paquets tiers.

Vous pouvez créer une UDF pour votre code personnalisé de deux façons :

  • Vous pouvez créer une UDF anonyme et affecter la fonction à une variable. Tant que cette variable est dans le scope, vous pouvez l’utiliser pour appeler l’UDF.

  • Vous pouvez créer une UDF nommée et appeler l’UDF par son nom. Vous pouvez l’utiliser si, par exemple, vous devez appeler une UDF par son nom ou utiliser l’UDF dans une session ultérieure.

Les sections suivantes expliquent comment créer ces UDFs.

Notez que si vous avez défini une UDF en exécutant la commande CREATE FUNCTION, vous pouvez appeler cette UDF dans Snowpark. Pour plus de détails, voir Appel de fonctions définies par l’utilisateur (UDFs).

Note

Il existe une API par lots UDF Python, qui permet de définir des fonctions Python recevant des lots de lignes d’entrée sous forme de DataFrames Pandas. L’interface lots permet d’obtenir de bien meilleures performances avec les scénarios d’inférence de machine learning. Pour plus d’informations, voir Utilisation d’UDFs vectorisées via l’API par lots UDF Python.

Spécifier des dépendances pour une UDF

Afin de définir une UDF par le biais de l’API de Snowpark, vous devez appeler Session.add_import() pour tous les fichiers qui contiennent des modules dont dépend votre UDF (par exemple, des fichiers Python, des fichiers zip, des fichiers de ressources, etc.) Vous pouvez également spécifier un répertoire et la bibliothèque Snowpark le compressera automatiquement et le chargera sous forme de fichier zip. (Pour plus de détails sur la lecture des ressources d’une UDF, voir Création d’une UDF à partir d’un fichier source Python).

La bibliothèque Snowpark télécharge ces fichiers dans une zone de préparation interne et importe les fichiers lors de l’exécution de votre UDF.

L’exemple suivant montre comment ajouter un fichier zip dans une zone de préparation en tant que dépendance :

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  

Les exemples suivants montrent comment ajouter un fichier Python à partir de votre machine locale :

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  

Les exemples suivants montrent comment ajouter d’autres types de dépendances :

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  

Note

La bibliothèque Python Snowpark ne sera pas chargée automatiquement.

Vous ne devriez pas avoir besoin de spécifier les dépendances suivantes :

  • Vos bibliothèques intégrées Python.

    Ces bibliothèques sont déjà disponibles dans l’environnement d’exécution sur le serveur où vos UDFs sont exécutées.

Utilisation de paquets tiers à partir d’Anaconda dans une UDF

Vous pouvez spécifier les paquets Anaconda à installer lorsque vous créez des UDFs Python. Lorsque des requêtes qui appellent des UDFs Python sont exécutées dans un entrepôt Snowflake, les paquets Anaconda sont installés de manière transparente et mis en cache dans l’entrepôt virtuel en votre nom. Pour plus d’informations sur les meilleures pratiques, sur la façon de visualiser les paquets disponibles et sur la façon de configurer un environnement de développement local, voir Utilisation de paquets tiers.

Utilisez session.add_packages pour ajouter des paquets au niveau de la session.

Cet exemple de code montre comment importer des paquets et renvoyer leurs versions.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]

Vous pouvez également utiliser session.add_requirements pour spécifier des paquets avec un fichier d’exigences.

>>> session.add_requirements("mydir/requirements.txt")  

Vous pouvez ajouter les paquets de niveau UDF pour remplacer les packages de niveau session que vous avez pu ajouter précédemment.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]

Important

Si vous ne spécifiez pas de version de paquet, Snowflake utilisera la dernière version lors de la résolution des dépendances. Cependant, lorsque vous déployez l’UDF en production, vous pouvez vouloir vous assurer que votre code utilise toujours les mêmes versions de dépendances. Vous pouvez le faire pour des UDFs permanentes et temporaires.

  • Lorsque vous créez une UDF permanente, l’UDF est créée et enregistrée une seule fois. Cela résout les dépendances une fois et la version sélectionnée est utilisée pour les charges de travail de production. Lorsque l’UDF s’exécute, elle utilise toujours les mêmes versions de dépendances.

  • Lorsque vous créez une UDF temporaire, spécifiez les versions des dépendances dans le cadre de la spécification de la version. De cette façon, lorsque l’UDF est enregistrée, la résolution du paquet utilisera la version spécifiée. Si vous ne spécifiez pas la version, la dépendance peut être mise à jour lorsqu’une nouvelle version est disponible.

Création d’une UDF anonyme

Pour créer une UDF anonyme, vous pouvez soit :

  • Appeler la fonction udf dans le module snowflake.snowpark.functions et transmettre la définition de la fonction anonyme.

  • Appeler la méthode register dans la classe UDFRegistration, et transmettre la définition de la fonction anonyme.

Voici un exemple d’une UDF anonyme :

>>> from snowflake.snowpark.types import IntegerType
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])

Note

Lorsque vous écrivez du code susceptible de s’exécuter dans plusieurs sessions, utilisez la méthode register pour enregistrer des UDFs, plutôt que d’utiliser la fonction udf. Cela peut éviter les erreurs dans lesquelles l’objet Snowflake Session par défaut ne peut être trouvé.

Création et enregistrement d’une UDF nommée

Si vous souhaitez appeler une UDF par son nom (par exemple, en utilisant la fonction call_udf dans le module functions), vous pouvez créer et enregistrer une UDF nommée. Pour ce faire, voici ce que vous pouvez utiliser :

  • La méthode register dans la classe UDFRegistration avec l’argument name.

  • La fonction udf, dans le module snowflake.snowpark.functions avec l’argument name.

Pour accéder à un attribut ou à une méthode de la classe UDFRegistration appelez la propriété udf de la classe Session.

L’appel d’un register ou d’une udf créera une UDF temporaire que vous pouvez utiliser dans la session en cours.

Pour créer une UDF permanente, appelez la méthode register ou la fonction udf et définissez l’argument is_permanent sur True. Lorsque vous créez une UDF permanente, vous devez également définir l’argument stage_location à l’emplacement de la zone de préparation où le fichier Python pour l’UDF et ses dépendances sont chargées.

Voici un exemple de la manière d’enregistrer une UDF temporaire nommée :

>>> from snowflake.snowpark.types import IntegerType
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)

Voici un exemple de la façon d’enregistrer une UDF permanente nommée en définissant l’argument is_permanent sur True :

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1

Voici un exemple d’appel de ces UDFs :

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]

Création d’une UDF à partir d’un fichier source Python

Vous pouvez également définir votre gestionnaire d’UDF dans un fichier Python, puis utiliser la méthode register_from_file de la classe UDFRegistration pour créer une UDF.

Voici des exemples d’utilisation de register_from_file.

Supposons que vous ayez un fichier Python test_udf_file.py qui contient :

def mod5(x: int) -> int:
    return x % 5

Ensuite, vous pouvez créer une UDF à partir de cette fonction du fichier test_udf_file.py.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Vous pouvez également charger le fichier vers un emplacement de zone de préparation, puis l’utiliser pour créer l’UDF.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Lecture de fichiers à partir d’une UDF

Comme mentionné précédemment, la bibliothèque Snowpark télécharge et exécute les UDFs sur le serveur. Si votre UDF doit lire des données à partir d’un fichier, vous devez vous assurer que le fichier est chargé avec l’UDF.

Pour configurer une UDF pour lire un fichier :

  1. Spécifiez que le fichier est une dépendance, ce qui charge le fichier sur le serveur. Pour plus d’informations, voir Spécifier des dépendances pour une UDF.

    Par exemple :

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
  2. Dans l’UDF, lisez le fichier. Dans l’exemple suivant, le fichier ne sera lu qu’une fois lors de la création de l’UDF, et ne sera pas relu lors de l’exécution de l’UDF. Ceci est réalisé grâce à une bibliothèque tierce cachetools.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    

Utilisation d’UDFs vectorisées via l’API par lots UDF Python

L’API par lots UDF Python permet de définir des fonctions Python qui reçoivent des lots de lignes d’entrée sous forme de DataFrames Pandas et renvoient des lots de résultats sous forme de tableaux ou de séries Pandas. La colonne dans le dataframe Snowpark sera vectorisée comme une série Pandas à l’intérieur de l’UDF.

Voici un exemple d’utilisation de l’interface lots :

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)

Vous appelez des UDFs Python vectorisées qui utilisent l’API par lots de la même manière que vous appelez les autres UDFs Python. Pour plus d’informations, consultez API par lots UDF Python, qui explique comment créer une UDF vectorielle à l’aide d’une instruction SQL. Par exemple, vous pouvez utiliser le décorateur vectorized lorsque vous spécifiez le code Python dans l’instruction SQL. En utilisant l’API Snowpark Python décrite dans ce document, vous n’utilisez pas une instruction SQL pour créer une UDF vectorielle. On n’utilise donc pas le décorateur vectorized.

Il est possible de limiter le nombre de lignes par lot. Pour plus d’informations, voir Définir une taille de lot cible.

Pour plus d’explications et d’exemples sur l’utilisation de l’API Snowpark Python pour créer des UDFs vectorielles, reportez-vous à la section sur les UDFs de la référence API de Snowpark.