Erstellen von Python-UDFs

Unter diesem Thema wird das Erstellen und Installieren einer Python-UDF (benutzerdefinierte Funktion) erklärt.

Unter diesem Thema:

Schreiben von Python-Code

Schreiben von Python-Modulen und -Funktionen

Schreiben Sie ein Modul, die den unten stehenden Spezifikationen entspricht:

  • Definieren Sie das Modul. Ein Modul ist eine Datei, die Python-Definitionen und -Anweisungen enthält.

  • Definieren Sie innerhalb des Moduls eine Funktion.

  • Wenn die Funktion Argumente akzeptiert, muss jedes Argument einen Datentyp aufweisen, der in der Spalte Python Data Type der SQL-Python-Typzuordnungstabelle angegeben ist.

    Funktionsargumente werden über die Position gebunden, nicht über den Namen. Das erste Argument, das an die UDF übergeben wird, ist das erste Argument, das von der Python-Funktion empfangen wird.

  • Geben Sie einen geeigneten Rückgabewert an. Da eine Python-UDF eine skalare Funktion sein muss, muss sie bei jedem Aufruf einen Wert zurückgeben. Der Rückgabewert muss einen Datentyp aufweisen, der in der Spalte Python Data Type der SQL-Python-Typzuordnungstabelle angegebenen ist. Der Typ des Rückgabewerts muss mit dem SQL-Datentyp kompatibel sein, der in der RETURNS-Klausel der CREATE FUNCTION-Anweisung angegeben ist.

  • Ihr Modul kann mehr als eine Funktion enthalten. Die Funktion, die von Snowflake aufgerufen wird, kann andere Funktionen im selben Modul oder in anderen Modulen aufrufen.

  • Ihre Funktion (und alle Funktionen, die von Ihrer Funktion aufgerufen werden) muss die Snowflake-Einschränkungen für Python-UDFs einhalten.

Bemerkung

Mithilfe der Python-UDF-Batch-API können Sie Python-Funktionen definieren, die Batches von Eingabezeilen als Pandas DataFrames empfangen und Batches von Ergebnissen als Pandas-Arrays oder Pandas Series zurückgeben. Weitere Informationen dazu finden Sie unter Python-UDF-Batch-API.

Lesen und Schreiben von Dateien mit einem UDF-Handler

Sie können Dateien mit UDF-Handler-Code lesen und schreiben. Um dies innerhalb der eingeschränkten Engine, in der Snowflake die UDFs ausführt, sicher tun zu können, befolgen Sie die hier beschriebenen Richtlinien.

Lesen von Dateien mit einem UDF-Handler

Ein UDF-Handler kann Dateien lesen, die in einen Snowflake-Stagingbereich hochgeladen wurden. Der Stagingbereich, der die Datei hostet, muss für den Eigentümer der UDF-Datei lesbar sein.

Wenn Sie den Speicherort des Stagingbereichs einer Datei in der IMPORTS-Klausel von CREATE FUNCTION angeben, kopiert Snowflake die Stagingdatei in ein speziell für die UDF verfügbares Importverzeichnis. Ihr Handler-Code kann die Datei von dort aus lesen.

Beachten Sie, dass Snowflake alle importierten Dateien, auch denen aus anderen Stagingbereichen, in ein einziges Importverzeichnis kopiert. Aus diesem Grund müssen die Namen der in der IMPORTS-Klausel angegebenen Dateien relativ zueinander eindeutig sein.

Ein Codebeispiel dazu finden Sie unter Laden einer Datei aus einem Stagingbereich in eine Python-UDF (unter diesem Thema).

So lesen Sie eine Datei mit UDF-Handler-Code:

  1. Kopieren Sie die Datei in den Snowflake-Stagingbereich.

    Sie können den Befehl PUT verwenden, um Dateien aus dem lokalen Verzeichnis eines Client-Computers hochzuladen. Weitere Informationen dazu finden Sie unter PUT. Weitere allgemeine Informationen zum Laden von Dateien in einen Stagingbereich finden Sie unter Übersicht zum Laden von Daten.

  2. Wenn Sie die UDF mit CREATE FUNCTION erstellen, geben Sie den Speicherort der Datei in der IMPORTS-Klausel an.

    Der Code im folgenden Beispiel gibt eine Datei file.txt an, die sich im Stagingbereich namens my_stage befindet.

    create or replace function my_udf()
       ...
       imports=('@my_stage/file.txt')
       ...
    
    Copy
  3. Lesen Sie in Ihrem Handler-Code die Datei aus dem Importverzeichnis.

    Snowflake kopiert die Stagingdatei in das Importverzeichnis der UDF. Sie können den Speicherort des Verzeichnisses mit der Systemoption snowflake_import_directory abrufen.

    In Python-Code können Sie den Speicherort des Verzeichnisses mit der Python-Methode sys._xoptions abrufen, wie im folgenden Beispiel:

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    def compute():
       with open(import_dir + 'file.txt', 'r') as file:
          return file.read()
    
    Copy

Schreiben von Dateien mit einem UDF-Handler

Ein UDF-Handler kann Dateien in ein „/tmp“-Verzeichnis schreiben, das für die Abfrage erstellt wurde, die die UDF aufruft.

Denken Sie daran, dass ein „/tmp“-Verzeichnis für eine einzige aufrufende Abfrage reserviert ist, dass aber mehrere Python-Worker-Prozesse gleichzeitig ablaufen können. Um Kollisionen zu vermeiden, müssen Sie entweder sicherstellen, dass der Zugriff auf das „/tmp“-Verzeichnis mit anderen Python-Worker-Prozessen synchronisiert wird oder dass die Namen der in das „/tmp“-Verzeichnis geschriebenen Dateien eindeutig sind.

Ein Codebeispiel dazu finden Sie unter Entpacken einer Stagingdatei (unter diesem Thema).

Der Code im folgenden Beispiel schreibt die Eingabe text in das Verzeichnis „/tmp“ und hängt die Prozess-ID der Funktion an, um die Eindeutigkeit des Dateispeicherorts sicherzustellen.

def func(text):
   # Ensure the file name's uniqueness by appending the function's process ID.
   file_path = '/tmp/content' + str(os.getpid())
   with open(file_path, "w") as file:
      file.write(text)
Copy

Erstellen der Funktion in Snowflake

Sie müssen eine CREATE FUNCTION-Anweisung ausführen, um Folgendes anzugeben:

  • Name der zu verwendenden SQL-Funktion

  • Name der Python-Methode, die aufgerufen werden soll, wenn die Python-UDF aufgerufen wird.

Der Name der UDF muss nicht mit dem Namen der in Python geschriebenen Handler-Funktion übereinstimmen. Die Anweisung CREATE FUNCTION verknüpft den UDF-Namen mit der Python-Funktion.

Bei der Wahl des UDF-Namens beachten Sie Folgendes:

  • Der Name muss den Regeln für Objektbezeichner folgen.

  • Wählen Sie einen Namen, der eindeutig ist, oder folgen Sie den Regeln für Überladen von Namen.

    Wichtig

    Im Gegensatz zur Überladung bei SQL-UDFs, die zwischen Funktionen sowohl nach der Anzahl als auch nach den Datentypen der Argumente unterscheiden, unterscheiden Python-UDFs Funktionen nur nach der Anzahl der Argumente.

Funktionsargumente werden über die Position gebunden, nicht über den Namen. Das erste Argument, das an die UDF übergeben wird, ist das erste Argument, das von der Python-Funktion empfangen wird.

Weitere Informationen zu den Datentypen von Argumenten finden Sie unter Zuordnung von Datentypen zwischen SQL und Python.

UDFs mit Inline-Code vs. UDFs mit aus einem Stagingbereich hochgeladenem Code

Der Code für eine Python-UDF kann auf eine der folgenden Arten angegeben werden:

  • Hochgeladen aus einem Stagingbereich: Die CREATE FUNCTION-Anweisung gibt den Speicherort des Stagingbereichs an, in dem sich der Python-Quellcode befindet.

  • Inline: Die CREATE FUNCTION-Anweisung gibt den Python-Quellcode an.

Erstellen einer Inline-Python-UDF

Bei einer Inline-UDF stellen Sie den Python-Quellcode als Teil der CREATE FUNCTION -Anweisung bereit.

Die folgende Anweisung erstellt zum Beispiel eine Inline-Python-UDF, die zu einer gegebenen ganzen Zahl den Wert 1 addiert:

create or replace function addone(i int)
returns int
language python
runtime_version = '3.8'
handler = 'addone_py'
as
$$
def addone_py(i):
  return i+1
$$;
Copy

Der Python-Quellcode wird in der AS-Klausel angegeben. Der Quellcode kann entweder in einfache Anführungszeichen oder in doppelte Dollarzeichen ($$) eingeschlossen sein. Die Verwendung der doppelten Dollarzeichen ist normalerweise einfacher, wenn der Quellcode eingebettete einfache Anführungszeichen enthält.

Aufrufen der UDF:

select addone(10);
Copy

Und hier ist die Ausgabe:

+------------+
| ADDONE(10) |
|------------|
|         11 |
+------------+
Copy

Der Python-Quellcode kann mehr als ein Modul und mehr als eine Funktion in einem Modul enthalten. Daher werden Modul und Funktion für den Aufruf in der HANDLER-Klausel angegeben.

Eine Inline-Python-UDF kann Code in Modulen aufrufen, die in der IMPORTS-Klausel enthalten sind.

Weitere Informationen zur CREATE FUNCTION-Anweisung finden Sie unter CREATE FUNCTION.

Weitere Beispiele finden Sie unter Beispiele für Inline-Python-UDFs.

Erstellen eines Python-UDF mit aus einem Stagingbereich hochgeladenem Code

Mit den folgenden Anweisungen wird eine einfache Python-UDF erstellt, deren Code aus einem Stagingbereich hochgeladen wird. Der Stagingbereich, der die Datei hostet, muss für den Eigentümer der UDF-Datei lesbar sein. Außerdem müssen ZIP-Dateien eigenständig sein und dürfen keine zusätzlichen Setup-Skripte zur Ausführung benötigen.

Erstellen Sie eine Python-Datei mit dem Namen sleepy.py, die Ihren Quellcode enthält:

def snore(n):   # return a series of n snores
    result = []
    for a in range(n):
        result.append("Zzz")
    return result
Copy

Starten Sie SnowSQL (CLI-Client), und verwenden Sie den PUT-Befehl, um die Datei aus dem lokalen Dateisystem in den Stagingbereich @~ des Standardbenutzers zu kopieren. (Beachten Sie, dass der PUT-Befehl nicht über die Snowflake-GUI ausgeführt werden kann.)

put
file:///Users/Me/sleepy.py
@~/
auto_compress = false
overwrite = true
;
Copy

Wenn Sie die Datei löschen oder umbenennen, können Sie die UDF-Datei nicht mehr aufrufen. Wenn Sie Ihre Datei aktualisieren müssen, dann nehmen Sie die Aktualisierung vor, während keine Aufrufe an die UDF erfolgen können. Wenn sich die alte Datei noch im Stagingbereich befindet, muss der PUT-Befehl die OVERWRITE=TRUE-Klausel enthalten.

Erstellen Sie die UDF. Der Handler gibt das Modul und die Funktion an.

create or replace function dream(i int)
returns variant
language python
runtime_version = '3.8'
handler = 'sleepy.snore'
imports = ('@~/sleepy.py')
Copy

Aufrufen der UDF:

select dream(3);

+----------+
| DREAM(3) |
|----------|
| [        |
|   "Zzz", |
|   "Zzz", |
|   "Zzz"  |
| ]        |
+----------+
Copy

Angeben mehrerer Importdateien

Das folgende Beispiel zeigt, wie Sie mehrere Importdateien angeben können.

create or replace function multiple_import_files(s string)
returns string
language python
runtime_version=3.8
imports=('@python_udf_dep/bar/python_imports_a.zip', '@python_udf_dep/foo/python_imports_b.zip')
handler='compute'
as
$$
def compute(s):
  return s
$$;
Copy

Bemerkung

Die angegebenen Importdateinamen müssen unterschiedlich sein. Beispielsweise funktioniert Folgendes nicht: imports=('@python_udf_dep/bar/python_imports.zip', '@python_udf_dep/foo/python_imports.zip').

Erteilen von Berechtigungen für die Funktion

Damit eine andere Rolle als der Funktionseigentümer die Funktion aufrufen kann, muss der Eigentümer der Rolle entsprechende Berechtigungen erteilen.

Die GRANT-Anweisungen für ein Python-UDF sind im Wesentlichen identisch mit den GRANT-Anweisungen für andere UDFs, wie zum Beispiel JavaScript-UDFs.

Beispiel:

GRANT USAGE ON FUNCTION my_python_udf(number, number) TO my_role;
Copy

Beispiele

Verwenden eines importierten Pakets in einer Inline-Python-UDF

Eine kuratierte Liste von Drittanbieter-Paketen von Anaconda ist verfügbar. Weitere Informationen dazu finden Sie unter Verwenden von Drittanbieter-Paketen.

Bemerkung

Bevor Sie die von Anaconda bereitgestellten Pakete nutzen können, muss Ihr Snowflake-Organisationsadministrator die Snowflake-Bedingungen für Drittanbieter anerkennen. Weitere Informationen dazu finden Sie unter Erste Schritte.

Der folgende Code zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.

Erstellen der UDF:

create or replace function py_udf()
returns variant
language python
runtime_version = 3.8
packages = ('numpy','pandas','xgboost==1.5.0')
handler = 'udf'
as $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
    return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

Aufrufen der UDF:

select py_udf();
Copy

Und hier ist die Ausgabe:

+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+
Copy

Laden einer Datei aus einem Stagingbereich in eine Python-UDF

Das folgende Beispiel zeigt, wie Nicht-Code-Dateien aus einem Stagingbereich in ein Python-UDF importiert werden. Die Datei wird nur einmal beim Erstellen der UDF gelesen. Sie wird bei Ausführung der UDF nicht noch einmal gelesen, wenn das Lesen der Datei außerhalb des Ziel-Handlers erfolgt. Weitere Informationen zum Lesen einer Datei finden Sie unter Lesen von Dateien mit einem UDF-Handler.

Bemerkung

Sie können nur Dateien aus dem obersten Verzeichnis im Stagingbereich importieren, nicht aus Unterordnern.

Erstellen der UDF:

create or replace function my_udf()
  returns string
  language python
  runtime_version=3.8
  imports=('@my_stage/file.txt')
  handler='compute'
as
$$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
    s = f.read()

def compute():
    return s
$$;
Copy

Entpacken einer Stagingdatei

Sie können eine ZIP-Datei in einem Stagingbereich speichern und sie dann mithilfe des Python-Moduls „zipfile“ in eine UDF entpacken.

Sie können z. B. eine ZIP-Datei in einen Stagingbereich hochladen und dann bei der Erstellung der UDF in der IMPORTS-Klausel auf die ZIP-Datei im Stagingbereich verweisen. Zur Laufzeit kopiert Snowflake die Stagingdatei in ein Importverzeichnis, von dem aus Ihr Code auf die Datei zugreifen kann.

Weitere Informationen zum Lesen und Schreiben von Dateien finden Sie unter Lesen und Schreiben von Dateien mit einem UDF-Handler.

Im folgenden Beispiel verwendet der UDF-Code ein NLP-Modell, um Entitäten im Text zu erkennen. Der Code gibt ein Array mit diesen Entitäten zurück. Um das NLP-Modell für die Verarbeitung des Textes einzurichten, verwendet der Code zunächst das „zipfile“-Modul, das die Datei für das Modell (en_core_web_sm-2.3.1) aus einer ZIP-Datei extrahiert. Der Code verwendet dann das „spaCy“-Modul, um das Modell aus der Datei zu laden.

Beachten Sie, dass der Code die extrahierten Dateiinhalte in das „/tmp“-Verzeichnis schreibt, das für die Abfrage erstellt wurde, die diese Funktion aufruft. Der Code verwendet Dateisperren, um sicherzustellen, dass die Extraktion über Python-Worker-Prozesse hinweg synchronisiert wird. Auf diese Weise wird der Inhalt nur einmal entpackt. Weiter Informationen zum Schreiben von Dateien finden Sie unter Schreiben von Dateien mit einem UDF-Handler.

Weitere Informationen zum „zipfile“-Modul finden Sie in der zipfile-Referenz. Weitere Informationen zum „spaCy“-Modul finden Sie in der spaCy-API-Dokumentation.

Erstellen der UDF:

create or replace function py_spacy(str string)
   returns array
   language python
   runtime_version = 3.8
   handler = 'func'
   packages = ('spacy')
   imports = ('@spacy_stage/spacy_en_core_web_sm.zip')
   as
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

# File lock class for synchronizing write access to /tmp
class FileLock:
   def __enter__(self):
      self._lock = threading.Lock()
      self._lock.acquire()
      self._fd = open('/tmp/lockfile.LOCK', 'w+')
      fcntl.lockf(self._fd, fcntl.LOCK_EX)

   def __exit__(self, type, value, traceback):
      self._fd.close()
      self._lock.release()

# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
extracted = '/tmp/en_core_web_sm'

# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
   if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
      with zipfile.ZipFile(zip_file_path, 'r') as myzip:
         myzip.extractall(extracted)

# Load the model from the extracted file.
nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

def func(text):
   doc = nlp(text)
   result = []

   for ent in doc.ents:
      result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
   return result
$$;
Copy

NULL-Verarbeitung in Python-UDFs

Der folgende Code zeigt, wie NULL-Werte verarbeitet werden. Weitere Informationen dazu finden Sie unter NULL-Werte.

Erstellen der UDF:

create or replace function py_udf_null(a variant)
returns string
language python
runtime_version = 3.8
handler = 'udf'
as $$

def udf(a):
    if not a:
        return 'JSON null'
    elif getattr(a, "is_sql_null", False):
        return 'SQL null'
    else:
        return 'not null'
$$;
Copy

Aufrufen der UDF:

select py_udf_null(null);
select py_udf_null(parse_json('null'));
select py_udf_null(10);
Copy

Und hier ist die Ausgabe:

+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+

+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+

+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+
Copy