Exemples de gestionnaires Python pour les procédures stockées

Exécution de tâches simultanées à l’aide de processus de tâches worker

Vous pouvez exécuter des tâches simultanées à l’aide de processus de travail Python. Cela peut s’avérer utile lorsque vous devez exécuter des tâches parallèles qui tirent parti de plusieurs cœurs CPU sur les nœuds de l’entrepôt.

Note

Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.

Pour contourner les cas où le Python Global Interpreter Lock empêche une approche multitâche de s’étendre à tous les cœurs CPU, vous pouvez exécuter des tâches concurrentes à l’aide de processus de travail distincts, plutôt que de threads.

Vous pouvez le faire sur des entrepôts Snowflake en utilisant la classe Parallel de la bibliothèque joblib comme dans l’exemple suivant.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.12
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;

Note

Le backend par défaut utilisé pour joblib.Parallel diffère entre les entrepôts standards de Snowflake et les entrepôts optimisés pour Snowpark.

  • Valeur par défaut de l’entrepôt standard : threading

  • Valeur par défaut de l’entrepôt optimisé pour Snowpark : loky (multitraitement)

Vous pouvez remplacer le paramètre de backend par défaut en appelant la fonction joblib.parallel_backend comme dans l’exemple suivant.

import joblib
joblib.parallel_backend('loky')

Utilisation d’APIs Snowpark pour le traitement asynchrone

Les exemples suivants illustrent la manière dont vous pouvez utiliser des APIs Snowpark pour lancer des tâches enfant asynchrones, ainsi que le comportement de ces tâches dans différentes conditions.

Vérification du statut d’un job enfant asynchrone

Dans l’exemple suivant, la procédure checkStatus exécute une tâche enfant asynchrone qui attend 60 secondes. La procédure vérifie ensuite le statut de la tâche avant qu’elle ne soit terminée, de sorte que la vérification renvoie False.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;

Le code suivant appelle la procédure.

CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+

Annulation d’un job enfant asynchrone

Dans l’exemple suivant, la procédure cancelJob utilise SQL pour insérer des données dans la table test_tb avec une tâche enfant asynchrone qui prendrait 10 secondes pour se terminer. Elle annule ensuite la tâche enfant avant qu’elle ne soit terminée et que les données n’aient été insérées.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();

Le code suivant interroge la table test_tb, mais ne renvoie aucun résultat car aucune donnée n’a été insérée.

SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

Attente et blocage pendant l’exécution d’un job enfant asynchrone

Dans l’exemple suivant, la procédure blockUntilDone exécute une tâche enfant asynchrone qui prend 5 secondes pour se terminer. En utilisant la méthode snowflake.snowpark.AsyncJob.result, la procédure attend et revient lorsque la tâche est terminée.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;

Le code suivant appelle la procédure blockUntilDone, qui revient après 5 secondes d’attente.

CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

Renvoi d’une erreur après la requête de résultats d’un job enfant asynchrone non terminé

Dans l’exemple suivant, la procédure earlyReturn exécute une tâche enfant asynchrone qui prend 60 secondes pour se terminer. La procédure tente alors de renvoyer un DataFrame à partir du résultat de la tâche avant qu’elle n’ait pu se terminer. Le résultat est une erreur.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;

Le code suivant appelle la procédure earlyReturn et renvoie l’erreur.

CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

Fin d’un job parent avant la fin d’un job enfant, annulation du job enfant

Dans l’exemple suivant, la procédure earlyCancelJob exécute une tâche enfant asynchrone pour insérer des données dans une table et prend 10 secondes pour se terminer. Cependant, la tâche parent — async_handler — revient avant que la tâche enfant ne se termine, ce qui annule la tâche enfant.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.12
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;

Le code suivant appelle la procédure earlyCancelJob. Il interroge ensuite la table test_tb, qui ne renvoie aucun résultat car aucune donnée n’a été insérée par la tâche enfant annulée.

CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+

Lecture de fichiers et d’actifs

Lecture d’un fichier spécifié de façon statique à l’aide de IMPORTS

Vous pouvez lire un fichier en spécifiant le nom du fichier et le nom de la zone de préparation dans la clause IMPORTS de la commande CREATE PROCEDURE.

Lorsque vous spécifiez un fichier dans la clause IMPORTS, Snowflake copie ce fichier de la zone de préparation vers le répertoire personnel (également appelé répertoire d’importation) de la procédure stockée, qui est le répertoire à partir duquel la procédure stockée lit le fichier.

Snowflake copie les fichiers importés dans un répertoire unique. Tous les fichiers de ce répertoire doivent avoir des noms uniques, de sorte que chaque fichier de votre clause IMPORTS doit avoir un nom distinct. Cette règle s’applique même si les fichiers commencent dans différentes zones de préparation ou différents sous-répertoires au sein d’une zone de préparation.

L’exemple suivant utilise un gestionnaire Python en ligne qui lit un fichier appelé file.txt à partir d’une zone de préparation nommée my_stage. Le gestionnaire récupère l’emplacement du répertoire personnel de la procédure stockée en utilisant la méthode Python sys._xoptions avec l’option système snowflake_import_directory.

Snowflake lit le fichier une seule fois pendant la création de la procédure stockée, et ne le relira pas pendant l’exécution de la procédure stockée si la lecture du fichier a lieu en dehors du gestionnaire cible.

Créez la procédure stockée avec un gestionnaire en ligne :

CREATE OR REPLACE PROCEDURE test_file_import_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/dir/file.txt')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys

def run(session):
  with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    return f.read()
$$;
CALL test_file_import_sp();
// return file content

Importation d’un répertoire à l’aide de IMPORTS

Vous pouvez importer un répertoire à l’aide de la clause IMPORTS de la commande CREATE PROCEDURE.

Note

  • Le chemin d’importation d’un répertoire doit se terminer par une barre oblique (/). Par exemple, IMPORTS = ('@my_stage/my_dir/').

  • Pour renommer un répertoire lors de l’importation, ajoutez``/=custom_name/`` au chemin de la zone de préparation. Le nom personnalisé doit être un nom de répertoire unique, pas un chemin. Par exemple, IMPORTS = ('@my_stage/my_dir/=custom_name/').

  • Les importations de répertoire ne sont pas prises en charge dans les applications natives.

L’exemple suivant importe un répertoire appelé my_dir d’une zone de préparation nommée my_stage et répertorie les fichiers qu’elle contient.

CREATE OR REPLACE PROCEDURE my_directory_import_list_sp()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@my_stage/my_dir/')
HANDLER = 'run'
RUNTIME_VERSION = 3.12
EXECUTE AS CALLER
AS $$
import os
import sys
def list_files(directory):
  files = []
  # Walk through the directory and its subdirectories
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      # Append the relative path to each file to the list
      full_path = os.path.join(dirpath, filename)
      files.append(os.path.relpath(full_path, directory))
  return files
def run(session):
  directory_path = sys._xoptions["snowflake_import_directory"]
  file_list = list_files(directory_path)
  file_list_str = ' '.join(file_list)
  return file_list_str
$$;
CALL my_directory_import_list_sp();