Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Python

L’API de Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction définie par l’utilisateur à partir d’une lambda ou d’une fonction Python. Cette rubrique explique comment créer ces types de fonctions.

Dans ce chapitre :

Introduction

Avec Snowpark, vous pouvez créer des fonctions définies par l’utilisateur (UDFs) pour vos Lambdas et fonctions personnalisées, et vous pouvez appeler ces UDFs pour traiter les données dans votre DataFrame.

Lorsque vous utilisez l’API de Snowpark pour créer une UDF, la bibliothèque Snowpark charge le code de votre fonction dans une zone de préparation interne. Lorsque vous appelez l’UDF, la bibliothèque Snowpark exécute votre fonction sur le serveur, où se trouvent les données. Par conséquent, les données ne doivent pas être transférées au client pour que la fonction puisse les traiter.

Dans votre code personnalisé, vous pouvez également importer des modules à partir de fichiers Python ou de paquets tiers.

Vous pouvez créer une UDF pour votre code personnalisé de deux façons :

  • Vous pouvez créer une UDF anonyme et affecter la fonction à une variable. Tant que cette variable est dans le scope, vous pouvez l’utiliser pour appeler l’UDF.

  • Vous pouvez créer une UDF nommée et appeler l’UDF par son nom. Vous pouvez l’utiliser si, par exemple, vous devez appeler une UDF par son nom ou utiliser l’UDF dans une session ultérieure.

Les sections suivantes expliquent comment créer ces UDFs à l’aide d’un environnement de développement local ou à l’aide d’une feuille de calcul Python.

Notez que si vous avez défini une UDF en exécutant la commande CREATE FUNCTION, vous pouvez appeler cette UDF dans Snowpark. Pour plus de détails, voir Appel de fonctions définies par l’utilisateur (UDFs).

Note

Les UDFs Python vectorisées vous permettent de définir des fonctions Python qui reçoivent des lots de lignes d’entrée comme DataFrames Pandas. Cela permet d’obtenir de bien meilleures performances avec les scénarios d’inférence de machine learning. Pour plus d’informations, voir Utilisation des UDFs vectorisées.

Note

Si vous travaillez avec une feuille de calcul Python, utilisez ces exemples dans la fonction de gestionnaire :

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

Si les exemples renvoient autre chose qu’un DataFrame, comme une list d’objets Row , modifiez le type de retour pour qu’il corresponde au type de retour de l’exemple.

Après avoir exécuté un exemple de code, utilisez l’onglet Results pour afficher toute sortie renvoyée. Reportez-vous à Exécution de feuilles de calcul Python pour plus de détails.

Spécifier des dépendances pour une UDF

Pour définir une UDF à l’aide de l’API Snowpark, vous devez importer les fichiers contenant tous les modules dont dépend votre UDF, tels que les fichiers Python, les fichiers zip, les fichiers de ressources, etc.

Vous pouvez également spécifier un répertoire et la bibliothèque Snowpark le compresse automatiquement et le charge sous forme de fichier zip. (Pour plus de détails sur la lecture des ressources d’une UDF, voir Lecture de fichiers à partir d’une UDF).

Lorsque vous appelez Session.add_import(), la bibliothèque Snowpark charge les fichiers spécifiés dans une zone de préparation interne et importe les fichiers lors de l’exécution de votre UDF.

L’exemple suivant montre comment ajouter un fichier zip dans une zone de préparation en tant que dépendance à votre code :

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  
Copy

Les exemples suivants montrent comment ajouter un fichier Python à partir de votre machine locale :

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  
Copy

Les exemples suivants montrent comment ajouter d’autres types de dépendances :

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  
Copy

Note

La bibliothèque Python Snowpark n’est pas chargée automatiquement.

Vous n’avez pas besoin de spécifier les dépendances suivantes :

  • Vos bibliothèques intégrées Python.

    Ces bibliothèques sont déjà disponibles dans l’environnement d’exécution sur le serveur où vos UDFs sont exécutées.

Utilisation de paquets tiers à partir d’Anaconda dans une UDF

Vous pouvez utiliser des packages tiers du canal Snowflake Anaconda dans une UDF.

Lorsque des requêtes qui appellent des UDFs Python sont exécutées dans un entrepôt Snowflake, les paquets Anaconda sont installés de manière transparente et mis en cache dans l’entrepôt virtuel en votre nom.

Pour plus d’informations sur les meilleures pratiques, sur la façon de visualiser les paquets disponibles et sur la façon de configurer un environnement de développement local, voir Utilisation de paquets tiers.

Si vous écrivez une UDF Python dans votre environnement de développement local, utilisez session.add_packages pour ajouter des packages au niveau de la session.

Cet exemple de code montre comment importer des paquets et renvoyer leurs versions.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]
Copy

Vous pouvez également utiliser session.add_requirements pour spécifier des paquets avec un fichier d’exigences.

>>> session.add_requirements("mydir/requirements.txt")  
Copy

Vous pouvez ajouter les paquets de niveau UDF pour remplacer les packages de niveau session que vous avez pu ajouter précédemment.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
Copy

Important

Si vous ne spécifiez pas de version de package, Snowflake utilise la dernière version lors de la résolution des dépendances. Lorsque vous déployez l’UDF en production, vous pouvez vouloir vous assurer que votre code utilise toujours les mêmes versions de dépendances. Vous pouvez le faire pour des UDFs permanentes et temporaires.

  • Lorsque vous créez une UDF permanente, l’UDF est créée et enregistrée une seule fois. Cela résout les dépendances une fois et la version sélectionnée est utilisée pour les charges de travail de production. Lorsque l’UDF s’exécute, il utilise toujours les mêmes versions de dépendance.

  • Lorsque vous créez une UDF temporaire, spécifiez les versions des dépendances dans le cadre de la spécification de la version. De cette façon, lorsque l’UDF est enregistrée, la résolution du package utilise la version spécifiée. Si vous ne spécifiez pas la version, la dépendance peut être mise à jour lorsqu’une nouvelle version est disponible.

Création d’une UDF anonyme

Pour créer une UDF anonyme, vous pouvez soit :

  • Appeler la fonction udf dans le module snowflake.snowpark.functions et transmettre la définition de la fonction anonyme.

  • Appeler la méthode register dans la classe UDFRegistration, et transmettre la définition de la fonction anonyme.

Voici un exemple d’une UDF anonyme :

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Note

Lorsque vous écrivez du code susceptible de s’exécuter dans plusieurs sessions, utilisez la méthode register pour enregistrer des UDFs, plutôt que d’utiliser la fonction udf. Cela peut éviter les erreurs dans lesquelles l’objet Snowflake Session par défaut ne peut être trouvé.

Création et enregistrement d’une UDF nommée

Si vous souhaitez appeler une UDF par son nom (par exemple, en utilisant la fonction call_udf dans le module functions), vous pouvez créer et enregistrer une UDF nommée. Pour ce faire, voici ce que vous pouvez utiliser :

  • La méthode register dans la classe UDFRegistration avec l’argument name.

  • La fonction udf, dans le module snowflake.snowpark.functions avec l’argument name.

Pour accéder à un attribut ou à une méthode de la classe UDFRegistration appelez la propriété udf de la classe Session.

L’appel d’un register ou d’une udf créera une UDF temporaire que vous pouvez utiliser dans la session en cours.

Pour créer une UDF permanente, appelez la méthode register ou la fonction udf et définissez l’argument is_permanent sur True. Lorsque vous créez une UDF permanente, vous devez également définir l’argument stage_location à l’emplacement de la zone de préparation où le fichier Python pour l’UDF et ses dépendances sont chargées.

Voici un exemple de la manière d’enregistrer une UDF temporaire nommée :

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

Voici un exemple de la façon d’enregistrer une UDF permanente nommée en définissant l’argument is_permanent sur True :

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1
Copy

Voici un exemple d’appel de ces UDFs :

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Copy

Vous pouvez également appeler l’UDF en utilisant SQL :

>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Création d’une UDF à partir d’un fichier source Python

Si vous créez votre UDF dans votre environnement de développement local, vous pouvez également définir votre gestionnaire d’UDF dans un fichier Python, puis utiliser la méthode register_from_file de la classe UDFRegistration pour créer une UDF.

Note

Vous ne pouvez pas utiliser cette méthode dans une feuille de calcul Python.

Voici des exemples d’utilisation de register_from_file.

Supposons que vous ayez un fichier Python test_udf_file.py qui contient :

def mod5(x: int) -> int:
    return x % 5
Copy

Ensuite, vous pouvez créer une UDF à partir de cette fonction du fichier test_udf_file.py.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

Vous pouvez également charger le fichier vers un emplacement de zone de préparation, puis l’utiliser pour créer l’UDF.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

Lecture de fichiers à partir d’une UDF

Pour lire le contenu d’un fichier, votre code Python peut :

Lecture de fichiers spécifiés de façon statique

La bibliothèque Snowpark charge et exécute les UDFs sur le serveur. Si votre UDF doit lire des données à partir d’un fichier, vous devez vous assurer que le fichier est chargé avec l’UDF.

Note

Si vous écrivez votre UDF dans une feuille de calcul Python, l’UDF ne peut lire que les fichiers d’une zone de préparation.

Pour configurer une UDF pour lire un fichier :

  1. Spécifiez que le fichier est une dépendance, ce qui charge le fichier sur le serveur. Pour plus d’informations, voir Spécifier des dépendances pour une UDF.

    Par exemple :

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
    Copy
  2. Dans l’UDF, lisez le fichier. Dans l’exemple suivant, le fichier ne sera lu qu’une fois lors de la création de l’UDF, et ne sera pas relu lors de l’exécution de l’UDF. Ceci est réalisé grâce à une bibliothèque tierce cachetools.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    
    Copy

Lire des fichiers spécifiés de façon dynamique avec SnowflakeFile

Vous pouvez lire un fichier depuis une zone de préparation en utilisant la classe SnowflakeFile dans le module Snowpark snowflake.snowpark.files. La classe SnowflakeFile offre un accès dynamique aux fichiers, ce qui vous permet de diffuser des fichiers de n’importe quelle taille. L’accès dynamique aux fichiers est également utile lorsque vous souhaitez itérer sur plusieurs fichiers. Par exemple, voir Traitement de plusieurs fichiers.

Pour plus d’informations et d’exemples sur la lecture de fichiers à l’aide de SnowflakeFile, voir Lecture d’un fichier à l’aide de la classe SnowflakeFile dans un gestionnaire d’UDF Python.

L’exemple suivant enregistre une UDF temporaire qui lit un fichier texte à partir d’une zone de préparation utilisant SnowflakeFile et renvoie la longueur du fichier.

Enregistrer l’UDF :

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Appelez l’UDF :

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

Utilisation des UDFs vectorisées

Les UDFs Python vectorisées vous permettent de définir des fonctions Python qui reçoivent des lots de lignes d’entrée sous forme de DataFrames Pandas et renvoient des lots de résultats sous forme de tableaux ou de séries Pandas. La colonne dans le dataframe Snowpark sera vectorisée comme une série Pandas à l’intérieur de l’UDF.

Voici un exemple d’utilisation de l’interface lots :

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

Vous appelez des UDFs Python vectorisées de la même manière que vous appelez les autres UDFs Python. Pour plus d’informations, consultez UDFs Python vectorisées, qui explique comment créer une UDF vectorielle à l’aide d’une instruction SQL. Par exemple, vous pouvez utiliser le décorateur vectorized lorsque vous spécifiez le code Python dans l’instruction SQL. En utilisant l’API Snowpark Python décrite dans ce document, vous n’utilisez pas une instruction SQL pour créer une UDF vectorielle. On n’utilise donc pas le décorateur vectorized.

Il est possible de limiter le nombre de lignes par lot. Pour plus d’informations, voir Définir une taille de lot cible.

Pour plus d’explications et d’exemples sur l’utilisation de l’API Snowpark Python pour créer des UDFs vectorielles, reportez-vous à la section sur les UDFs de la référence API de Snowpark.