Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Python¶
Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Python erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.
Unter diesem Thema:
Einführung¶
Mit Snowpark können Sie benutzerdefinierte Funktionen (UDFs) für Ihre kundenspezifischen Lambdas und Funktionen erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.
Wenn Sie die Snowpark-API verwenden, um eine UDF zu erstellen, lädt die Snowpark-Bibliothek den Code Ihrer Funktion in einen internen Stagingbereich. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, wo sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.
In Ihrem kundenspezifischen Code können Sie auch Module aus Python-Dateien oder Pakete von Drittanbietern importieren.
Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:
Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.
Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.
In den nächsten Abschnitten wird erläutert, wie Sie diese UDFs mithilfe einer lokalen Entwicklungsumgebung oder mithilfe eines Python-Arbeitsblatts erstellen.
Beachten Sie, dass Sie, wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION
definiert haben, diese UDF in Snowpark aufrufen können. Weitere Details dazu finden Sie unter Benutzerdefinierte Funktionen (UDFs).
Bemerkung
Vektorisierte Python-UDFs ermöglichen die Definition von Python-Funktionen, die Batches von Eingabezeilen als Pandas DataFrames empfangen. Dies führt zu einer wesentlichen Leistungssteigerung bei Szenarios mit Machine Learning-Inferenz. Weitere Informationen dazu finden Sie unter Verwenden vektorisierter UDFs.
Bemerkung
Wenn Sie ein Python-Arbeitsblatt nutzen, verwenden Sie diese Beispiele innerhalb der Handler-Funktion:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
Wenn die Beispiele etwas anderes als einen DataFrame zurückgeben, z. B. eine Liste (list
) von Row
-Objekten, können Sie den Rückgabetyp ändern, sodass er dem Rückgabetyp des Beispiels entspricht.
Nachdem Sie ein Codebeispiel ausgeführt haben, verwenden Sie die Registerkarte Results, um alle zurückgegebenen Ausgaben anzuzeigen. Weitere Informationen dazu finden Sie unter Ausführen von Python-Arbeitsblättern.
Angeben von Abhängigkeiten für eine UDF¶
Um eine UDF über die Snowpark-API zu definieren, müssen Sie alle Dateien importieren, die Module enthalten, von denen Ihre UDF abhängt, wie Python-Dateien, ZIP-Dateien, Ressourcendateien usw.
Weitere Informationen zum Import mithilfe von Python-Arbeitsblättern finden Sie unter Python-Datei aus einem Stagingbereich zu einem Arbeitsblatt hinzufügen.
Wenn Sie für den Import Ihre lokale Entwicklungsumgebung verwenden möchten, müssen Sie
Session.add_import()
in Ihrem Code aufrufen.
Sie können auch ein Verzeichnis angeben, das von der Snowpark-Bibliothek automatisch komprimiert und als ZIP-Datei hochgeladen wird. (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien mit einer UDF.)
Wenn Sie Session.add_import()
aufrufen, lädt die Snowpark-Bibliothek diese Dateien in einen internen Stagingbereich hoch und importiert die Dateien beim Ausführen Ihrer UDF.
Das folgende Beispiel zeigt, wie Sie eine in einem Stagingbereich befindliche ZIP-Datei als Abhängigkeit zu Ihrem Code hinzufügen:
>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")
Die folgenden Beispiele zeigen, wie Sie eine Python-Datei von Ihrem lokalen Rechner hinzufügen:
>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")
>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
Die folgenden Beispiele zeigen, wie Sie andere Typen von Abhängigkeiten hinzufügen:
>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")
>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")
Bemerkung
Die Python-Snowpark-Bibliothek wird nicht automatisch hochgeladen.
Die folgenden Abhängigkeiten müssen nicht angegeben werden:
Ihre in Python integrierten Bibliotheken.
Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.
Verwenden von Paketen des Drittanbieters Anaconda in einer UDF¶
Sie können Pakete von Drittanbietern aus dem Snowflake-Anaconda-Kanal in einer UDF verwenden.
Wenn Sie eine Python-UDF in einem Python-Arbeitsblatt erstellen, sind die Anaconda-Pakete bereits für Ihr Arbeitsblatt verfügbar. Weitere Informationen dazu finden Sie unter Python-Datei aus einem Stagingbereich zu einem Arbeitsblatt hinzufügen.
Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF erstellen, können Sie angeben, welche Anaconda-Pakete installiert werden sollen.
Wenn Sie dann in einem Snowflake-Warehouse Abfragen ausführen, die gespeicherte Python-UDFs aufrufen, werden die Anaconda-Pakete nahtlos installiert und in dem virtuellen Warehouse für Sie zwischengespeichert.
Weitere Informationen zu Best Practices, zum Anzeigen der verfügbaren Pakete und zum Einrichten einer lokalen Entwicklungsumgebung finden Sie unter Verwenden von Drittanbieter-Paketen.
Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF schreiben, verwenden Sie session.add_packages
, um Pakete auf Sitzungsebene hinzuzufügen.
Das folgende Codebeispiel zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")
>>> @udf
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
Sie können auch session.add_requirements
verwenden, um Pakete mit einer Anforderungsdatei anzugeben.
>>> session.add_requirements("mydir/requirements.txt")
Sie können die Pakete auf UDF-Ebene hinzufügen, um Pakete auf Sitzungsebene zu überschreiben, die Sie zuvor hinzugefügt haben.
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
Wichtig
Wenn Sie keine Paketversion angeben, verwendet Snowflake beim Auflösen der Abhängigkeiten die neueste Version. Wenn Sie die UDF dann in der Produktionsumgebung bereitstellen, müssen Sie sicherstellen, dass Ihr Code immer dieselben Abhängigkeitsversionen verwendet. Sie können dies sowohl für permanente als auch für temporäre UDFs tun.
Wenn Sie eine permanente UDF erstellen, wird die UDF nur einmal erstellt und registriert. Abhängigkeiten werden dabei einmalig aufgelöst, und die ausgewählte Version wird für Produktions-Workloads verwendet. Wenn die UDF ausgeführt wird, verwendet sie immer dieselben Abhängigkeitsversionen.
Wenn Sie eine temporäre UDF erstellen, geben Sie die Abhängigkeitsversionen als Teil der Versionsspezifikation an. Auf diese Weise verwendet die Paketauflösung bei der Registrierung der UDF die angegebene Version. Wenn Sie die Version nicht angeben und dann eine neue Version verfügbar ist, wird die Abhängigkeit möglicherweise aktualisiert.
Erstellen einer anonymen UDF¶
Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:
Rufen Sie die Funktion
udf
im Objektsnowflake.snowpark.functions
auf, und übergeben Sie die Definition der anonymen Funktion.Rufen Sie die Methode
register
in der KlasseUDFRegistration
auf, und übergeben Sie die Definition der anonymen Funktion.
Das folgende Beispiel zeigt eine anonyme UDF:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Bemerkung
Wenn Sie Code schreiben, der in mehreren Sitzungen ausgeführt werden kann, verwenden Sie zum Registrieren der UDFs die Methode register
und nicht die Funktion udf
. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session
nicht gefunden werden kann.
Erstellen und Registrieren einer benannten UDF¶
Wenn Sie eine UDF über den Namen aufrufen möchten (z. B. durch Verwendung der Funktion call_udf
im Objekt functions
), können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie eine der folgenden Optionen:
Die Methode
register
der KlasseUDFRegistration
mit dem Argumentname
.Die Funktion
udf
im Modulsnowflake.snowpark.functions
mit dem Argumentname
.
Um auf ein Attribut oder eine Methode der Klasse UDFRegistration
zuzugreifen, rufen Sie die Eigenschaft udf
der Klasse Session
auf.
Durch Aufrufen von register
oder udf
wird eine temporäre UDF erstellt, die Sie in der aktuellen Sitzung verwenden können.
Um eine permanente UDF zu erstellen, rufen Sie die Methode register
oder die Funktion udf
auf, und setzen Sie das Argument is_permanent
auf True
. Wenn Sie eine permanente UDF erstellen, müssen Sie auch das Argument stage_location
auf den Speicherort des Stagingbereichs setzen, in den die Python-Datei für die UDF und deren Abhängigkeiten hochgeladen wird.
Das folgende Beispiel zeigt die Registrierung einer benannten temporären UDF:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Das folgende Beispiel zeigt, wie eine benannte permanente UDF registriert wird, indem das Argument is_permanent
auf True
gesetzt wird:
>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
... return x-1
Das folgende Beispiel zeigt den Aufruf dieser UDFs:
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Sie können die UDF auch mit SQL aufrufen:
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Erstellen einer UDF aus einer Python-Quelldatei¶
Wenn Sie die UDF in Ihrer lokalen Entwicklungsumgebung erstellen, können Sie Ihren UDF-Handler auch in einer Python-Datei definieren und dann die Methode register_from_file
der Klasse UDFRegistration
verwenden, um einen UDF zu erstellen.
Bemerkung
Sie können diese Methode nicht in einem Python-Arbeitsblatt verwenden.
Hier sind Beispiele für die Verwendung von register_from_file
.
Angenommen, Sie haben eine Python-Datei test_udf_file.py
, die Folgendes enthält:
def mod5(x: int) -> int:
return x % 5
Dann können Sie eine UDF über diese Funktion der Datei test_udf_file.py
erstellen.
>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
... file_path="tests/resources/test_udf_dir/test_udf_file.py",
... func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Sie können die Datei auch an den Speicherort eines Stagingbereichs hochladen und dann zur Erstellung der UDF verwenden.
>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
... file_path="@mystage/test_udf_file.py",
... func_name="mod5",
... return_type=IntegerType(),
... input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Lesen von Dateien mit einer UDF¶
Um den Inhalt einer Datei zu lesen, kann Ihr Python-Code Folgendes tun:
Lesen einer statisch spezifizierten Datei durch Importieren einer Datei und anschließendes Lesen aus dem Basisverzeichnis der UDF.
Lesen einer dynamisch spezifizierten Datei mit SnowflakeFile. Diese Vorgehensweise kann verwendet werden, wenn Sie während der Verarbeitung auf eine Datei zugreifen müssen.
Lesen von statisch spezifizierten Dateien¶
Die Snowpark-Bibliothek lädt die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF Daten aus einer Datei lesen soll, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.
Bemerkung
Wenn Sie Ihre UDF in ein Python-Arbeitsblatt schreiben, kann die UDF nur Dateien aus einem Stagingbereich lesen.
So richten Sie eine UDF zum Lesen einer Datei ein:
Geben Sie an, dass die Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen wird. Weitere Informationen dazu finden Sie unter Angeben von Abhängigkeiten für eine UDF.
Beispiel:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
Lesen Sie der Datei in der UDF. Im folgenden Beispiel wird die Datei nur einmal beim Erstellen der UDF gelesen und nicht noch einmal bei Ausführung der UDF. Dies wird mit der Drittanbieter-Bibliothek cachetools erreicht.
>>> import sys >>> import os >>> import cachetools >>> from snowflake.snowpark.types import StringType >>> @cachetools.cached(cache={}) ... def read_file(filename): ... import_dir = sys._xoptions.get("snowflake_import_directory") ... if import_dir: ... with open(os.path.join(import_dir, filename), "r") as f: ... return f.read() >>> >>> # create a temporary text file for test >>> temp_file_name = "/tmp/temp.txt" >>> with open(temp_file_name, "w") as t: ... _ = t.write("snowpark") >>> session.add_import(temp_file_name) >>> session.add_packages("cachetools") >>> >>> def add_suffix(s): ... return f"{read_file(os.path.basename(temp_file_name))}-{s}" >>> >>> concat_file_content_with_str_udf = session.udf.register( ... add_suffix, ... return_type=StringType(), ... input_types=[StringType()] ... ) >>> >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"]) >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect() [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')] >>> os.remove(temp_file_name) >>> session.clear_imports()
Lesen von dynamisch spezifizierten Dateien mit SnowflakeFile
¶
Mit der Klasse SnowflakeFile
des Snowpark-Moduls snowflake.snowpark.files
können Sie eine Datei aus einem Stagingbereich lesen. Die Klasse SnowflakeFile
bietet einen dynamischen Dateizugriff, mit dem Sie Dateien beliebiger Größe streamen können. Der dynamische Dateizugriff ist auch nützlich, wenn Sie über mehrere Dateien iterieren möchten. Ein Beispiel dazu finden Sie unter Verarbeitung mehrerer Dateien.
Weitere Informationen und Beispiele zum Lesen von Dateien mit SnowflakeFile
finden Sie unter Lesen einer Datei mit der Klasse SnowflakeFile eines Python-UDF-Handlers.
Im folgenden Beispiel wird eine temporäre UDF registriert, die mit SnowflakeFile
eine Textdatei aus einem Stagingbereich liest und die Dateilänge zurückgibt.
Registrieren der UDF:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
Aufrufen der UDF:
session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Verwenden vektorisierter UDFs¶
Mit vektorisierten Python-UDFs können Sie Python-Funktionen definieren, mit denen Batches von Eingabezeilen als Pandas DataFrames empfangen und Batches von Ergebnissen als Pandas-Arrays oder Pandas Series zurückgeben werden. Die Spalte im Snowpark-dataframe
wird als Pandas Series innerhalb der UDF vektorisiert.
Im Folgenden finden Sie ein Beispiel für die Verwendung der Batchschnittstelle:
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)
@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
# The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
df.columns = ["col1", "col2", "col3", "col4"]
return model.predict(df)
Vektorisierte Python-UDFs werden genauso aufgerufen wie andere Python-UDFs. Weitere Informationen dazu finden Sie in unter Vektorisierte Python-UDFs, wo erklärt wird, wie eine vektorisierte UDF mithilfe einer SQL-Anweisung erstellt wird. So können Sie zum Beispiel beim Angeben des Python-Codes in der SQL-Anweisung das Decorator-Element vectorized
verwenden. Bei Verwendung der in diesem Dokument beschriebenen Snowpark Python-API verwenden Sie zum Erstellen einer vektorisierten UDF keine SQL-Anweisung. Sie verwenden also nicht das Decorator-Element vectorized
.
Die Anzahl der Zeilen pro Batch kann begrenzt werden. Weitere Informationen dazu finden Sie unter Festlegen einer Zielbatchgröße.
Weitere Erläuterungen und Beispiele zur Verwendung der Snowpark Python-API bei der Erstellung von vektorisierten UDFs finden Sie im UDFs-Abschnitt der Snowpark-API-Referenz.