Écriture de procédures stockées en Python¶
Cette rubrique explique comment écrire une procédure stockée en Python. Vous pouvez utiliser la bibliothèque Snowpark dans votre procédure stockée pour effectuer des requêtes, des mises à jour et d’autres travaux sur les tables dans Snowflake.
Dans ce chapitre :
Introduction¶
Avec les procédures stockées de Snowpark, vous pouvez construire et exécuter votre pipeline de données dans Snowflake, en utilisant un entrepôt Snowflake comme cadre de calcul. Créez votre pipeline de données en utilisant l’API Snowpark pour Python pour écrire des procédures stockées. Pour planifier l’exécution de ces procédures stockées, vous utilisez des tâches.
Pour des informations sur les modèles de machine learning et sur Snowpark Python, voir Formation de modèles de machine learning avec Snowpark Python.
Vous pouvez écrire des procédures stockées Snowpark pour Python à l’aide d’une feuille de calcul Python ou à l’aide d’un environnement de développement local.
Vous pouvez capturer des données d’enregistrement et de trace pendant l’exécution du code de votre gestionnaire. Pour plus d’informations, reportez-vous à Vue d’ensemble de la journalisation et du traçage.
Note
Pour créer et appeler une procédure anonyme, utilisez CALL (avec procédure anonyme). La création et l’appel d’une procédure anonyme ne nécessitent pas un rôle avec des privilèges de schéma CREATE PROCEDURE.
Prérequis pour l’écriture locale de procédures stockées¶
Pour écrire des procédures stockées Python dans votre environnement de développement local, remplissez les conditions préalables suivantes :
Vous devez utiliser la version 0.4.0 ou une version plus récente de la bibliothèque Snowpark.
Activez les packages Anaconda pour que Snowpark Python puisse charger les dépendances tierces requises. Reportez-vous à Utilisation de paquets tiers à partir d’Anaconda.
Les versions de Python prises en charge sont les suivantes :
3,8
3,9
3,10
3,11
Configurez votre environnement de développement pour utiliser la bibliothèque Snowpark. Reportez-vous à Configuration de votre environnement de développement pour Snowpark.
Écriture du code Python pour la procédure stockée¶
Pour la logique de la procédure, écrivez un code de gestionnaire qui s’exécute lorsque la procédure est appelée. Cette section décrit la conception d’un gestionnaire.
Vous pouvez créer une procédure stockée à partir du code du gestionnaire de plusieurs manières :
Incluez le code en ligne avec l’instruction SQL qui crée la procédure. Reportez-vous à Conserver le code du gestionnaire en ligne ou dans une zone de préparation.
Copiez le code dans une zone de préparation et faites-y référence lorsque vous créez la procédure. Reportez-vous à Conserver le code du gestionnaire en ligne ou dans une zone de préparation.
Écrivez le code dans une feuille de calcul Python et déployez le contenu de la feuille de calcul dans une procédure stockée. Reportez-vous à Création d’une procédure stockée Python pour automatiser le code de votre feuille de calcul Python.
Limitations¶
Les procédures stockées de Snowpark ont les limitations suivantes :
La création de processus n’est pas prise en charge dans les procédures stockées.
L’exécution de requêtes simultanées n’est pas prise en charge dans les procédures stockées.
Vous ne pouvez pas utiliser d’APIs qui exécutent des commandes PUT et GET (y compris
Session.sql("PUT ...")
etSession.sql("GET ...")
).Lorsque vous téléchargez des fichiers à partir d’une zone de préparation en utilisant
session.file.get
, la correspondance de modèles n’est pas prise en charge.Si vous exécutez votre procédure stockée à partir d’une tâche, vous devez spécifier un entrepôt lors de la création de la tâche. Vous ne pouvez pas utiliser les ressources de calcul sans serveur pour exécuter la tâche.
La création d’objets temp nommés n’est pas prise en charge dans une procédure stockée relative aux droits du propriétaire. Une procédure stockée avec les droits du propriétaire est une procédure stockée qui s’exécute avec les privilèges du propriétaire de la procédure stockée. Pour plus d’informations, voir les droits de l’appelant ou les droits du propriétaire.
Planification de l’écriture de votre procédure stockée¶
Les procédures stockées s’exécutent dans Snowflake, et vous devez donc planifier le code que vous écrivez en gardant cela à l’esprit.
Limitez la quantité de mémoire consommée. Snowflake impose des limites à une méthode ou une fonction en matière de quantité de mémoire nécessaire. Pour obtenir des conseils, reportez-vous à Concevoir des gestionnaires qui respectent les contraintes imposées par Snowflake.
Veillez à ce que la méthode ou la fonction de votre gestionnaire respecte le niveau de « thread safety ».
Suivez les règles et les restrictions de sécurité. Reportez-vous à Pratiques de sécurité pour UDFs et procédures.
Déterminez si vous souhaitez que la procédure stockée s’exécute avec les droits de l’appelant ou les droits du propriétaire.
Envisagez la version snowflake-snowpark-python utilisée pour exécuter les procédures stockées. En raison de limitations dans le processus de version des procédures stockées, la bibliothèque snowflake-snowpark-python disponible dans l’environnement Python des procédures stockées est généralement en retard d’une version par rapport à la version publique. Utilisez le code SQL suivant pour connaître la dernière version disponible :
select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
Écriture de la méthode ou de la fonction¶
Lorsque vous écrivez la méthode ou la fonction pour la procédure stockée, notez ce qui suit :
Spécifiez l’objet Snowpark
Session
comme premier argument de votre méthode ou fonction. Lorsque vous appelez votre procédure stockée, Snowflake crée automatiquement un objetSession
et le transmet à votre procédure stockée. (Vous ne pouvez pas créer l’objetSession
vous-même).Pour le reste des arguments et pour la valeur de retour, utilisez les types Python qui correspondent aux types de données Snowflake. Snowflake prend en charge les types de données Python énumérés dans Mappages de type de données SQL-Python pour les paramètres et les types de retour
Gestion des erreurs¶
Vous pouvez utiliser les techniques normales de gestion des exceptions de Python pour détecter les erreurs au sein de la procédure.
Si une exception non détectée se produit à l’intérieur de la méthode, Snowflake génère une erreur qui inclut la trace de la pile pour l’exception. Lorsque l’enregistrement des exceptions non gérées est activé, Snowflake enregistre les données relatives aux exceptions non gérées dans une table d’événements.
Mettre les dépendances à la disposition de votre code¶
Si le code de votre gestionnaire dépend d’un code défini en dehors du gestionnaire lui-même (tel que le code défini dans un module) ou de fichiers de ressources, vous pouvez mettre ces dépendances à la disposition de votre code en les téléchargeant dans une zone de préparation. Reportez-vous à Mettre les dépendances à la disposition de votre code, ou pour les feuilles de calcul Python, reportez-vous à Ajouter un fichier Python d’une zone de préparation à une feuille de calcul.
Si vous créez votre procédure stockée à l’aide de SQL, utilisez la clause IMPORTS lors de l’écriture de l’instruction CREATE PROCEDURE, pour pointer vers les fichiers de dépendance.
Accès aux données dans Snowflake à partir de votre procédure stockée¶
Pour accéder aux données dans Snowflake, utilisez les APIs de bibliothèque Snowpark.
Lors du traitement d’un appel à votre procédure stockée Python, Snowflake crée un objet Session
Snowpark et transmet cet objet à la méthode ou à la fonction de votre procédure stockée.
Comme c’est le cas avec les procédures stockées dans d’autres langages, le contexte de la session (par exemple, les privilèges, la base de données et le schéma actuels, etc.) est déterminé par le fait que la procédure stockée s’exécute avec les droits de l’appelant ou les droits du propriétaire. Pour plus de détails, voir Accessing and Setting the Session State.
Vous pouvez utiliser cet objet Session
pour appeler des APIs dans la bibliothèque Snowpark. Par exemple, vous pouvez créer un DataFrame pour une table ou exécuter une instruction SQL.
Voir le Guide du développeur Snowpark pour plus d’informations.
Exemple d’accès aux données¶
Voici un exemple de méthode Python qui copie un nombre donné de lignes d’une table vers une autre table. La méthode accepte les arguments suivants :
Un objet
Session
SnowparkLe nom de la table à partir de laquelle les lignes doivent être copiées
Le nom de la table dans laquelle les lignes doivent être enregistrées
Le nombre de lignes à copier
La méthode dans cet exemple renvoie une chaîne. Si vous exécutez cet exemple dans une feuille de calcul Python, remplacez le type de retour pour la feuille de calcul par une String
def run(session, from_table, to_table, count):
session.table(from_table).limit(count).write.save_as_table(to_table)
return "SUCCESS"
Lecture de fichiers¶
Vous pouvez lire dynamiquement un fichier en zone de préparation dans votre gestionnaire Python en utilisant la classe SnowflakeFile
du module Snowpark snowflake.snowpark.files
.
Snowflake prend en charge la lecture de fichiers avec SnowflakeFile
tant pour les procédures stockées que pour les fonctions définies par l’utilisateur. Pour plus d’informations sur la lecture de fichiers dans le code de votre gestionnaire, ainsi que d’autres exemples, reportez-vous à Lecture d’un fichier avec un gestionnaire d’UDF Python.
Cet exemple montre comment créer et appeler une procédure stockée sur les droits du propriétaire qui lit un fichier à l’aide de la classe SnowflakeFile
.
Créez la procédure stockée avec un gestionnaire en ligne, en spécifiant le mode d’entrée comme étant binaire en transmettant rb
pour l’argument mode
:
CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile
def run(ignored_session, file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
return imagehash.average_hash(Image.open(f))
$$;
Appelez la procédure stockée :
CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Utilisation de paquets tiers à partir d’Anaconda¶
Vous pouvez spécifier les paquets Anaconda à installer lorsque vous créez des procédures stockées Python. Pour voir la liste des paquets tiers d’Anaconda, consultez le canal Anaconda Snowflake. Ces paquets tiers sont construits et fournis par Anaconda. Vous pouvez utiliser le canal Anaconda de Snowflake pour des tests et du développement locaux, sans frais, conformément aux conditions supplémentaires relatives aux logiciels embarqués des conditions de service d’Anaconda.
Pour connaître ces limitations, voir Limitations.
Prise en main¶
Avant de commencer à utiliser les paquets fournis par Anaconda dans Snowflake, vous devez accepter les conditions des offres externes.
Note
Vous devez être l’administrateur de l’organisation (utilisez le rôle ORGADMIN) pour accepter les conditions. Vous ne devez accepter les conditions qu’une seule fois pour votre compte Snowflake. Voir Activation du rôle ORGADMIN dans un compte.
Connectez-vous à Snowsight.
Sélectionnez Admin » Billing & Terms.
Dans la section Anaconda, sélectionnez Enable.
Dans la boîte de dialogue Anaconda Packages , cliquez sur le lien pour consulter la page des conditions des offres externes.
Si vous acceptez les conditions, sélectionnez Acknowledge & Continue.
Si une erreur apparaît lorsque vous essayez d’accepter les conditions d’utilisation, il se peut qu’il manque un prénom, un nom ou une adresse e-mail dans votre profil d’utilisateur. Si vous avez un rôle d’administrateur, reportez-vous aux Ajouter des détails sur l’utilisateur à votre profil d’utilisateur pour mettre à jour votre profil à l’aide de Snowsight. Sinon, contactez un administrateur pour mettre à jour votre compte.
Note
Si vous n’acceptez pas les conditions de tiers de Snowflake comme décrit ci-dessus, vous pouvez toujours utiliser les procédures stockées, mais avec ces limitations :
Vous ne pouvez pas utiliser de paquets tiers à partir d’Anaconda.
Vous pouvez toujours spécifier Snowpark Python en tant que paquet dans une procédure stockée, mais vous ne pouvez pas spécifier une version spécifique.
Vous ne pouvez pas utiliser la méthode
to_pandas
lorsque vous interagissez avec un objet DataFrame.
Affichage et utilisation des paquets¶
Vous pouvez afficher tous les packages disponibles et leurs informations de version en interrogeant la vue PACKAGES dans Information Schema :
select * from information_schema.packages where language = 'python';
Pour plus d’informations, voir Utilisation de paquets de tiers dans la documentation de Snowflake sur les UDF Python.
Création de la procédure stockée¶
Vous pouvez créer une procédure stockée à partir d’une feuille de calcul Python ou à l’aide de SQL.
Pour créer une procédure stockée avec SQL, consultez Création d’une procédure stockée.
Pour créer une procédure stockée à partir d’une feuille de calcul Python, consultez Création d’une procédure stockée Python pour automatiser le code de votre feuille de calcul Python.
Création d’une procédure stockée Python pour automatiser le code de votre feuille de calcul Python¶
Créez une procédure stockée Python à partir de votre feuille de calcul Python pour automatiser votre code. Pour plus de détails sur l’écriture de feuilles de calcul Python, consultez Écriture de code Snowpark dans des feuilles de calcul Python.
Conditions préalables¶
Votre rôle doit disposer des privilèges OWNERSHIP ou CREATE PROCEDURE sur le schéma de base de données dans lequel vous exécutez votre feuille de calcul Python pour la déployer en tant que procédure stockée.
Déployer une feuille de calcul Python en tant que procédure stockée¶
Pour créer une procédure stockée Python afin d’automatiser le code dans votre feuille de calcul Python, procédez comme suit :
Connectez-vous à Snowsight.
Ouvrez Projects » Worksheets.
Ouvrez la feuille de calcul Python que vous souhaitez déployer en tant que procédure stockée.
Sélectionnez Deploy.
Saisissez un nom pour la procédure stockée.
(Facultatif) Entrez un commentaire avec des détails sur la procédure stockée.
(Facultatif) Sélectionnez Replace if exists pour remplacer une procédure stockée existante par le même nom.
Pour Handler, sélectionnez la fonction de traitement de votre procédure stockée. Par exemple,
main
.Examinez les arguments utilisés par votre fonction de gestionnaire et, si nécessaire, remplacez le mappage du type de données SQL pour un argument typé. Pour plus de détails sur la façon dont les types Python sont mappés avec les types SQL, consultez Mappages des types de données SQL-Python.
(Facultatif) Sélectionnez Open in Worksheets pour ouvrir la définition de la procédure stockée dans une feuille de calcul SQL.
Sélectionnez Deploy pour créer la procédure stockée.
Une fois la procédure stockée créée, vous pouvez accéder aux détails de la procédure ou sélectionner Done.
Vous pouvez créer plusieurs procédures stockées à partir d’une feuille de calcul Python.
Après avoir créé une procédure stockée, vous pouvez l’automatiser dans le cadre d’une tâche. Reportez-vous à Exécution d’instructions SQL sur une planification à l’aide de tâches.
Renvoi de données tabulaires¶
Vous pouvez écrire une procédure qui renvoie des données sous forme de tableau. Pour écrire une procédure qui renvoie des données tabulaires, procédez comme suit :
Spécifiez
TABLE(...)
comme type de retour de la procédure dans votre instruction CREATE PROCEDURE.En tant que paramètres TABLE, vous pouvez spécifier les noms de colonne des données renvoyées et les types si vous les connaissez. Si vous ne connaissez pas les colonnes renvoyées lors de la définition de la procédure, par exemple lorsqu’elles sont spécifiées au moment de l’exécution, vous pouvez omettre les paramètres TABLE. Lorsque vous le faites, les colonnes de valeur de retour de la procédure seront converties à partir des colonnes du DataFrame renvoyées par son gestionnaire. Les types de données de colonne seront convertis en SQL selon le mappage spécifié dans Mappages des types de données SQL-Python.
Écrivez le gestionnaire afin qu’il renvoie le résultat tabulaire dans un DataFrame Snowpark.
Pour plus d’informations sur les dataframes, voir Utilisation de DataFrames dans Snowpark Python.
Exemple¶
Les exemples de cette section illustrent le retour de valeurs tabulaires à partir d’une procédure qui filtre les lignes où une colonne correspond à une chaîne.
Définition des données¶
Le code de l’exemple suivant crée une table d’employés.
CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Spécification des noms et des types de colonne de retour¶
Cet exemple spécifie les noms et les types de colonne dans l’instruction RETURNS TABLE()
.
CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col
def filter_by_role(session, table_name, role):
df = session.table(table_name)
return df.filter(col("role") == role)
$$;
Omission des noms et des types de colonne de retour¶
Le code dans l’exemple suivant déclare une procédure qui permet d’extrapoler les noms et les types de colonne de valeur de retour à partir des colonnes de la valeur de retour du gestionnaire. Il omet les noms et les types de colonne de l’instruction RETURNS TABLE()
.
CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col
def filter_by_role(session, table_name, role):
df = session.table(table_name)
return df.filter(col("role") == role)
$$;
Appel de la procédure¶
L’exemple suivant appelle la procédure stockée :
CALL filterByRole('employees', 'dev');
L’appel de procédure produit la sortie suivante :
+----+-------+------+
| ID | NAME | ROLE |
+----+-------+------+
| 2 | Bob | dev |
| 3 | Cindy | dev |
+----+-------+------+
Utilisation des procédures stockées¶
Après avoir créé une procédure stockée, vous pouvez l’appeler depuis du code SQL ou dans le cadre d’une tâche planifiée.
Pour plus d’informations sur l’appel d’une procédure stockée depuis du code SQL, reportez-vous à Appel d’une procédure stockée.
Pour plus d’informations sur l’appel d’une procédure stockée dans le cadre d’une tâche planifiée, reportez-vous à Exécution d’instructions SQL sur une planification à l’aide de tâches.
Exemples¶
Exécution de tâches simultanées à l’aide de processus de travail¶
Vous pouvez exécuter des tâches simultanées à l’aide de processus de travail Python. Cela peut s’avérer utile lorsque vous devez exécuter des tâches parallèles qui tirent parti de plusieurs cœurs CPU sur les nœuds de l’entrepôt.
Note
Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.
Pour contourner les cas où le Python Global Interpreter Lock empêche une approche multitâche de s’étendre à tous les cœurs CPU, vous pouvez exécuter des tâches concurrentes à l’aide de processus de travail distincts, plutôt que de threads.
Vous pouvez le faire sur des entrepôts Snowflake en utilisant la classe Parallel
de la bibliothèque joblib
comme dans l’exemple suivant.
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
Note
Le backend par défaut utilisé pour joblib.Parallel
diffère entre les entrepôts standards de Snowflake et les entrepôts optimisés pour Snowpark.
Valeur par défaut de l’entrepôt standard :
threading
Valeur par défaut de l’entrepôt optimisé pour Snowpark :
loky
(multitraitement)
Vous pouvez remplacer le paramètre de backend par défaut en appelant la fonction joblib.parallel_backend
comme dans l’exemple suivant.
import joblib
joblib.parallel_backend('loky')