Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Python

Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Python erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.

Unter diesem Thema:

Einführung

Mit Snowpark können Sie benutzerdefinierte Funktionen (UDFs) für Ihre kundenspezifischen Lambdas und Funktionen erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.

Wenn Sie die Snowpark-API verwenden, um einen UDF zu erstellen, lädt die Snowpark-Bibliothek den Code Ihrer Funktion in einen internen Stagingbereich. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, wo sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.

In Ihrem kundenspezifischen Code können Sie auch Module aus Python-Dateien oder Pakete von Drittanbietern importieren.

Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:

  • Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.

  • Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.

In den nächsten Abschnitten wird erklärt, wie Sie diese UDFs erstellen.

Beachten Sie, dass Sie, wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION definiert haben, diese UDF in Snowpark aufrufen können. Weitere Details dazu finden Sie unter Benutzerdefinierte Funktionen (UDFs).

Bemerkung

Es gibt eine Python-UDF-Batch-API, mit der Python-Funktionen definiert werden können, die Batches von Eingabezeilen als Pandas DataFrames empfangen. Die Batchschnittstelle führt zu einer wesentlichen Leistungssteigerung bei Szenarios mit Machine Learning-Inferenz. Weitere Informationen dazu finden Sie unter Verwenden von vektorisierten UDFs über die Python-UDF-Batch-API.

Angeben von Abhängigkeiten für eine UDF

Um eine UDF über die Snowpark-API zu definieren, müssen Sie Session.add_import() für alle Dateien aufrufen, die Module enthalten, von denen Ihre UDF abhängt (z. B. Python-Dateien, ZIP-Dateien, Ressourcendateien usw.). Sie können auch ein Verzeichnis angeben, das von der Snowpark-Bibliothek automatisch komprimiert und als ZIP-Datei hochgeladen wird. (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Erstellen einer UDF aus einer Python-Quelldatei.)

Die Snowpark-Bibliothek lädt diese Dateien in einen internen Stagingbereich hoch und importiert die Dateien beim Ausführen Ihrer UDF.

Das folgende Beispiel zeigt, wie Sie eine ZIP-Datei in einem Stagingbereich als Abhängigkeit hinzufügen:

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  

Die folgenden Beispiele zeigen, wie Sie eine Python-Datei von Ihrem lokalen Rechner hinzufügen:

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  

Die folgenden Beispiele zeigen, wie Sie andere Typen von Abhängigkeiten hinzufügen:

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  

Bemerkung

Die Python-Snowpark-Bibliothek wird nicht automatisch hochgeladen.

Die folgenden Abhängigkeiten sollten Sie nicht angeben müssen:

  • Ihre in Python integrierten Bibliotheken.

    Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.

Verwenden von Paketen des Drittanbieters Anaconda in einer UDF

Sie können beim Erstellen von gespeicherten Python-UDFs angeben, dass Anaconda-Pakete installiert werden sollen. Wenn Sie dann in einem Snowflake-Warehouse Abfragen ausführen, die gespeicherte Python-UDFs aufrufen, werden die Anaconda-Pakete nahtlos installiert und in dem virtuellen Warehouse für Sie zwischengespeichert. Weitere Informationen zu Best Practices, zum Anzeigen der verfügbaren Pakete und zum Einrichten einer lokalen Entwicklungsumgebung finden Sie unter Verwenden von Drittanbieter-Paketen.

Verwenden Sie session.add_packages, um Pakete auf Sitzungsebene hinzuzufügen.

Das folgende Codebeispiel zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]

Sie können auch session.add_requirements verwenden, um Pakete mit einer Anforderungsdatei anzugeben.

>>> session.add_requirements("mydir/requirements.txt")  

Sie können die Pakete auf UDF-Ebene hinzufügen, um Pakete auf Sitzungsebene zu überschreiben, die Sie zuvor hinzugefügt haben.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]

Wichtig

Wenn Sie keine Paketversion angeben, verwendet Snowflake beim Auflösen von Abhängigkeiten die neueste Version. Wenn Sie die UDF in der Produktion einsetzen, sollten Sie jedoch sicherstellen, dass Ihr Code immer dieselben Abhängigkeitsversionen verwendet. Sie können dies sowohl für permanente als auch für temporäre UDFs tun.

  • Wenn Sie eine permanente UDF erstellen, wird die UDF nur einmal erstellt und registriert. Abhängigkeiten werden dabei einmalig aufgelöst, und die ausgewählte Version wird für Produktions-Workloads verwendet. Wenn die UDF ausgeführt wird, verwendet sie immer die gleichen Abhängigkeitsversionen.

  • Wenn Sie eine temporäre UDF erstellen, geben Sie die Abhängigkeitsversionen als Teil der Versionsspezifikation an. Auf diese Weise verwendet die Paketauflösung bei der Registrierung der UDF die angegebene Version. Wenn Sie die Version nicht angeben und dann eine neue Version verfügbar ist, wird die Abhängigkeit möglicherweise aktualisiert.

Erstellen einer anonymen UDF

Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:

  • Rufen Sie die Funktion udf im Objekt snowflake.snowpark.functions auf, und übergeben Sie die Definition der anonymen Funktion.

  • Rufen Sie die Methode register in der Klasse UDFRegistration auf, und übergeben Sie die Definition der anonymen Funktion.

Das folgende Beispiel zeigt eine anonyme UDF:

>>> from snowflake.snowpark.types import IntegerType
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])

Bemerkung

Wenn Sie Code schreiben, der in mehreren Sitzungen ausgeführt werden kann, verwenden Sie zum Registrieren der UDFs die Methode register und nicht die Funktion udf. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session nicht gefunden werden kann.

Erstellen und Registrieren einer benannten UDF

Wenn Sie eine UDF über den Namen aufrufen möchten (z. B. durch Verwendung der Funktion call_udf im Objekt functions), können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie eine der folgenden Optionen:

  • Die Methode register der Klasse UDFRegistration mit dem Argument name.

  • Die Funktion udf im Modul snowflake.snowpark.functions mit dem Argument name.

Um auf ein Attribut oder eine Methode der Klasse UDFRegistration zuzugreifen, rufen Sie die Eigenschaft udf der Klasse Session auf.

Durch Aufrufen von register oder udf wird eine temporäre UDF erstellt, die Sie in der aktuellen Sitzung verwenden können.

Um eine permanente UDF zu erstellen, rufen Sie die Methode register oder die Funktion udf auf, und setzen Sie das Argument is_permanent auf True. Wenn Sie eine permanente UDF erstellen, müssen Sie auch das Argument stage_location auf den Speicherort des Stagingbereichs setzen, in den die Python-Datei für die UDF und deren Abhängigkeiten hochgeladen wird.

Das folgende Beispiel zeigt die Registrierung einer benannten temporären UDF:

>>> from snowflake.snowpark.types import IntegerType
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)

Das folgende Beispiel zeigt, wie eine benannte permanente UDF registriert wird, indem das Argument is_permanent auf True gesetzt wird:

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1

Das folgende Beispiel zeigt den Aufruf dieser UDFs:

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]

Erstellen einer UDF aus einer Python-Quelldatei

Sie können Ihren UDF-Handler auch in einer Python-Datei definieren und dann die Methode register_from_file der Klasse UDFRegistration verwenden, um einen UDF zu erstellen.

Hier sind Beispiele für die Verwendung von register_from_file.

Angenommen, Sie haben eine Python-Datei test_udf_file.py, die Folgendes enthält:

def mod5(x: int) -> int:
    return x % 5

Dann können Sie eine UDF über diese Funktion der Datei test_udf_file.py erstellen.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Sie können die Datei auch an den Speicherort eines Stagingbereichs hochladen und dann zur Erstellung der UDF verwenden.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Lesen von Dateien aus einer UDF

Wie bereits erwähnt, lädt die Snowpark-Bibliothek die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF Daten aus einer Datei lesen muss, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.

So richten Sie eine UDF zum Lesen einer Datei ein:

  1. Geben Sie an, dass die Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen wird. Weitere Informationen dazu finden Sie unter Angeben von Abhängigkeiten für eine UDF.

    Beispiel:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
  2. Lesen Sie der Datei in der UDF. Im folgenden Beispiel wird die Datei nur einmal beim Erstellen der UDF gelesen und nicht noch einmal bei Ausführung der UDF. Dies wird mit der Drittanbieter-Bibliothek cachetools erreicht.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    

Verwenden von vektorisierten UDFs über die Python-UDF-Batch-API

Die Python-UDF-Batch-API ermöglicht die Definition von Python-Funktionen, die Batches von Eingabezeilen als Pandas DataFrames erhalten und Batches von Ergebnissen als Pandas-Arrays oder Pandas Series zurückgeben. Die Spalte im Snowpark-dataframe wird als Pandas Series innerhalb der UDF vektorisiert.

Im Folgenden finden Sie ein Beispiel für die Verwendung der Batchschnittstelle:

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)

Vektorisierte Python-UDFs, die die Batch-API verwenden, werden genauso aufgerufen wie andere Python-UDFs. Weitere Informationen dazu finden Sie in unter Python-UDF-Batch-API, wo erklärt wird, wie eine vektorisierte UDF mithilfe einer SQL-Anweisung erstellt wird. So können Sie zum Beispiel beim Angeben des Python-Codes in der SQL-Anweisung das Decorator-Element vectorized verwenden. Bei Verwendung der in diesem Dokument beschriebenen Snowpark Python-API verwenden Sie zum Erstellen einer vektorisierten UDF keine SQL-Anweisung. Sie verwenden also nicht das Decorator-Element vectorized.

Die Anzahl der Zeilen pro Batch kann begrenzt werden. Weitere Informationen dazu finden Sie unter Festlegen einer Zielbatchgröße.

Weitere Erläuterungen und Beispiele zur Verwendung der Snowpark Python-API bei der Erstellung von vektorisierten UDFs finden Sie im UDFs-Abschnitt der Snowpark-API-Referenz.