Beispiele für Python-UDF-Handler

Unter diesem Thema werden einfache Beispiele für in Python geschriebenen UDF-Handler-Code bereitgestellt.

Weitere Informationen zur Verwendung von Python für das Erstellen eines UDF-Handlers finden Sie unter Erstellen von Python-UDFs.

Setzen Sie runtime_version auf die Version der Python-Laufzeitumgebung, die von Ihrem Code benötigt wird. Folgende Versionen von Python werden unterstützt:

  • 3.8

  • 3.9

  • 3.10

  • 3.11

Importieren eines Pakets in einen Inline-Handler

Eine kuratierte Liste von Drittanbieter-Paketen von Anaconda ist verfügbar. Weitere Informationen dazu finden Sie unter Verwenden von Drittanbieter-Paketen.

Bemerkung

Bevor Sie die von Anaconda bereitgestellten Pakete nutzen können, muss Ihr Snowflake-Organisationsadministrator die Snowflake-Bedingungen für Drittanbieter anerkennen. Weitere Informationen dazu finden Sie unter Verwenden von Drittanbieterpaketen aus Anaconda.

Der folgende Code zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.

Erstellen der UDF:

CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
  return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

Aufrufen der UDF:

SELECT py_udf();
+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

Lesen einer Datei

Sie können den Inhalt einer Datei mit Python-UDF-Handler-Code lesen. Sie könnten zum Beispiel eine Datei lesen, um unstrukturierte Daten zu verarbeiten.

Um den Inhalt einer Datei zu lesen, können Sie Folgendes tun:

Lesen einer statisch spezifizierten Datei mithilfe von IMPORTS

Sie können eine Datei lesen, indem Sie den Dateinamen und den Namen des Stagingbereichs in der IMPORTS-Klausel des Befehls CREATE FUNCTION angeben.

Wenn in der IMPORTS-Klausel eine Datei angegeben wird, kopiert Snowflake diese Datei vom Stagingbereich in das Basisverzeichnis der UDF (auch Importverzeichnis genannt), welches das Verzeichnis ist, aus dem die UDF die Datei ausliest.

Snowflake kopiert importierte Dateien in ein einziges Verzeichnis. Alle Dateien in diesem Verzeichnis müssen eindeutige Namen haben, d. h. jede Datei in Ihrer IMPORTS-Klausel muss einen eigenen Namen haben. Dies gilt auch dann, wenn sich die Dateien in verschiedenen Stagingbereichen oder in verschiedenen Unterverzeichnissen innerhalb eines Stagingbereichs befinden.

Bemerkung

Sie können nur Dateien aus dem obersten Verzeichnis im Stagingbereich importieren, nicht aus Unterordnern.

Das folgende Beispiel verwendet einen Inline-Python-Handler, der eine Datei namens file.txt aus einem Stagingbereich namens my_stage liest. Der Handler ermittelt den Speicherort des Basisverzeichnisses der UDF mithilfe der Python-Methode sys._xoptions und der Systemoption snowflake_import_directory.

Snowflake liest die Datei nur einmal beim Erstellen der UDF und wird die Datei bei Ausführung der UDF nicht noch einmal lesen, wenn das Lesen der Datei außerhalb des Ziel-Handlers erfolgt.

Erstellen der UDF mit einem Inline-Handler:

CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
  s = f.read()

def compute():
  return s
$$;
Copy

Lesen einer mit SnowflakeFile dynamisch spezifizierten Datei

Mit der Klasse SnowflakeFile des Snowpark-Moduls snowflake.snowpark.files können Sie eine Datei aus einem Stagingbereich lesen. Die Klasse SnowflakeFile bietet einen dynamischen Dateizugriff, mit dem Sie Dateien beliebiger Größe streamen können. Der dynamische Dateizugriff ist auch nützlich, wenn Sie über mehrere Dateien iterieren möchten. Ein Beispiel dazu finden Sie unter Verarbeitung mehrerer Dateien.

Die SnowflakeFile-Klasse verfügt über eine Methode zum Öffnen einer Datei: open. Die Methode open gibt ein SnowflakeFile-Objekt zurück, das die IOBase-Dateiobjekte von Python erweitert.

Das SnowflakeFile-Objekt unterstützt die folgenden Methoden: IOBase, BufferedIOBase und RawIOBase:

  • IOBase.fileno

  • IOBase.isatty

  • IOBase.readable

  • IOBase.readinto

  • IOBase.readline

  • IOBase.readlines

  • IOBase.seek

  • IOBase.seekable

  • IOBase.tell

  • BufferedIOBase.readinto1

  • RawIOBase.read

  • RawIOBase.readall

Weitere Informationen dazu finden Sie in der Python 3.8-Dokumentation zu IOBase. Das Aufrufen von Methoden, die von Snowflake-Servern nicht unterstützt werden, wie z. B. die Methode fileno, führt zu einem Fehler.

Bemerkung

Standardmäßig werden für den Dateizugriff mit SnowflakeFile Bereichs-URLs benötigt, um Ihren Code gegen Angriffe per Dateieinschleusung zu schützen. Mit der integrierten Funktion BUILD_SCOPED_FILE_URL können Sie eine Bereichs-URL in SQL erstellen. Weitere Informationen zu Bereichs-URLs finden Sie unter Verfügbare URL-Typen für Zugriff auf Dateien. Nur Benutzer mit Zugriff auf die Datei können eine Bereichs-URL erstellen.

Voraussetzungen

Bevor der Python-Handler-Code eine in einem Stagingbereich befindliche Datei lesen kann, müssen Sie Folgendes tun, um die Datei für den Code zur Verfügung zu stellen:

  1. Erstellen Sie einen Stagingbereich, auf den Ihr Handler zugreifen kann.

    Sie können einen externen Stagingbereich oder eine interne Stagingbereich verwenden. Wenn Sie einen internen Stagingbereich verwenden und planen, eine gespeicherte Prozedur mit Aufruferrechten zu erstellen, dann kann es sich um einen Benutzer-Stagingbereich handeln. Andernfalls müssen Sie einen benannten Stagingbereich verwenden. Snowflake unterstützt derzeit nicht die Verwendung eines Tabellen-Stagingbereichs für UDF-Abhängigkeiten.

    Weitere Informationen zum Erstellen eines Stagingbereichs finden Sie unter CREATE STAGE. Weitere Informationen zur Auswahl des Typs eines internen Stagingbereichs finden Sie unter Auswahl eines internen Stagingbereichs für lokale Dateien.

    Je nach Anwendungsfall müssen der folgenden Rolle entsprechende Berechtigungen für den Stagingbereich zugewiesen werden:

    Anwendungsfall

    Rolle

    UDF oder gespeicherte Prozedur mit Eigentümerrechten

    Die Rolle, die Eigentümer der ausführenden UDF oder gespeicherten Prozedur ist.

    Gespeicherte Prozedur mit Aufruferrechten

    Die Benutzerrolle.

    Weitere Informationen dazu finden Sie unter Granting Privileges for User-Defined Functions.

  2. Kopieren Sie die Datei, die Ihr Code lesen soll, in den Stagingbereich.

    Sie können die Datei von einem lokalen Laufwerk in einen internen Stagingbereich kopieren, indem Sie den Befehl PUT verwenden. Weitere Informationen zum Staging von Dateien mit PUT finden Sie unter Staging von Datendateien aus einem lokalen Dateisystem.

    Sie können die Datei von einem lokalen Laufwerk in einen externen Stagingbereich kopieren, indem Sie eines der von Ihrem Cloudspeicherdienst bereitgestellten Tools verwenden. Weitere Informationen dazu finden Sie in der Dokumentation Ihres Cloudspeicherdienstes:

Berechnen des Wahrnehmungshashs eines Bildes (Perceptual Hashing) mit einem Inline-Python-Handler

In diesem Beispiel wird SnowflakeFile verwendet, um ein Paar von Staging-Bilddateien zu lesen und über den perzeptuellen Hash (pHash) jeder Datei festzustellen, wie ähnlich die Bilder sind.

Erstellen einer UDF, die den pHash-Wert eines Bildes zurückgibt, wobei der Eingabemodus als binär angegeben wird, indem rb als mode-Argument übergeben wird:

CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(file_path):
  with SnowflakeFile.open(file_path, 'rb') as f:
  return imagehash.average_hash(Image.open(f))
$$;
Copy

Erstellen einer zweiten UDF, die den Abstand zwischen den pHash-Werten zweier Bilder berechnet:

CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash

def run(h1, h2):
  return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
Copy

Speichern der Bilddateien im Stagingbereich und Aktualisieren der Verzeichnistabelle:

PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;

ALTER STAGE images REFRESH;
Copy

Aufrufen der UDFs:

SELECT
  calc_phash_distance(
    calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
    calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
  ) ;
Copy

Verarbeitung einer CSV-Datei mit einer UDTF

In diesem Beispiel wird SnowflakeFile verwendet, um eine UDTF zu erstellen, die den Inhalt einer CSV-Datei extrahiert und die Zeilen in einer Tabelle zurückgibt.

Erstellen der UDTF mit einem Inline-Handler:

CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

class csvparser:
  def process(self, stagefile):
    with SnowflakeFile.open(stagefile) as f:
      for line in f.readlines():
        lineStr = line.strip()
        row = lineStr.split(",")
        try:
          # Read the columns from the line.
          yield (row[1], row[0], row[2], )
        except:
          pass
$$;
Copy

Speichern der CSV-Datei im Stagingbereich und Aktualisieren der Verzeichnistabelle:

PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;

ALTER STAGE data_stage REFRESH;
Copy

Aufrufen der UDTF und Übergeben einer Datei-URL:

SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
Copy

Verarbeitung mehrerer Dateien

Sie können mehrere Dateien lesen und verarbeiten, indem Sie die Spalte RELATIVE_PATH einer Verzeichnistabelle an Ihren Handler übergeben. Weitere Informationen zur Spalte RELATIVE_PATH finden Sie in der Ausgabe einer Verzeichnistabellenabfrage.

Bemerkung

Je nach Dateigröße und Computebedarf sollten Sie ALTER WAREHOUSE verwenden, um Ihr Warehouse zu skalieren, bevor Sie eine Anweisung ausführen, die mehrere Dateien liest und verarbeitet.

Aufrufen einer UDF, um mehrere Dateien zu verarbeiten:

Das folgende Beispiel ruft eine UDF innerhalb einer CREATE TABLE-Anweisung auf, um jede Datei in einem Stagingbereich zu verarbeiten und die Ergebnisse dann in einer neuen Tabelle zu speichern.

Zu Demonstrationszwecken geht das Beispiel von folgenden Annahmen aus:

  • In einem Stagingbereich mit dem Namen my_stage befinden sich mehrere Textdateien.

  • Es gibt eine UDF mit dem Namen get_sentiment, die eine Stimmungsanalyse für unstrukturierten Text durchführt. Die UDF verwendet einen Pfad zu einer Textdatei als Eingabe und gibt einen Wert zurück, der die Stimmung repräsentiert.

CREATE OR REPLACE TABLE sentiment_results AS
SELECT
  relative_path
  , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment
FROM directory(@my_stage);
Copy
Aufrufen einer UDTF, um mehrere Dateien zu verarbeiten:

Im nächsten Beispiel wird eine UDTF namens parse_excel_udtf aufgerufen. Das Beispiel übergibt den relativen Pfad (relative_path) aus der Verzeichnistabelle an den Stagingbereich mit dem Namen my_excel_stage.

SELECT t.*
FROM directory(@my_stage) d,
table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
Copy

Lesen von Dateien mit Stagingbereichs-URIs und -URLs

Für den Dateizugriff mit SnowflakeFile werden standardmäßig Bereichs-URLs benötigt. Damit wird Ihr Code widerständig gegen Angriffe per Dateieinschleusung. Sie können jedoch stattdessen auch mit einer Stagingbereichs-URI oder einer Stagingbereich-URL auf einen Dateispeicherort verweisen. Dazu müssen Sie die Methode SnowflakeFile.open mit dem Schlüsselwortargument require_scoped_url = False aufrufen.

Diese Option ist nützlich, wenn Sie möchten, dass ein Aufrufer eine URI bereitstellen kann, auf die nur der UDF-Eigentümer zugreifen darf. Sie könnten zum Beispiel eine Stagingbereichs-URI für den Dateizugriff verwenden, wenn Sie Eigentümer einer UDF sind und Sie Ihre Konfigurationsdateien oder Ihre Modelle für maschinelles Lernen lesen möchten. Wir empfehlen diese Option nicht, wenn Sie Dateien verwenden, die unvorhersehbare Namen haben können, wie z. B. Dateien, die auf Basis von Benutzereingaben erstellt werden.

Dieses Beispiel liest ein Modell des maschinellen Lernens aus einer Datei und verwendet das Modell in einer Funktion zur Verarbeitung natürlicher Sprache für die Stimmungsanalyse. In dem Beispiel wird die Methode open mit require_scoped_url = False aufgerufen. In beiden Dateispeicherformaten (Stagingbereichs-URI und Stagingbereichs-URL) muss der UDF-Eigentümer Zugriff auf die Modelldatei haben.

Erstellen der UDF mit einem Inline-Handler:

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = '@models/NLP_model.pickle'
  # Specify 'mode = rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

Speichern der Modelldatei im Stagingbereich und Aktualisieren der Verzeichnistabelle:

PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;

ALTER STAGE models REFRESH;
Copy

Alternativ können Sie die UDF mit der Stagingbereichs-URL des Modells angeben, um die Stimmung zu extrahieren.

Erstellen Sie zum Beispiel eine UDF mit einem Inline-Handler, der eine Datei mit einer Stagingbereichs-URL angibt:

CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
  # Specify 'rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

Aufrufen der UDF mit den Eingabedaten:

SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
Copy

Schreiben von Dateien

Ein UDF-Handler kann Dateien in ein /tmp-Verzeichnis schreiben, das für die Abfrage erstellt wurde, die die UDF aufruft.

Denken Sie daran, dass ein /tmp-Verzeichnis für eine einzige aufrufende Abfrage reserviert ist, dass aber mehrere Python-Worker-Prozesse gleichzeitig aktiv sein können. Um Kollisionen zu vermeiden, müssen Sie entweder sicherstellen, dass der Zugriff auf das „/tmp“-Verzeichnis mit anderen Python-Worker-Prozessen synchronisiert wird oder dass die Namen der in das „/tmp“-Verzeichnis geschriebenen Dateien eindeutig sind.

Ein Codebeispiel dazu finden Sie unter Entpacken einer Stagingdatei (unter diesem Thema).

Der Code im folgenden Beispiel schreibt die Eingabe text in das Verzeichnis /tmp. Außerdem wird die Prozess-ID der Funktion angehängt, um die Eindeutigkeit des Dateispeicherorts sicherzustellen.

def func(text):
   # Append the function's process ID to ensure the file name's uniqueness.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

Entpacken einer Stagingdatei

Sie können eine ZIP-Datei in einem Stagingbereich speichern und sie dann mithilfe des Python-Moduls „zipfile“ in eine UDF entpacken.

Sie können z. B. eine ZIP-Datei in einen Stagingbereich hochladen und dann bei der Erstellung der UDF in der IMPORTS-Klausel auf die ZIP-Datei im Stagingbereich verweisen. Zur Laufzeit kopiert Snowflake die Stagingdatei in ein Importverzeichnis, von dem aus Ihr Code auf die Datei zugreifen kann.

Weitere Informationen zum Lesen und Schreiben von Dateien finden Sie unter Lesen einer Datei und Schreiben von Dateien.

Im folgenden Beispiel verwendet der UDF-Code ein NLP-Modell, um Entitäten im Text zu erkennen. Der Code gibt ein Array mit diesen Entitäten zurück. Um das NLP-Modell für die Verarbeitung des Textes einzurichten, verwendet der Code zunächst das „zipfile“-Modul, das die Datei für das Modell (en_core_web_sm-2.3.1) aus einer ZIP-Datei extrahiert. Der Code verwendet dann das „spaCy“-Modul, um das Modell aus der Datei zu laden.

Beachten Sie, dass der Code die extrahierten Dateiinhalte in das „/tmp“-Verzeichnis schreibt, das für die Abfrage erstellt wurde, die diese Funktion aufruft. Der Code verwendet Dateisperren, um sicherzustellen, dass die Extraktion über Python-Worker-Prozesse hinweg synchronisiert wird. Auf diese Weise wird der Inhalt nur einmal entpackt. Weiter Informationen zum Schreiben von Dateien finden Sie unter Schreiben von Dateien.

Weitere Informationen zum „zipfile“-Modul finden Sie in der zipfile-Referenz. Weitere Informationen zum „spaCy“-Modul finden Sie in der spaCy-API-Dokumentation.

Erstellen der UDF mit einem Inline-Handler:

CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

 # File lock class for synchronizing write access to /tmp.
 class FileLock:
   def __enter__(self):
       self._lock = threading.Lock()
       self._lock.acquire()
       self._fd = open('/tmp/lockfile.LOCK', 'w+')
       fcntl.lockf(self._fd, fcntl.LOCK_EX)

    def __exit__(self, type, value, traceback):
       self._fd.close()
       self._lock.release()

 # Get the location of the import directory. Snowflake sets the import
 # directory location so code can retrieve the location via sys._xoptions.
 IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
 import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

 # Get the path to the ZIP file and set the location to extract to.
 zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
 extracted = '/tmp/en_core_web_sm'

 # Extract the contents of the ZIP. This is done under the file lock
 # to ensure that only one worker process unzips the contents.
 with FileLock():
    if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
       with zipfile.ZipFile(zip_file_path, 'r') as myzip:
          myzip.extractall(extracted)

 # Load the model from the extracted file.
 nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

 def func(text):
    doc = nlp(text)
    result = []

    for ent in doc.ents:
       result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
    return result
 $$;
Copy

Verarbeiten von NULL-Werten

Der folgende Code zeigt, wie NULL-Werte verarbeitet werden. Weitere Informationen dazu finden Sie unter NULL-Werte.

Erstellen der UDF:

CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

Aufrufen der UDF:

SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy