Création de procédures stockées pour DataFrames en Python

L’API Snowpark fournit des méthodes que vous pouvez utiliser pour créer une procédure stockée en Python. Cette rubrique explique comment créer des procédures stockées.

Dans ce chapitre :

Introduction

Avec Snowpark, vous pouvez créer des procédures stockées pour vos Lambdas et fonctions personnalisées, et vous pouvez appeler ces procédures stockées pour traiter les données dans votre DataFrame.

Vous pouvez créer des procédures stockées qui n’existent que dans la session actuelle (procédures stockées temporaires) ainsi que des procédures stockées que vous pouvez utiliser dans d’autres sessions (procédures stockées permanentes).

Utilisation de paquets tiers d’Anaconda dans une procédure stockée

Vous pouvez spécifier les paquets Anaconda à installer lorsque vous créez des procédures stockées Python. Lorsque vous appelez la procédure stockée Python à l’intérieur d’un entrepôt Snowflake, les paquets Anaconda sont installés de manière transparente et mis en cache sur l’entrepôt virtuel en votre nom. Pour plus d’informations sur les meilleures pratiques, sur la façon de visualiser les paquets disponibles et sur la façon de configurer un environnement de développement local, voir Utilisation de paquets tiers.

Utilisez session.add_packages pour ajouter des paquets au niveau de la session.

Cet exemple de code montre comment importer des paquets et renvoyer leurs versions.

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")

>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
...   return [pd.__version__, xgb.__version__]
Copy

Vous pouvez également utiliser session.add_requirements pour spécifier des paquets avec un fichier d’exigences.

>>> session.add_requirements("mydir/requirements.txt")
Copy

Vous pouvez ajouter les paquets de niveau procédure stockée pour remplacer les packages de niveau session que vous avez pu ajouter précédemment.

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
...    return [pd.__version__, xgb.__version__]
Copy

Important

Si vous ne spécifiez pas de version de paquet, Snowflake utilisera la dernière version lors de la résolution des dépendances. Toutefois, lorsque vous déployez la procédure stockée en production, vous pouvez vouloir vous assurer que votre code utilise toujours les mêmes versions de dépendances. Vous pouvez le faire pour les procédures stockées permanentes et temporaires.

  • Lorsque vous créez une procédure stockée permanente, la procédure stockée est créée et enregistrée une seule fois. Cela résout les dépendances une fois et la version sélectionnée est utilisée pour les charges de travail de production. Lorsque la procédure stockée s’exécute, elle utilise toujours les mêmes versions de dépendances.

  • Lorsque vous créez une procédure stockée temporaire, spécifiez les versions des dépendances dans le cadre de la spécification de la version. De cette façon, lorsque la procédure stockée est enregistrée, la résolution du paquet utilisera la version spécifiée. Si vous ne spécifiez pas la version, la dépendance peut être mise à jour lorsqu’une nouvelle version est disponible.

Création d’une procédure stockée anonyme

Pour créer une procédure stockée anonyme, vous pouvez soit :

  • Appeler la fonction sproc dans le module snowflake.snowpark.functions et transmettre la définition de la fonction anonyme.

  • Appeler la méthode register dans la classe StoredProcedureRegistration, et transmettre la définition de la fonction anonyme. Pour accéder à un attribut ou à une méthode de la classe StoredProcedureRegistration appelez la propriété sproc de la classe Session.

Voici un exemple de procédure stockée anonyme :

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Copy

Note

Lorsque vous écrivez du code susceptible de s’exécuter dans plusieurs sessions, utilisez la méthode register pour enregistrer les procédures stockées, plutôt que la fonction sproc. Cela peut éviter les erreurs dans lesquelles l’objet Snowflake Session par défaut ne peut être trouvé.

Création et enregistrement d’une procédure stockée nommée

Si vous souhaitez appeler une procédure stockée par son nom (par exemple, en utilisant la fonction call dans l’objet Session), vous pouvez créer et enregistrer une procédure stockée nommée. Pour ce faire, vous pouvez soit :

  • Appeler la fonction sproc dans le module snowflake.snowpark.functions et transmettre l’argument name et la définition de la fonction anonyme.

  • Appeler la méthode register dans la classe StoredProcedureRegistration et transmettre l’argument name et la définition de la fonction anonyme. Pour accéder à un attribut ou à une méthode de la classe StoredProcedureRegistration appelez la propriété sproc de la classe Session.

L’appel de sproc ou de register crée une procédure stockée temporaire que vous pouvez utiliser dans la session en cours.

Pour créer une procédure stockée permanente, appelez la méthode register ou la fonction sproc et définissez l’argument is_permanent sur True. Lorsque vous créez une procédure stockée permanente, vous devez également définir l’argument stage_location à l’emplacement de la zone de préparation où le connecteur Python utilisé par Snowpark charge le fichier Python pour la procédure stockée et ses dépendances.

Voici un exemple de la manière d’enregistrer une procédure stockée temporaire nommée :

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Copy

Voici un exemple de la façon d’enregistrer une procédure stockée permanente nommée en définissant l’argument is_permanent sur True :

>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
...  return session.sql(f"select {x} - 1").collect()[0][0]
Copy

Voici un exemple d’appel de ces procédures stockées :

>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Lecture de fichiers à l’aide d’une procédure stockée

Pour lire le contenu d’un fichier à l’aide d’une procédure stockée, vous pouvez :

Lecture de fichiers spécifiés de façon statique

  1. Spécifiez que le fichier est une dépendance, ce qui charge le fichier sur le serveur. On procède de la même manière que pour les UDFs. Pour plus d’informations, voir Spécifier des dépendances pour une UDF.

    Par exemple :

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")
    
    Copy
  2. Dans la procédure stockée, lisez le fichier.

    >>> def read_file(name: str) -> str:
    ...    import sys
    ...    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    ...    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    ...
    ...    with open(import_dir + 'my_file.txt', 'r') as file:
    ...        return file.read()
    
    Copy

Lire des fichiers spécifiés de façon dynamique avec SnowflakeFile

Vous pouvez lire un fichier depuis une zone de préparation en utilisant la classe SnowflakeFile dans le module Snowpark snowflake.snowpark.files. La classe SnowflakeFile offre un accès dynamique aux fichiers, ce qui vous permet de diffuser des fichiers de n’importe quelle taille. L’accès dynamique aux fichiers est également utile lorsque vous souhaitez itérer sur plusieurs fichiers. Par exemple, voir Traitement de plusieurs fichiers.

Pour plus d’informations et d’exemples sur la lecture de fichiers à l’aide de SnowflakeFile, voir Lecture d’un fichier à l’aide de la classe SnowflakeFile dans un gestionnaire d’UDF Python.

L’exemple suivant crée une procédure stockée permanente qui lit un fichier à partir d’une zone de préparation à l’aide de SnowflakeFile et renvoie la longueur du fichier.

Créez la procédure stockée :

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Appelez la procédure stockée :

file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")
Copy