Exemples de gestionnaires d’UDF Python

Cette rubrique comprend des exemples simples de code de gestionnaire d’UDF écrits en Python.

Pour obtenir des informations sur l’utilisation de Python pour créer un gestionnaire d’UDF, reportez-vous à Création d’UDFs Python.

Définissez runtime_version sur la version de l’environnement d’exécution Python requise par votre code. Les versions de Python prises en charge sont les suivantes :

  • 3,8

  • 3,9

  • 3,10

  • 3,11

Importation d’un paquet dans un gestionnaire en ligne

Une sélection de paquets tiers d’Anaconda est disponible. Pour plus d’informations, voir Utilisation de paquets tiers.

Note

Avant que vous puissiez utiliser les paquets fournis par Anaconda, l’administrateur de votre organisation Snowflake doit accepter les conditions de tiers de Snowflake. Pour plus d’informations, voir Utilisation de paquets tiers à partir d’Anaconda.

Le code suivant montre comment importer des paquets et renvoyer leurs versions.

Créez l’UDF :

CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
  return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

Appelez l’UDF :

SELECT py_udf();
+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

Vous pouvez utiliser le mot-clé PACKAGES pour spécifier les versions des paquets comme suit :

  • Sans version (par exemple numpy)

  • Avec une version exacte (par exemple, numpy==1.25.2)

  • Contrainte à un préfixe de version en utilisant des caractères génériques (par exemple numpy==1.*)

  • Contrainte à une plage de versions (par exemple numpy>=1.25)

  • Contrainte par plusieurs spécificateurs de version (par exemple numpy>=1.25,<2) de sorte qu’un paquet qui satisfait à tous les spécificateurs de version sera sélectionné.

Note

L’utilisation de plusieurs opérateurs de plage (par exemple numpy>=1.25,<2) n’est pas prise en charge dans les politiques de paquets, mais vous pouvez les utiliser lors de la création d’UDF, d’UDTF et de procédures stockées Python.

Voici un exemple d’utilisation du caractère générique * pour contraindre un paquet à un préfixe de version.

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy==1.*')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

Cet exemple montre comment contraindre un paquet à être supérieur ou égal à une version spécifiée.

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

Cet exemple montre comment utiliser plusieurs spécificateurs de versions de paquets :

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2,<2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

Lecture d’un fichier

Vous pouvez lire le contenu d’un fichier avec le code du gestionnaire d’UDF Python. Par exemple, vous pourriez vouloir lire un fichier pour traiter des données non structurées.

Pour lire le contenu d’un fichier, vous pouvez :

Lecture d’un fichier spécifié de façon statique à l’aide de IMPORTS

Vous pouvez lire un fichier en spécifiant le nom du fichier et le nom de la zone de préparation dans la clause IMPORTS de la commande CREATE FUNCTION.

Lorsque vous spécifiez un fichier dans la clause IMPORTS, Snowflake copie ce fichier de la zone de préparation vers le répertoire personnel (également appelé répertoire d’importation) de l’UDF, qui est le répertoire à partir duquel l’UDF lit le fichier.

Snowflake copie les fichiers importés dans un répertoire unique. Tous les fichiers de ce répertoire doivent avoir des noms uniques, de sorte que chaque fichier de votre clause IMPORTS doit avoir un nom distinct. Cette règle s’applique même si les fichiers commencent dans différentes zones de préparation ou différents sous-répertoires au sein d’une zone de préparation.

Note

Vous ne pouvez importer des fichiers qu’à partir du répertoire de premier niveau d’une zone de préparation, pas des sous-dossiers.

L’exemple suivant utilise un gestionnaire Python en ligne qui lit un fichier appelé file.txt à partir d’une zone de préparation nommée my_stage. Le gestionnaire récupère l’emplacement du répertoire personnel de l’UDF en utilisant la méthode Python sys._xoptions avec l’option système snowflake_import_directory.

Snowflake lit le fichier une seule fois pendant la création de l’UDF, et ne le relira pas pendant l’exécution de l’UDF si la lecture du fichier a lieu en dehors du gestionnaire cible.

Créez l’UDF avec un gestionnaire en ligne :

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
  s = f.read()

def compute():
  return s
$$;
Copy

Lecture d’un fichier spécifié de façon dynamique avec SnowflakeFile

Vous pouvez lire un fichier depuis une zone de préparation en utilisant la classe SnowflakeFile dans le module Snowpark snowflake.snowpark.files. La classe SnowflakeFile offre un accès dynamique aux fichiers, ce qui vous permet de diffuser des fichiers de n’importe quelle taille. L’accès dynamique aux fichiers est également utile lorsque vous souhaitez itérer sur plusieurs fichiers. Par exemple, voir Traitement de plusieurs fichiers.

La classe SnowflakeFile possède une méthode pour ouvrir un fichier : open. La méthode open renvoie un objet SnowflakeFile qui étend les objets de fichier IOBase Python.

L’objet SnowflakeFile prend en charge les méthodes suivantes : IOBase, BufferedIOBase; et RawIOBase :

  • IOBase.fileno

  • IOBase.isatty

  • IOBase.readable

  • IOBase.readinto

  • IOBase.readline

  • IOBase.readlines

  • IOBase.seek

  • IOBase.seekable

  • IOBase.tell

  • BufferedIOBase.readinto1

  • RawIOBase.read

  • RawIOBase.readall

Pour plus d’informations, voir la documentation Python 3.8 sur IOBase. L’appel à des méthodes non prises en charge dans un serveur Snowflake, telles que la méthode fileno, entraînera une erreur.

Note

Par défaut, l’accès aux fichiers avec SnowflakeFile nécessite des URLs scopées afin de rendre votre code résistant aux attaques par injection de fichiers. Vous pouvez créer une URL scopée en SQL à l’aide de la fonction intégrée BUILD_SCOPED_FILE_URL. Pour plus d’informations sur les URLs scopées, voir Types d’URLs disponibles pour accéder aux fichiers. Seuls les utilisateurs ayant accès au fichier peuvent créer une URL scopée.

Conditions préalables

Avant que votre code de gestionnaire Python puisse lire un fichier sur une zone de préparation, vous devez effectuer les opérations suivantes pour mettre le fichier à la disposition du code :

  1. Créez une zone de préparation accessible à votre gestionnaire.

    Vous pouvez utiliser une zone de préparation externe ou une zone de préparation interne. Si vous utilisez une zone de préparation interne, il peut s’agir d’une zone de préparation utilisateur lorsque vous prévoyez de créer une procédure stockée avec droits de l’appelant. Sinon, vous devez utiliser une zone de préparation nommée. Snowflake ne prend actuellement pas en charge l’utilisation d’une zone de préparation de table pour les dépendances UDF.

    Pour en savoir plus sur la création d’une zone de préparation, voir CREATE STAGE. Pour en savoir plus sur le choix d’un type de zone de préparation interne, voir Sélection d’une zone de préparation interne pour les fichiers locaux.

    Des privilèges adéquats sur la zone de préparation doivent être attribués au rôle suivant, en fonction de votre casse :

    Cas d’utilisation

    Rôle

    UDF ou procédure stockée avec droits du propriétaire

    Rôle qui possède l’UDF ou la procédure stockée en cours d’exécution.

    Procédure stockée avec droits de l’appelant

    Rôle de l’utilisateur.

    Pour plus d’informations, voir Granting Privileges for User-Defined Functions.

  2. Copiez le fichier que votre code lira dans la zone de préparation.

    Vous pouvez copier le fichier d’un lecteur local vers une zone de préparation interne en utilisant la commande PUT. Pour des informations sur la mise en zone de préparation de fichiers avec PUT, voir Mise en zone de préparation de fichiers de données à partir d’un système de fichiers local.

    Vous pouvez copier le fichier d’un lecteur local vers une zone de préparation externe à l’aide de l’un des outils fournis par votre service de stockage dans le Cloud. Pour obtenir de l’aide, consultez la documentation de votre service de stockage dans le Cloud.

Calcul du hachage perceptuel d’une image avec un gestionnaire Python en ligne

Cet exemple utilise SnowflakeFile pour lire une paire de fichiers images en zone de préparation et utiliser le hachage perceptuel (pHash) de chaque fichier pour déterminer le degré de similitude des images entre elles.

Créez une UDF qui renvoie la valeur phash d’une image, en spécifiant le mode d’entrée comme étant binaire en transmettant rb pour l’argument mode :

CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(file_path):
  with SnowflakeFile.open(file_path, 'rb') as f:
  return imagehash.average_hash(Image.open(f))
$$;
Copy

Créez une deuxième UDF qui calcule la distance entre les valeurs de phash de deux images :

CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash

def run(h1, h2):
  return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
Copy

Mettez en zone de préparation les fichiers images et actualisez la table du répertoire :

PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;

ALTER STAGE images REFRESH;
Copy

Appelez l’UDFs :

SELECT
  calc_phash_distance(
    calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
    calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
  ) ;
Copy

Traitement d’un fichier CSV à l’aide d’une UDTF

Cet exemple utilise SnowflakeFile pour créer une UDTF qui extrait le contenu d’un fichier CSV et renvoie les lignes dans un tableau.

Créez l’UDTF avec un gestionnaire en ligne :

CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

class csvparser:
  def process(self, stagefile):
    with SnowflakeFile.open(stagefile) as f:
      for line in f.readlines():
        lineStr = line.strip()
        row = lineStr.split(",")
        try:
          # Read the columns from the line.
          yield (row[1], row[0], row[2], )
        except:
          pass
$$;
Copy

Mettez en zone de préparation le fichier CSV et actualisez la table du répertoire :

PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;

ALTER STAGE data_stage REFRESH;
Copy

Appelez l’UDTF en lui transmettant l’URL d’un fichier :

SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
Copy

Traitement de plusieurs fichiers

Vous pouvez lire et traiter plusieurs fichiers en transmettant la colonne RELATIVE_PATH d’une table de répertoire à votre gestionnaire. Pour plus d’informations sur la colonne RELATIVE_PATH, consultez le résultat d’une requête de table de répertoire.

Note

En fonction de la taille de vos fichiers et de vos besoins de calcul, vous pouvez utiliser ALTER WAREHOUSE pour mettre votre entrepôt à une échelle supérieure avant d’exécuter une instruction qui lit et traite plusieurs fichiers.

Appelez l’UDF pour traiter plusieurs fichiers :

L’exemple suivant appelle une UDF au sein d’une instruction CREATE TABLE pour traiter chaque fichier sur une zone de préparation, puis stocker les résultats dans une nouvelle table.

À des fins de démonstration, l’exemple repose sur les hypothèses suivantes :

  • Une zone de préparation nommée my_stage contient plusieurs fichiers texte.

  • Il existe une UDF nommée get_sentiment qui effectue une analyse des sentiments sur du texte non structuré. L’UDF prend en entrée le chemin d’un fichier texte et renvoie une valeur représentant le sentiment.

CREATE OR REPLACE TABLE sentiment_results AS
SELECT
  relative_path
  , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment
FROM directory(@my_stage);
Copy
Appelez l’UDTF pour traiter plusieurs fichiers :

L’exemple suivant appelle une UDTF nommée parse_excel_udtf. L’exemple transmet le relative_path de la table du répertoire à la zone de préparation nommée my_excel_stage.

SELECT t.*
FROM directory(@my_stage) d,
table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
Copy

Lecture de fichiers avec des URLs et des URIs de zones de préparation

L’accès à un fichier avec SnowflakeFile nécessite par défaut des URLs scopées. Cela rend votre code résistant aux attaques par injection de fichiers. Toutefois, vous pouvez faire référence à l’emplacement d’un fichier en utilisant une URI ou une URL de zone de préparation. Pour ce faire, vous devez appeler la méthode SnowflakeFile.open avec l’argument mot-clé require_scoped_url = False.

Cette option est utile lorsque vous voulez permettre à un fournisseur de fournir une URI qui n’est accessible qu’au propriétaire de l’UDF. Par exemple, vous pouvez utiliser une URI de zone de préparation pour l’accès aux fichiers si vous possédez une UDF et que vous souhaitez lire vos fichiers de configuration ou vos modèles de machine learning. Nous ne recommandons pas cette option si vous travaillez avec des fichiers dont les noms sont imprévisibles, tels que les fichiers créés en fonction des entrées de l’utilisateur.

Cet exemple lit un modèle de machine learning dans un fichier et utilise le modèle dans une fonction pour effectuer un traitement du langage naturel pour l’analyse des sentiments. L’exemple appelle open avec require_scoped_url = False. Dans les deux formats de localisation des fichiers (URI et URL de zone de préparation), le propriétaire de l’UDF doit avoir accès au fichier modèle.

Créez l’UDF avec un gestionnaire en ligne :

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = '@models/NLP_model.pickle'
  # Specify 'mode = rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

Mettez en zone de préparation le fichier modèle et actualisez la table du répertoire :

PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;

ALTER STAGE models REFRESH;
Copy

Vous pouvez également spécifier l’UDF avec l’URL de la zone de préparation du modèle pour extraire le sentiment.

Par exemple, créez une UDF avec un gestionnaire en ligne qui spécifie un fichier en utilisant une URL de zone de préparation :

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
  # Specify 'rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

Appelez l’UDF avec les données d’entrée :

SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
Copy

Écriture de fichiers

Un gestionnaire d’UDF peut écrire des fichiers dans un répertoire /tmp créé pour la requête appelant l’UDF.

N’oubliez pas qu’un répertoire /tmp est réservé à une seule requête d’appel, alors que plusieurs processus de travail Python peuvent être exécutés en même temps. Pour éviter les collisions, vous devez vous assurer que l’accès au répertoire /tmp est synchronisé avec les autres processus de travail Python ou que les noms des fichiers écrits dans /tmp sont uniques.

Pour un exemple de code, voir Décompression d’un fichier en zone de préparation dans cette rubrique.

Le code de l’exemple suivant écrit l’entrée text dans le répertoire /tmp. Il ajoute également l’ID du processus de la fonction pour garantir l’unicité de l’emplacement du fichier.

def func(text):
   # Append the function's process ID to ensure the file name's uniqueness.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

Décompression d’un fichier en zone de préparation

Vous pouvez stocker un fichier .zip dans une zone de préparation, puis le décompresser dans une UDF en utilisant le module Python zipfile.

Par exemple, vous pouvez télécharger un fichier .zip dans une zone de préparation, puis référencer le fichier .zip à son emplacement de zone de préparation dans la clause IMPORTS lorsque vous créez l’UDF. Au moment de l’exécution, Snowflake copiera le fichier en zone de préparation dans un répertoire d’importation à partir duquel votre code pourra y accéder.

Pour plus d’informations sur la lecture et l’écriture de fichiers, voir Lecture d’un fichier et Écriture de fichiers.

Dans l’exemple suivant, le code de l’UDF utilise un modèle NLP pour découvrir les entités dans le texte. Le code renvoie un tableau de ces entités. Pour configurer le modèle NLP afin de traiter le texte, le code utilise d’abord le module zipfile pour extraire le fichier du modèle (en_core_web_sm-2.3.1) d’un fichier .zip. Le code utilise ensuite le module spaCy pour charger le modèle à partir du fichier.

Notez que le code écrit le contenu des fichiers extraits dans le répertoire /tmp créé pour la requête qui appelle cette fonction. Le code utilise des verrous de fichier pour garantir que l’extraction est synchronisée entre les processus de travail Python ; de cette façon, le contenu n’est décompressé qu’une seule fois. Pour plus d’informations sur l’écriture de fichiers, voir Écriture de fichiers.

Pour en savoir plus sur le module zipfile, consultez la référence zipfile. Pour en savoir plus sur le module spaCy, consultez la documentation de l”API spaCy.

Créez l’UDF avec un gestionnaire en ligne :

CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

 # File lock class for synchronizing write access to /tmp.
 class FileLock:
   def __enter__(self):
       self._lock = threading.Lock()
       self._lock.acquire()
       self._fd = open('/tmp/lockfile.LOCK', 'w+')
       fcntl.lockf(self._fd, fcntl.LOCK_EX)

    def __exit__(self, type, value, traceback):
       self._fd.close()
       self._lock.release()

 # Get the location of the import directory. Snowflake sets the import
 # directory location so code can retrieve the location via sys._xoptions.
 IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
 import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

 # Get the path to the ZIP file and set the location to extract to.
 zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
 extracted = '/tmp/en_core_web_sm'

 # Extract the contents of the ZIP. This is done under the file lock
 # to ensure that only one worker process unzips the contents.
 with FileLock():
    if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
       with zipfile.ZipFile(zip_file_path, 'r') as myzip:
          myzip.extractall(extracted)

 # Load the model from the extracted file.
 nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

 def func(text):
    doc = nlp(text)
    result = []

    for ent in doc.ents:
       result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
    return result
 $$;
Copy

Traitement des valeurs NULL

Le code suivant montre comment les valeurs NULL sont traitées. Pour plus d’informations, voir Valeurs NULL.

Créez l’UDF :

CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

Appelez l’UDF :

SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy