Écriture de procédures stockées en Python

Cette rubrique explique comment écrire une procédure stockée en Python. Vous pouvez utiliser la bibliothèque Snowpark dans votre procédure stockée pour effectuer des requêtes, des mises à jour et d’autres travaux sur les tables dans Snowflake.

Dans ce chapitre :

Introduction

Avec les procédures stockées de Snowpark, vous pouvez construire et exécuter votre pipeline de données dans Snowflake, en utilisant un entrepôt Snowflake comme cadre de calcul. Créez votre pipeline de données en utilisant l’API Snowpark pour Python pour écrire des procédures stockées. Pour planifier l’exécution de ces procédures stockées, vous utilisez des tâches.

Pour des informations sur les modèles de machine learning et sur Snowpark Python, voir Formation de modèles de machine learning avec Snowpark Python.

Vous pouvez écrire des procédures stockées Snowpark pour Python à l’aide d’une feuille de calcul Python ou à l’aide d’un environnement de développement local.

Vous pouvez capturer des données d’enregistrement et de trace pendant l’exécution du code de votre gestionnaire. Pour plus d’informations, reportez-vous à Vue d’ensemble de la journalisation et du traçage.

Note

Pour créer et appeler une procédure anonyme, utilisez CALL (avec procédure anonyme). La création et l’appel d’une procédure anonyme ne nécessitent pas un rôle avec des privilèges de schéma CREATE PROCEDURE.

Prérequis pour l’écriture locale de procédures stockées

Pour écrire des procédures stockées Python dans votre environnement de développement local, remplissez les conditions préalables suivantes :

  • Vous devez utiliser la version 0.4.0 ou une version plus récente de la bibliothèque Snowpark.

  • Activez les packages Anaconda pour que Snowpark Python puisse charger les dépendances tierces requises. Reportez-vous à Utilisation de paquets tiers à partir d’Anaconda.

  • Les versions de Python prises en charge sont les suivantes :

    • 3,8

    • 3,9

    • 3,10

    • 3,11

Configurez votre environnement de développement pour utiliser la bibliothèque Snowpark. Reportez-vous à Configuration de votre environnement de développement pour Snowpark.

Écriture du code Python pour la procédure stockée

Pour la logique de la procédure, écrivez un code de gestionnaire qui s’exécute lorsque la procédure est appelée. Cette section décrit la conception d’un gestionnaire.

Vous pouvez créer une procédure stockée à partir du code du gestionnaire de plusieurs manières :

Limites

Les procédures stockées de Snowpark ont les limitations suivantes :

  • La création de processus n’est pas prise en charge dans les procédures stockées.

  • L’exécution de requêtes simultanées n’est pas prise en charge dans les procédures stockées.

  • Vous ne pouvez pas utiliser d’APIs qui exécutent des commandes PUT et GET (y compris Session.sql("PUT ...") et Session.sql("GET ...")).

  • Lorsque vous téléchargez des fichiers à partir d’une zone de préparation en utilisant session.file.get, la correspondance de modèles n’est pas prise en charge.

  • Si vous exécutez votre procédure stockée à partir d’une tâche, vous devez spécifier un entrepôt lors de la création de la tâche. Vous ne pouvez pas utiliser les ressources de calcul gérées par Snowflake pour exécuter la tâche.

  • La création d’objets temp nommés n’est pas prise en charge dans une procédure stockée relative aux droits du propriétaire. Une procédure stockée avec les droits du propriétaire est une procédure stockée qui s’exécute avec les privilèges du propriétaire de la procédure stockée. Pour plus d’informations, voir les droits de l’appelant ou les droits du propriétaire.

Planification de l’écriture de votre procédure stockée

Les procédures stockées s’exécutent dans Snowflake, et vous devez donc planifier le code que vous écrivez en gardant cela à l’esprit.

  • Limitez la quantité de mémoire consommée. Snowflake impose des limites à une méthode ou une fonction en matière de quantité de mémoire nécessaire. Pour obtenir des conseils, reportez-vous à Concevoir des gestionnaires qui respectent les contraintes imposées par Snowflake.

  • Veillez à ce que la méthode ou la fonction de votre gestionnaire respecte le niveau de « thread safety ».

  • Suivez les règles et les restrictions de sécurité. Reportez-vous à Pratiques de sécurité pour UDFs et procédures.

  • Déterminez si vous souhaitez que la procédure stockée s’exécute avec les droits de l’appelant ou les droits du propriétaire.

  • Envisagez la version snowflake-snowpark-python utilisée pour exécuter les procédures stockées. En raison de limitations dans le processus de version des procédures stockées, la bibliothèque snowflake-snowpark-python disponible dans l’environnement Python des procédures stockées est généralement en retard d’une version par rapport à la version publique. Utilisez le code SQL suivant pour connaître la dernière version disponible :

    select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
    
    Copy

Écriture de la méthode ou de la fonction

Lorsque vous écrivez la méthode ou la fonction pour la procédure stockée, notez ce qui suit :

  • Spécifiez l’objet Snowpark Session comme premier argument de votre méthode ou fonction. Lorsque vous appelez votre procédure stockée, Snowflake crée automatiquement un objet Session et le transmet à votre procédure stockée. (Vous ne pouvez pas créer l’objet Session vous-même).

  • Pour le reste des arguments et pour la valeur de retour, utilisez les types Python qui correspondent aux types de données Snowflake. Snowflake prend en charge les types de données Python énumérés dans Mappages de type de données SQL-Python pour les paramètres et les types de retour

Gestion des erreurs

Vous pouvez utiliser les techniques normales de gestion des exceptions de Python pour détecter les erreurs au sein de la procédure.

Si une exception non détectée se produit à l’intérieur de la méthode, Snowflake génère une erreur qui inclut la trace de la pile pour l’exception. Lorsque l’enregistrement des exceptions non gérées est activé, Snowflake enregistre les données relatives aux exceptions non gérées dans une table d’événements.

Mettre les dépendances à la disposition de votre code

Si le code de votre gestionnaire dépend d’un code défini en dehors du gestionnaire lui-même (tel que le code défini dans un module) ou de fichiers de ressources, vous pouvez mettre ces dépendances à la disposition de votre code en les téléchargeant dans une zone de préparation. Reportez-vous à Mettre les dépendances à la disposition de votre code, ou pour les feuilles de calcul Python, reportez-vous à Ajouter un fichier Python d’une zone de préparation à une feuille de calcul.

Si vous créez votre procédure stockée à l’aide de SQL, utilisez la clause IMPORTS lors de l’écriture de l’instruction CREATE PROCEDURE, pour pointer vers les fichiers de dépendance.

Accès aux données dans Snowflake à partir de votre procédure stockée

Pour accéder aux données dans Snowflake, utilisez les APIs de bibliothèque Snowpark.

Lors du traitement d’un appel à votre procédure stockée Python, Snowflake crée un objet Session Snowpark et transmet cet objet à la méthode ou à la fonction de votre procédure stockée.

Comme c’est le cas avec les procédures stockées dans d’autres langages, le contexte de la session (par exemple, les privilèges, la base de données et le schéma actuels, etc.) est déterminé par le fait que la procédure stockée s’exécute avec les droits de l’appelant ou les droits du propriétaire. Pour plus de détails, voir Accessing and Setting the Session State.

Vous pouvez utiliser cet objet Session pour appeler des APIs dans la bibliothèque Snowpark. Par exemple, vous pouvez créer un DataFrame pour une table ou exécuter une instruction SQL.

Voir le Guide du développeur Snowpark pour plus d’informations.

Exemple d’accès aux données

Voici un exemple de méthode Python qui copie un nombre donné de lignes d’une table vers une autre table. La méthode accepte les arguments suivants :

  • Un objet Session Snowpark

  • Le nom de la table à partir de laquelle les lignes doivent être copiées

  • Le nom de la table dans laquelle les lignes doivent être enregistrées

  • Le nombre de lignes à copier

La méthode dans cet exemple renvoie une chaîne. Si vous exécutez cet exemple dans une feuille de calcul Python, remplacez le type de retour pour la feuille de calcul par une String

def run(session, from_table, to_table, count):

  session.table(from_table).limit(count).write.save_as_table(to_table)

  return "SUCCESS"
Copy

Lecture de fichiers

Vous pouvez lire dynamiquement un fichier en zone de préparation dans votre gestionnaire Python en utilisant la classe SnowflakeFile du module Snowpark snowflake.snowpark.files.

Snowflake prend en charge la lecture de fichiers avec SnowflakeFile tant pour les procédures stockées que pour les fonctions définies par l’utilisateur. Pour plus d’informations sur la lecture de fichiers dans le code de votre gestionnaire, ainsi que d’autres exemples, reportez-vous à Lecture d’un fichier avec un gestionnaire d’UDF Python.

Cet exemple montre comment créer et appeler une procédure stockée sur les droits du propriétaire qui lit un fichier à l’aide de la classe SnowflakeFile.

Créez la procédure stockée avec un gestionnaire en ligne, en spécifiant le mode d’entrée comme étant binaire en transmettant rb pour l’argument mode :

CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(ignored_session, file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        return imagehash.average_hash(Image.open(f))
$$;
Copy

Appelez la procédure stockée :

CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Copy

Utilisation de paquets tiers à partir d’Anaconda

Vous pouvez spécifier les paquets Anaconda à installer lorsque vous créez des procédures stockées Python. Pour voir la liste des paquets tiers d’Anaconda, consultez le canal Anaconda Snowflake. Ces paquets tiers sont construits et fournis par Anaconda. Vous pouvez utiliser le canal Anaconda de Snowflake pour des tests et du développement locaux, sans frais, conformément aux conditions supplémentaires relatives aux logiciels embarqués des conditions de service d’Anaconda.

Pour connaître ces limitations, voir Limites.

Prise en main

Avant de commencer à utiliser les packages fournis par Anaconda dans Snowflake, vous devez accepter les Conditions de tiers de Snowflake.

Note

Vous devez être l’administrateur de l’organisation (utilisez le rôle ORGADMIN) pour accepter les conditions. Vous ne devez accepter les conditions qu’une seule fois pour votre compte Snowflake. Voir Activation du rôle ORGADMIN dans un compte.

  1. Connectez-vous à Snowsight.

  2. Sélectionnez Admin » Billing & Terms.

  3. Dans la section Anaconda, sélectionnez Enable.

  4. Dans la boîte de dialogue Anaconda Packages , cliquez sur le lien pour consulter la page des conditions de tiers de Snowflake.

  5. Si vous acceptez les conditions, sélectionnez Acknowledge & Continue.

Si une erreur apparaît lorsque vous essayez d’accepter les conditions d’utilisation, il se peut qu’il manque un prénom, un nom ou une adresse e-mail dans votre profil d’utilisateur. Si vous avez un rôle d’administrateur, reportez-vous aux Ajouter des détails sur l’utilisateur à votre profil d’utilisateur pour mettre à jour votre profil à l’aide de Snowsight. Sinon, contactez un administrateur pour mettre à jour votre compte.

Note

Si vous n’acceptez pas les conditions de tiers de Snowflake comme décrit ci-dessus, vous pouvez toujours utiliser les procédures stockées, mais avec ces limitations :

  • Vous ne pouvez pas utiliser de paquets tiers à partir d’Anaconda.

  • Vous pouvez toujours spécifier Snowpark Python en tant que paquet dans une procédure stockée, mais vous ne pouvez pas spécifier une version spécifique.

  • Vous ne pouvez pas utiliser la méthode to_pandas lorsque vous interagissez avec un objet DataFrame.

Affichage et utilisation des paquets

Vous pouvez afficher tous les packages disponibles et leurs informations de version en interrogeant la vue PACKAGES dans Information Schema :

select * from information_schema.packages where language = 'python';
Copy

Pour plus d’informations, voir Utilisation de paquets de tiers dans la documentation de Snowflake sur les UDF Python.

Création de la procédure stockée

Vous pouvez créer une procédure stockée à partir d’une feuille de calcul Python ou à l’aide de SQL.

Création d’une procédure stockée Python pour automatiser le code de votre feuille de calcul Python

Créez une procédure stockée Python à partir de votre feuille de calcul Python pour automatiser votre code. Pour plus de détails sur l’écriture de feuilles de calcul Python, consultez Écriture de code Snowpark dans des feuilles de calcul Python.

Conditions préalables

Votre rôle doit disposer des privilèges OWNERSHIP ou CREATE PROCEDURE sur le schéma de base de données dans lequel vous exécutez votre feuille de calcul Python pour la déployer en tant que procédure stockée.

Déployer une feuille de calcul Python en tant que procédure stockée

Pour créer une procédure stockée Python afin d’automatiser le code dans votre feuille de calcul Python, procédez comme suit :

  1. Connectez-vous à Snowsight.

  2. Ouvrez Worksheets.

  3. Ouvrez la feuille de calcul Python que vous souhaitez déployer en tant que procédure stockée.

  4. Sélectionnez Deploy.

  5. Saisissez un nom pour la procédure stockée.

  6. (Facultatif) Entrez un commentaire avec des détails sur la procédure stockée.

  7. (Facultatif) Sélectionnez Replace if exists pour remplacer une procédure stockée existante par le même nom.

  8. Pour Handler, sélectionnez la fonction de traitement de votre procédure stockée. Par exemple, main.

  9. Examinez les arguments utilisés par votre fonction de gestionnaire et, si nécessaire, remplacez le mappage du type de données SQL pour un argument typé. Pour plus de détails sur la façon dont les types Python sont mappés avec les types SQL, consultez Mappages des types de données SQL-Python.

  10. (Facultatif) Sélectionnez Open in Worksheets pour ouvrir la définition de la procédure stockée dans une feuille de calcul SQL.

  11. Sélectionnez Deploy pour créer la procédure stockée.

  12. Une fois la procédure stockée créée, vous pouvez accéder aux détails de la procédure ou sélectionner Done.

Vous pouvez créer plusieurs procédures stockées à partir d’une feuille de calcul Python.

Après avoir créé une procédure stockée, vous pouvez l’automatiser dans le cadre d’une tâche. Reportez-vous à Exécution d’instructions SQL sur une planification à l’aide de tâches.

Renvoi de données tabulaires

Vous pouvez écrire une procédure qui renvoie des données sous forme de tableau. Pour écrire une procédure qui renvoie des données tabulaires, procédez comme suit :

  • Spécifiez TABLE(...) comme type de retour de la procédure dans votre instruction CREATE PROCEDURE.

    En tant que paramètres TABLE, vous pouvez spécifier les noms de colonne des données renvoyées et les types si vous les connaissez. Si vous ne connaissez pas les colonnes renvoyées lors de la définition de la procédure, par exemple lorsqu’elles sont spécifiées au moment de l’exécution, vous pouvez omettre les paramètres TABLE. Lorsque vous le faites, les colonnes de valeur de retour de la procédure seront converties à partir des colonnes du DataFrame renvoyées par son gestionnaire. Les types de données de colonne seront convertis en SQL selon le mappage spécifié dans Mappages des types de données SQL-Python.

  • Écrivez le gestionnaire afin qu’il renvoie le résultat tabulaire dans un DataFrame Snowpark.

    Pour plus d’informations sur les dataframes, voir Utilisation de DataFrames dans Snowpark Python.

Exemple

Les exemples de cette section illustrent le retour de valeurs tabulaires à partir d’une procédure qui filtre les lignes où une colonne correspond à une chaîne.

Définition des données

Le code de l’exemple suivant crée une table d’employés.

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

Spécification des noms et des types de colonne de retour

Cet exemple spécifie les noms et les types de colonne dans l’instruction RETURNS TABLE().

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
   df = session.table(table_name)
   return df.filter(col("role") == role)
$$;
Copy

Omission des noms et des types de colonne de retour

Le code dans l’exemple suivant déclare une procédure qui permet d’extrapoler les noms et les types de colonne de valeur de retour à partir des colonnes de la valeur de retour du gestionnaire. Il omet les noms et les types de colonne de l’instruction RETURNS TABLE().

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
  df = session.table(table_name)
  return df.filter(col("role") == role)
$$;
Copy

Appel de la procédure

L’exemple suivant appelle la procédure stockée :

CALL filterByRole('employees', 'dev');
Copy

L’appel de procédure produit la sortie suivante :

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

Utilisation des procédures stockées

Après avoir créé une procédure stockée, vous pouvez l’appeler depuis du code SQL ou dans le cadre d’une tâche planifiée.

Exemples

Exécution de tâches simultanées à l’aide de processus de travail

Vous pouvez exécuter des tâches simultanées à l’aide de processus de travail Python. Cela peut s’avérer utile lorsque vous devez exécuter des tâches parallèles qui tirent parti de plusieurs cœurs CPU sur les nœuds de l’entrepôt.

Note

Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.

Pour contourner les cas où le Python Global Interpreter Lock empêche une approche multitâche de s’étendre à tous les cœurs CPU, vous pouvez exécuter des tâches concurrentes à l’aide de processus de travail distincts, plutôt que de threads.

Vous pouvez le faire sur des entrepôts Snowflake en utilisant la classe Parallel de la bibliothèque joblib comme dans l’exemple suivant.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

Note

Le backend par défaut utilisé pour joblib.Parallel diffère entre les entrepôts standards de Snowflake et les entrepôts optimisés pour Snowpark.

  • Valeur par défaut de l’entrepôt standard : threading

  • Valeur par défaut de l’entrepôt optimisé pour Snowpark : loky (multitraitement)

Vous pouvez remplacer le paramètre de backend par défaut en appelant la fonction joblib.parallel_backend comme dans l’exemple suivant.

import joblib
joblib.parallel_backend('loky')
Copy