Exemples de gestionnaires Python pour les procédures stockées¶
Exécution de tâches simultanées à l’aide de processus de tâches worker¶
Vous pouvez exécuter des tâches simultanées à l’aide de processus de travail Python. Cela peut s’avérer utile lorsque vous devez exécuter des tâches parallèles qui tirent parti de plusieurs cœurs CPU sur les nœuds de l’entrepôt.
Note
Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.
Pour contourner les cas où le Python Global Interpreter Lock empêche une approche multitâche de s’étendre à tous les cœurs CPU, vous pouvez exécuter des tâches concurrentes à l’aide de processus de travail distincts, plutôt que de threads.
Vous pouvez le faire sur des entrepôts Snowflake en utilisant la classe Parallel de la bibliothèque joblib comme dans l’exemple suivant.
Note
Le backend par défaut utilisé pour joblib.Parallel diffère entre les entrepôts standards de Snowflake et les entrepôts optimisés pour Snowpark.
Valeur par défaut de l’entrepôt standard :
threadingValeur par défaut de l’entrepôt optimisé pour Snowpark :
loky(multitraitement)
Vous pouvez remplacer le paramètre de backend par défaut en appelant la fonction joblib.parallel_backend comme dans l’exemple suivant.
Utilisation d’APIs Snowpark pour le traitement asynchrone¶
Les exemples suivants illustrent la manière dont vous pouvez utiliser des APIs Snowpark pour lancer des tâches enfant asynchrones, ainsi que le comportement de ces tâches dans différentes conditions.
Vérification du statut d’un job enfant asynchrone¶
Dans l’exemple suivant, la procédure checkStatus exécute une tâche enfant asynchrone qui attend 60 secondes. La procédure vérifie ensuite le statut de la tâche avant qu’elle ne soit terminée, de sorte que la vérification renvoie False.
Le code suivant appelle la procédure.
Annulation d’un job enfant asynchrone¶
Dans l’exemple suivant, la procédure cancelJob utilise SQL pour insérer des données dans la table test_tb avec une tâche enfant asynchrone qui prendrait 10 secondes pour se terminer. Elle annule ensuite la tâche enfant avant qu’elle ne soit terminée et que les données n’aient été insérées.
Le code suivant interroge la table test_tb, mais ne renvoie aucun résultat car aucune donnée n’a été insérée.
Attente et blocage pendant l’exécution d’un job enfant asynchrone¶
Dans l’exemple suivant, la procédure blockUntilDone exécute une tâche enfant asynchrone qui prend 5 secondes pour se terminer. En utilisant la méthode snowflake.snowpark.AsyncJob.result, la procédure attend et revient lorsque la tâche est terminée.
Le code suivant appelle la procédure blockUntilDone, qui revient après 5 secondes d’attente.
Renvoi d’une erreur après la requête de résultats d’un job enfant asynchrone non terminé¶
Dans l’exemple suivant, la procédure earlyReturn exécute une tâche enfant asynchrone qui prend 60 secondes pour se terminer. La procédure tente alors de renvoyer un DataFrame à partir du résultat de la tâche avant qu’elle n’ait pu se terminer. Le résultat est une erreur.
Le code suivant appelle la procédure earlyReturn et renvoie l’erreur.
Fin d’un job parent avant la fin d’un job enfant, annulation du job enfant¶
Dans l’exemple suivant, la procédure earlyCancelJob exécute une tâche enfant asynchrone pour insérer des données dans une table et prend 10 secondes pour se terminer. Cependant, la tâche parent — async_handler — revient avant que la tâche enfant ne se termine, ce qui annule la tâche enfant.
Le code suivant appelle la procédure earlyCancelJob. Il interroge ensuite la table test_tb, qui ne renvoie aucun résultat car aucune donnée n’a été insérée par la tâche enfant annulée.
Lecture de fichiers et d’actifs¶
Lecture d’un fichier spécifié de façon statique à l’aide de IMPORTS¶
Vous pouvez lire un fichier en spécifiant le nom du fichier et le nom de la zone de préparation dans la clause IMPORTS de la commande CREATE PROCEDURE.
Lorsque vous spécifiez un fichier dans la clause IMPORTS, Snowflake copie ce fichier de la zone de préparation vers le répertoire personnel (également appelé répertoire d’importation) de la procédure stockée, qui est le répertoire à partir duquel la procédure stockée lit le fichier.
Snowflake copie les fichiers importés dans un répertoire unique. Tous les fichiers de ce répertoire doivent avoir des noms uniques, de sorte que chaque fichier de votre clause IMPORTS doit avoir un nom distinct. Cette règle s’applique même si les fichiers commencent dans différentes zones de préparation ou différents sous-répertoires au sein d’une zone de préparation.
L’exemple suivant utilise un gestionnaire Python en ligne qui lit un fichier appelé file.txt à partir d’une zone de préparation nommée my_stage. Le gestionnaire récupère l’emplacement du répertoire personnel de la procédure stockée en utilisant la méthode Python sys._xoptions avec l’option système snowflake_import_directory.
Snowflake lit le fichier une seule fois pendant la création de la procédure stockée, et ne le relira pas pendant l’exécution de la procédure stockée si la lecture du fichier a lieu en dehors du gestionnaire cible.
Créez la procédure stockée avec un gestionnaire en ligne :
Importation d’un répertoire à l’aide de IMPORTS¶
Vous pouvez importer un répertoire à l’aide de la clause IMPORTS de la commande CREATE PROCEDURE.
Note
Le chemin d’importation d’un répertoire doit se terminer par une barre oblique (
/). Par exemple,IMPORTS = ('@my_stage/my_dir/').Pour renommer un répertoire lors de l’importation, ajoutez``/=custom_name/`` au chemin de la zone de préparation. Le nom personnalisé doit être un nom de répertoire unique, pas un chemin. Par exemple,
IMPORTS = ('@my_stage/my_dir/=custom_name/').Les importations de répertoire ne sont pas prises en charge dans les applications natives.
L’exemple suivant importe un répertoire appelé my_dir d’une zone de préparation nommée my_stage et répertorie les fichiers qu’elle contient.
