Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Python

Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Python erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.

Unter diesem Thema:

Einführung

Mit Snowpark können Sie benutzerdefinierte Funktionen (UDFs) für Ihre kundenspezifischen Lambdas und Funktionen erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.

Wenn Sie die Snowpark-API verwenden, um eine UDF zu erstellen, lädt die Snowpark-Bibliothek den Code Ihrer Funktion in einen internen Stagingbereich. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, wo sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.

In Ihrem kundenspezifischen Code können Sie auch Module aus Python-Dateien oder Pakete von Drittanbietern importieren.

Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:

  • Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.

  • Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.

In den nächsten Abschnitten wird erläutert, wie Sie diese UDFs mithilfe einer lokalen Entwicklungsumgebung oder mithilfe eines Python-Arbeitsblatts erstellen.

Beachten Sie, dass Sie, wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION definiert haben, diese UDF in Snowpark aufrufen können. Weitere Details dazu finden Sie unter Benutzerdefinierte Funktionen (UDFs).

Bemerkung

Vektorisierte Python-UDFs ermöglichen die Definition von Python-Funktionen, die Batches von Eingabezeilen als Pandas DataFrames empfangen. Dies führt zu einer wesentlichen Leistungssteigerung bei Szenarios mit Machine Learning-Inferenz. Weitere Informationen dazu finden Sie unter Verwenden vektorisierter UDFs.

Bemerkung

Wenn Sie ein Python-Arbeitsblatt nutzen, verwenden Sie diese Beispiele innerhalb der Handler-Funktion:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

Wenn die Beispiele etwas anderes als einen DataFrame zurückgeben, z. B. eine Liste (list) von Row-Objekten, können Sie den Rückgabetyp ändern, sodass er dem Rückgabetyp des Beispiels entspricht.

Nachdem Sie ein Codebeispiel ausgeführt haben, verwenden Sie die Registerkarte Results, um alle zurückgegebenen Ausgaben anzuzeigen. Weitere Informationen dazu finden Sie unter Ausführen von Python-Arbeitsblättern.

Angeben von Abhängigkeiten für eine UDF

Um eine UDF über die Snowpark-API zu definieren, müssen Sie alle Dateien importieren, die Module enthalten, von denen Ihre UDF abhängt, wie Python-Dateien, ZIP-Dateien, Ressourcendateien usw.

Sie können auch ein Verzeichnis angeben, das von der Snowpark-Bibliothek automatisch komprimiert und als ZIP-Datei hochgeladen wird. (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien mit einer UDF.)

Wenn Sie Session.add_import() aufrufen, lädt die Snowpark-Bibliothek diese Dateien in einen internen Stagingbereich hoch und importiert die Dateien beim Ausführen Ihrer UDF.

Das folgende Beispiel zeigt, wie Sie eine in einem Stagingbereich befindliche ZIP-Datei als Abhängigkeit zu Ihrem Code hinzufügen:

>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")  
Copy

Die folgenden Beispiele zeigen, wie Sie eine Python-Datei von Ihrem lokalen Rechner hinzufügen:

>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")  

>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")  
Copy

Die folgenden Beispiele zeigen, wie Sie andere Typen von Abhängigkeiten hinzufügen:

>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")  

>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")  
Copy

Bemerkung

Die Python-Snowpark-Bibliothek wird nicht automatisch hochgeladen.

Die folgenden Abhängigkeiten müssen nicht angegeben werden:

  • Ihre in Python integrierten Bibliotheken.

    Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.

Verwenden von Paketen des Drittanbieters Anaconda in einer UDF

Sie können Pakete von Drittanbietern aus dem Snowflake-Anaconda-Kanal in einer UDF verwenden.

  • Wenn Sie eine Python-UDF in einem Python-Arbeitsblatt erstellen, sind die Anaconda-Pakete bereits für Ihr Arbeitsblatt verfügbar. Weitere Informationen dazu finden Sie unter Python-Datei aus einem Stagingbereich zu einem Arbeitsblatt hinzufügen.

  • Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF erstellen, können Sie angeben, welche Anaconda-Pakete installiert werden sollen.

Wenn Sie dann in einem Snowflake-Warehouse Abfragen ausführen, die gespeicherte Python-UDFs aufrufen, werden die Anaconda-Pakete nahtlos installiert und in dem virtuellen Warehouse für Sie zwischengespeichert.

Weitere Informationen zu Best Practices, zum Anzeigen der verfügbaren Pakete und zum Einrichten einer lokalen Entwicklungsumgebung finden Sie unter Verwenden von Drittanbieter-Paketen.

Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF schreiben, verwenden Sie session.add_packages, um Pakete auf Sitzungsebene hinzuzufügen.

Das folgende Codebeispiel zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")

>>> @udf
... def compute() -> list:
...    return [np.__version__, pd.__version__, xgb.__version__]
Copy

Sie können auch session.add_requirements verwenden, um Pakete mit einer Anforderungsdatei anzugeben.

>>> session.add_requirements("mydir/requirements.txt")  
Copy

Sie können die Pakete auf UDF-Ebene hinzufügen, um Pakete auf Sitzungsebene zu überschreiben, die Sie zuvor hinzugefügt haben.

>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf

>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
...     return [np.__version__, pd.__version__, xgb.__version__]
Copy

Wichtig

Wenn Sie keine Paketversion angeben, verwendet Snowflake beim Auflösen der Abhängigkeiten die neueste Version. Wenn Sie die UDF dann in der Produktionsumgebung bereitstellen, müssen Sie sicherstellen, dass Ihr Code immer dieselben Abhängigkeitsversionen verwendet. Sie können dies sowohl für permanente als auch für temporäre UDFs tun.

  • Wenn Sie eine permanente UDF erstellen, wird die UDF nur einmal erstellt und registriert. Abhängigkeiten werden dabei einmalig aufgelöst, und die ausgewählte Version wird für Produktions-Workloads verwendet. Wenn die UDF ausgeführt wird, verwendet sie immer dieselben Abhängigkeitsversionen.

  • Wenn Sie eine temporäre UDF erstellen, geben Sie die Abhängigkeitsversionen als Teil der Versionsspezifikation an. Auf diese Weise verwendet die Paketauflösung bei der Registrierung der UDF die angegebene Version. Wenn Sie die Version nicht angeben und dann eine neue Version verfügbar ist, wird die Abhängigkeit möglicherweise aktualisiert.

Erstellen einer anonymen UDF

Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:

  • Rufen Sie die Funktion udf im Objekt snowflake.snowpark.functions auf, und übergeben Sie die Definition der anonymen Funktion.

  • Rufen Sie die Methode register in der Klasse UDFRegistration auf, und übergeben Sie die Definition der anonymen Funktion.

Das folgende Beispiel zeigt eine anonyme UDF:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Bemerkung

Wenn Sie Code schreiben, der in mehreren Sitzungen ausgeführt werden kann, verwenden Sie zum Registrieren der UDFs die Methode register und nicht die Funktion udf. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session nicht gefunden werden kann.

Erstellen und Registrieren einer benannten UDF

Wenn Sie eine UDF über den Namen aufrufen möchten (z. B. durch Verwendung der Funktion call_udf im Objekt functions), können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie eine der folgenden Optionen:

  • Die Methode register der Klasse UDFRegistration mit dem Argument name.

  • Die Funktion udf im Modul snowflake.snowpark.functions mit dem Argument name.

Um auf ein Attribut oder eine Methode der Klasse UDFRegistration zuzugreifen, rufen Sie die Eigenschaft udf der Klasse Session auf.

Durch Aufrufen von register oder udf wird eine temporäre UDF erstellt, die Sie in der aktuellen Sitzung verwenden können.

Um eine permanente UDF zu erstellen, rufen Sie die Methode register oder die Funktion udf auf, und setzen Sie das Argument is_permanent auf True. Wenn Sie eine permanente UDF erstellen, müssen Sie auch das Argument stage_location auf den Speicherort des Stagingbereichs setzen, in den die Python-Datei für die UDF und deren Abhängigkeiten hochgeladen wird.

Das folgende Beispiel zeigt die Registrierung einer benannten temporären UDF:

>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf

>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

Das folgende Beispiel zeigt, wie eine benannte permanente UDF registriert wird, indem das Argument is_permanent auf True gesetzt wird:

>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
...   return x-1
Copy

Das folgende Beispiel zeigt den Aufruf dieser UDFs:

>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Copy

Sie können die UDF auch mit SQL aufrufen:

>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Erstellen einer UDF aus einer Python-Quelldatei

Wenn Sie die UDF in Ihrer lokalen Entwicklungsumgebung erstellen, können Sie Ihren UDF-Handler auch in einer Python-Datei definieren und dann die Methode register_from_file der Klasse UDFRegistration verwenden, um einen UDF zu erstellen.

Bemerkung

Sie können diese Methode nicht in einem Python-Arbeitsblatt verwenden.

Hier sind Beispiele für die Verwendung von register_from_file.

Angenommen, Sie haben eine Python-Datei test_udf_file.py, die Folgendes enthält:

def mod5(x: int) -> int:
    return x % 5
Copy

Dann können Sie eine UDF über diese Funktion der Datei test_udf_file.py erstellen.

>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
...     file_path="tests/resources/test_udf_dir/test_udf_file.py",
...     func_name="mod5",
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

Sie können die Datei auch an den Speicherort eines Stagingbereichs hochladen und dann zur Erstellung der UDF verwenden.

>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
...     file_path="@mystage/test_udf_file.py",
...     func_name="mod5",
...     return_type=IntegerType(),
...     input_types=[IntegerType()],
... )  
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Copy

Lesen von Dateien mit einer UDF

Um den Inhalt einer Datei zu lesen, kann Ihr Python-Code Folgendes tun:

Lesen von statisch spezifizierten Dateien

Die Snowpark-Bibliothek lädt die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF Daten aus einer Datei lesen soll, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.

Bemerkung

Wenn Sie Ihre UDF in ein Python-Arbeitsblatt schreiben, kann die UDF nur Dateien aus einem Stagingbereich lesen.

So richten Sie eine UDF zum Lesen einer Datei ein:

  1. Geben Sie an, dass die Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen wird. Weitere Informationen dazu finden Sie unter Angeben von Abhängigkeiten für eine UDF.

    Beispiel:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")  
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")  
    
    Copy
  2. Lesen Sie der Datei in der UDF. Im folgenden Beispiel wird die Datei nur einmal beim Erstellen der UDF gelesen und nicht noch einmal bei Ausführung der UDF. Dies wird mit der Drittanbieter-Bibliothek cachetools erreicht.

    >>> import sys
    >>> import os
    >>> import cachetools
    >>> from snowflake.snowpark.types import StringType
    >>> @cachetools.cached(cache={})
    ... def read_file(filename):
    ...     import_dir = sys._xoptions.get("snowflake_import_directory")
    ...     if import_dir:
    ...         with open(os.path.join(import_dir, filename), "r") as f:
    ...             return f.read()
    >>>
    >>> # create a temporary text file for test
    >>> temp_file_name = "/tmp/temp.txt"
    >>> with open(temp_file_name, "w") as t:
    ...     _ = t.write("snowpark")
    >>> session.add_import(temp_file_name)
    >>> session.add_packages("cachetools")
    >>>
    >>> def add_suffix(s):
    ...     return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    >>>
    >>> concat_file_content_with_str_udf = session.udf.register(
    ...     add_suffix,
    ...     return_type=StringType(),
    ...     input_types=[StringType()]
    ... )
    >>>
    >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    >>> os.remove(temp_file_name)
    >>> session.clear_imports()
    
    Copy

Lesen von dynamisch spezifizierten Dateien mit SnowflakeFile

Mit der Klasse SnowflakeFile des Snowpark-Moduls snowflake.snowpark.files können Sie eine Datei aus einem Stagingbereich lesen. Die Klasse SnowflakeFile bietet einen dynamischen Dateizugriff, mit dem Sie Dateien beliebiger Größe streamen können. Der dynamische Dateizugriff ist auch nützlich, wenn Sie über mehrere Dateien iterieren möchten. Ein Beispiel dazu finden Sie unter Verarbeitung mehrerer Dateien.

Weitere Informationen und Beispiele zum Lesen von Dateien mit SnowflakeFile finden Sie unter Lesen einer Datei mit der Klasse SnowflakeFile eines Python-UDF-Handlers.

Im folgenden Beispiel wird eine temporäre UDF registriert, die mit SnowflakeFile eine Textdatei aus einem Stagingbereich liest und die Dateilänge zurückgibt.

Registrieren der UDF:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Aufrufen der UDF:

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

Verwenden vektorisierter UDFs

Mit vektorisierten Python-UDFs können Sie Python-Funktionen definieren, mit denen Batches von Eingabezeilen als Pandas DataFrames empfangen und Batches von Ergebnissen als Pandas-Arrays oder Pandas Series zurückgeben werden. Die Spalte im Snowpark-dataframe wird als Pandas Series innerhalb der UDF vektorisiert.

Im Folgenden finden Sie ein Beispiel für die Verwendung der Batchschnittstelle:

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

Vektorisierte Python-UDFs werden genauso aufgerufen wie andere Python-UDFs. Weitere Informationen dazu finden Sie in unter Vektorisierte Python-UDFs, wo erklärt wird, wie eine vektorisierte UDF mithilfe einer SQL-Anweisung erstellt wird. So können Sie zum Beispiel beim Angeben des Python-Codes in der SQL-Anweisung das Decorator-Element vectorized verwenden. Bei Verwendung der in diesem Dokument beschriebenen Snowpark Python-API verwenden Sie zum Erstellen einer vektorisierten UDF keine SQL-Anweisung. Sie verwenden also nicht das Decorator-Element vectorized.

Die Anzahl der Zeilen pro Batch kann begrenzt werden. Weitere Informationen dazu finden Sie unter Festlegen einer Zielbatchgröße.

Weitere Erläuterungen und Beispiele zur Verwendung der Snowpark Python-API bei der Erstellung von vektorisierten UDFs finden Sie im UDFs-Abschnitt der Snowpark-API-Referenz.