Création de procédures stockées pour DataFrames en Python¶
L’API Snowpark fournit des méthodes que vous pouvez utiliser pour créer une procédure stockée en Python. Cette rubrique explique comment créer des procédures stockées.
Dans ce chapitre :
Introduction¶
Avec Snowpark, vous pouvez créer des procédures stockées pour vos Lambdas et fonctions personnalisées, et vous pouvez appeler ces procédures stockées pour traiter les données dans votre DataFrame.
Vous pouvez créer des procédures stockées qui n’existent que dans la session actuelle (procédures stockées temporaires) ainsi que des procédures stockées que vous pouvez utiliser dans d’autres sessions (procédures stockées permanentes).
Utilisation de paquets tiers d’Anaconda dans une procédure stockée¶
Vous pouvez spécifier les paquets Anaconda à installer lorsque vous créez des procédures stockées Python. Lorsque vous appelez la procédure stockée Python à l’intérieur d’un entrepôt Snowflake, les paquets Anaconda sont installés de manière transparente et mis en cache sur l’entrepôt virtuel en votre nom. Pour plus d’informations sur les meilleures pratiques, sur la façon de visualiser les paquets disponibles et sur la façon de configurer un environnement de développement local, voir Utilisation de paquets tiers.
Utilisez session.add_packages
pour ajouter des paquets au niveau de la session.
Cet exemple de code montre comment importer des paquets et renvoyer leurs versions.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")
>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
Vous pouvez également utiliser session.add_requirements
pour spécifier des paquets avec un fichier d’exigences.
>>> session.add_requirements("mydir/requirements.txt")
Vous pouvez ajouter les paquets de niveau procédure stockée pour remplacer les packages de niveau session que vous avez pu ajouter précédemment.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
Important
Si vous ne spécifiez pas de version de paquet, Snowflake utilisera la dernière version lors de la résolution des dépendances. Toutefois, lorsque vous déployez la procédure stockée en production, vous pouvez vouloir vous assurer que votre code utilise toujours les mêmes versions de dépendances. Vous pouvez le faire pour les procédures stockées permanentes et temporaires.
Lorsque vous créez une procédure stockée permanente, la procédure stockée est créée et enregistrée une seule fois. Cela résout les dépendances une fois et la version sélectionnée est utilisée pour les charges de travail de production. Lorsque la procédure stockée s’exécute, elle utilise toujours les mêmes versions de dépendances.
Lorsque vous créez une procédure stockée temporaire, spécifiez les versions des dépendances dans le cadre de la spécification de la version. De cette façon, lorsque la procédure stockée est enregistrée, la résolution du paquet utilisera la version spécifiée. Si vous ne spécifiez pas la version, la dépendance peut être mise à jour lorsqu’une nouvelle version est disponible.
Création d’une procédure stockée anonyme¶
Pour créer une procédure stockée anonyme, vous pouvez soit :
Appeler la fonction
sproc
dans le modulesnowflake.snowpark.functions
et transmettre la définition de la fonction anonyme.Appeler la méthode
register
dans la classeStoredProcedureRegistration
, et transmettre la définition de la fonction anonyme. Pour accéder à un attribut ou à une méthode de la classeStoredProcedureRegistration
appelez la propriétésproc
de la classeSession
.
Voici un exemple de procédure stockée anonyme :
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Note
Lorsque vous écrivez du code susceptible de s’exécuter dans plusieurs sessions, utilisez la méthode register
pour enregistrer les procédures stockées, plutôt que la fonction sproc
. Cela peut éviter les erreurs dans lesquelles l’objet Snowflake Session
par défaut ne peut être trouvé.
Création et enregistrement d’une procédure stockée nommée¶
Si vous souhaitez appeler une procédure stockée par son nom (par exemple, en utilisant la fonction call
dans l’objet Session
), vous pouvez créer et enregistrer une procédure stockée nommée. Pour ce faire, vous pouvez soit :
Appeler la fonction
sproc
dans le modulesnowflake.snowpark.functions
et transmettre l’argumentname
et la définition de la fonction anonyme.Appeler la méthode
register
dans la classeStoredProcedureRegistration
et transmettre l’argumentname
et la définition de la fonction anonyme. Pour accéder à un attribut ou à une méthode de la classeStoredProcedureRegistration
appelez la propriétésproc
de la classeSession
.
L’appel de sproc
ou de register
crée une procédure stockée temporaire que vous pouvez utiliser dans la session en cours.
Pour créer une procédure stockée permanente, appelez la méthode register
ou la fonction sproc
et définissez l’argument is_permanent
sur True
. Lorsque vous créez une procédure stockée permanente, vous devez également définir l’argument stage_location
à l’emplacement de la zone de préparation où le connecteur Python utilisé par Snowpark charge le fichier Python pour la procédure stockée et ses dépendances.
Voici un exemple de la manière d’enregistrer une procédure stockée temporaire nommée :
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Voici un exemple de la façon d’enregistrer une procédure stockée permanente nommée en définissant l’argument is_permanent
sur True
:
>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
... return session.sql(f"select {x} - 1").collect()[0][0]
Voici un exemple d’appel de ces procédures stockées :
>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Lecture de fichiers à l’aide d’une procédure stockée¶
Pour lire le contenu d’un fichier à l’aide d’une procédure stockée, vous pouvez :
Lire un fichier spécifié de façon statique en important un fichier et en le lisant ensuite dans le répertoire personnel de la procédure stockée.
Lire un fichier spécifié de façon dynamique avec SnowflakeFile. Vous pouvez le faire si vous avez besoin d’accéder à un fichier pendant un calcul.
Lecture de fichiers spécifiés de façon statique¶
Spécifiez que le fichier est une dépendance, ce qui charge le fichier sur le serveur. On procède de la même manière que pour les UDFs. Pour plus d’informations, voir Spécifier des dépendances pour une UDF.
Par exemple :
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
Dans la procédure stockée, lisez le fichier.
>>> def read_file(name: str) -> str: ... import sys ... IMPORT_DIRECTORY_NAME = "snowflake_import_directory" ... import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME] ... ... with open(import_dir + 'my_file.txt', 'r') as file: ... return file.read()
Lire des fichiers spécifiés de façon dynamique avec SnowflakeFile
¶
Vous pouvez lire un fichier depuis une zone de préparation en utilisant la classe SnowflakeFile
dans le module Snowpark snowflake.snowpark.files
. La classe SnowflakeFile
offre un accès dynamique aux fichiers, ce qui vous permet de diffuser des fichiers de n’importe quelle taille. L’accès dynamique aux fichiers est également utile lorsque vous souhaitez itérer sur plusieurs fichiers. Par exemple, voir Traitement de plusieurs fichiers.
Pour plus d’informations et d’exemples sur la lecture de fichiers à l’aide de SnowflakeFile
, voir Lecture d’un fichier à l’aide de la classe SnowflakeFile dans un gestionnaire d’UDF Python.
L’exemple suivant crée une procédure stockée permanente qui lit un fichier à partir d’une zone de préparation à l’aide de SnowflakeFile
et renvoie la longueur du fichier.
Créez la procédure stockée :
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
Appelez la procédure stockée :
file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")