Gespeicherte Prozeduren für DataFrames in Python erstellen¶
Die Snowpark-API bietet Methoden, mit denen Sie gespeicherte Prozeduren in Python erstellen können. Unter diesem Thema wird erklärt, wie Sie gespeicherte Prozeduren erstellen.
Unter diesem Thema:
Einführung¶
Mit Snowpark können Sie gespeicherte Prozeduren für Ihre kundenspezifischen Lambdas und Funktionen erstellen, und Sie können diese aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.
Sie können gespeicherte Prozeduren erstellen, die nur innerhalb der aktuellen Sitzung existieren (temporäre gespeicherte Prozeduren), sowie gespeicherte Prozeduren, die Sie in anderen Sitzungen verwenden können (permanente gespeicherte Prozeduren).
Verwenden von Paketen des Drittanbieters Anaconda in gespeicherten Prozeduren¶
Sie können beim Erstellen von gespeicherten Prozeduren in Python angeben, dass Anaconda-Pakete installiert werden sollen. Wenn Sie dann die gespeicherte Python-Prozedur in einem Snowflake-Warehouse aufrufen, werden die Anaconda-Pakete nahtlos installiert und im virtuellen Warehouse für Sie zwischengespeichert. Weitere Informationen zu Best Practices, zum Anzeigen der verfügbaren Pakete und zum Einrichten einer lokalen Entwicklungsumgebung finden Sie unter Verwenden von Drittanbieter-Paketen.
Verwenden Sie session.add_packages
, um Pakete auf Sitzungsebene hinzuzufügen.
Das folgende Codebeispiel zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")
>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
Sie können auch session.add_requirements
verwenden, um Pakete mit einer Anforderungsdatei anzugeben.
>>> session.add_requirements("mydir/requirements.txt")
Sie können die Pakete auf Ebene der gespeicherten Prozedur hinzufügen, um die Pakete auf Sitzungsebene zu überschreiben, die Sie möglicherweise zuvor hinzugefügt haben.
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
Wichtig
Wenn Sie keine Paketversion angeben, verwendet Snowflake beim Auflösen von Abhängigkeiten die neueste Version. Wenn Sie die gespeicherte Prozedur in der Produktion einsetzen, sollten Sie jedoch sicherstellen, dass Ihr Code immer dieselben Abhängigkeitsversionen verwendet. Sie können dies sowohl für permanente als auch für temporäre gespeicherte Prozeduren tun.
Wenn Sie eine permanente gespeicherte Prozedur erstellen, wird die gespeicherte Prozedur nur einmal erstellt und registriert. Abhängigkeiten werden dabei einmalig aufgelöst, und die ausgewählte Version wird für Produktions-Workloads verwendet. Wenn die gespeicherte Prozedur ausgeführt wird, verwendet sie immer die gleichen Abhängigkeitsversionen.
Wenn Sie eine temporäre gespeicherte Prozedur erstellen, geben Sie die Abhängigkeitsversionen als Teil der Versionsspezifikation an. Auf diese Weise verwendet die Paketauflösung bei der Registrierung der gespeicherten Prozedur die angegebene Version. Wenn Sie die Version nicht angeben und dann eine neue Version verfügbar ist, wird die Abhängigkeit möglicherweise aktualisiert.
Erstellen einer anonymen gespeicherten Prozedur¶
Für das Erstellen einer anonymen gespeicherten Prozedur gibt es zwei Möglichkeiten:
Rufen Sie die Funktion
sproc
im Objektsnowflake.snowpark.functions
auf, und übergeben Sie die Definition der anonymen Funktion.Rufen Sie die Methode
register
in der KlasseStoredProcedureRegistration
auf, und übergeben Sie die Definition der anonymen Funktion. Um auf ein Attribut oder eine Methode der KlasseStoredProcedureRegistration
zuzugreifen, rufen Sie die Eigenschaftsproc
der KlasseSession
auf.
Das folgende Beispiel zeigt eine anonyme gespeicherte Prozedur:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Bemerkung
Wenn Sie Code schreiben, der in mehreren Sitzungen ausgeführt werden kann, verwenden Sie zum Registrieren der gespeicherten Prozeduren die Methode register
und nicht die Funktion sproc
. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session
nicht gefunden werden kann.
Erstellen und Registrieren von benannten gespeicherten Prozeduren¶
Wenn Sie eine gespeicherte Prozedur über den Namen aufrufen möchten (z. B. durch Verwendung der Funktion call
im Objekt Session
), können Sie eine benannte gespeicherte Prozedur erstellen und registrieren. Es gibt zwei Möglichkeiten, die zu tun:
Rufen Sie die Funktion
sproc
im Modulsnowflake.snowpark.functions
auf, und übergeben Sie das Argumentname
und die Definition der anonymen Funktion.Rufen Sie die Methode
register
der KlasseStoredProcedureRegistration
auf, und übergeben Sie das Argumentname
und die Definition der anonymen Funktion. Um auf ein Attribut oder eine Methode der KlasseStoredProcedureRegistration
zuzugreifen, rufen Sie die Eigenschaftsproc
der KlasseSession
auf.
Durch Aufrufen von register
oder sproc
wird eine temporäre gespeicherte Prozedur erstellt, die Sie in der aktuellen Sitzung verwenden können.
Um eine permanente gespeicherte Prozedur zu erstellen, rufen Sie die Methode register
oder die Funktion sproc
auf, und setzen Sie das Argument is_permanent
auf True
. Wenn Sie eine permanente gespeicherte Prozedur erstellen, müssen Sie auch das Argument stage_location
auf den Speicherort des Stagingbereichs setzen, in den der von Snowpark verwendete Python-Konnektor die Python-Datei für die gespeicherte Prozedur und deren Abhängigkeiten hochlädt.
Das folgende Beispiel zeigt die Registrierung einer benannten temporären gespeicherten Prozedur:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Das folgende Beispiel zeigt, wie eine benannte permanente gespeicherte Prozedur registriert wird, indem das Argument is_permanent
auf True
gesetzt wird:
>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
... return session.sql(f"select {x} - 1").collect()[0][0]
Das folgende Beispiel zeigt, wie diese gespeicherten Prozeduren aufgerufen werden:
>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Lesen von Dateien mit einer gespeicherten Prozedur¶
Um den Inhalt einer Datei mit einer gespeicherten Prozedur zu lesen, können Sie Folgendes tun:
Lesen einer statisch spezifizierten Datei durch Importieren einer Datei und anschließendes Lesen aus dem Basisverzeichnis der gespeicherten Prozedur.
Lesen einer dynamisch spezifizierten Datei mit SnowflakeFile. Diese Vorgehensweise kann verwendet werden, wenn Sie während der Verarbeitung auf eine Datei zugreifen müssen.
Lesen von statisch spezifizierten Dateien¶
Geben Sie an, dass die Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen wird. Dies geschieht auf die gleiche Weise wie bei UDFs. Weitere Informationen dazu finden Sie unter Angeben von Abhängigkeiten für eine UDF.
Beispiel:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
Lesen der Datei in der gespeicherten Prozedur.
>>> def read_file(name: str) -> str: ... import sys ... IMPORT_DIRECTORY_NAME = "snowflake_import_directory" ... import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME] ... ... with open(import_dir + 'my_file.txt', 'r') as file: ... return file.read()
Lesen von dynamisch spezifizierten Dateien mit SnowflakeFile
¶
Mit der Klasse SnowflakeFile
des Snowpark-Moduls snowflake.snowpark.files
können Sie eine Datei aus einem Stagingbereich lesen. Die Klasse SnowflakeFile
bietet einen dynamischen Dateizugriff, mit dem Sie Dateien beliebiger Größe streamen können. Der dynamische Dateizugriff ist auch nützlich, wenn Sie über mehrere Dateien iterieren möchten. Ein Beispiel dazu finden Sie unter Verarbeitung mehrerer Dateien.
Weitere Informationen und Beispiele zum Lesen von Dateien mit SnowflakeFile
finden Sie unter Lesen einer Datei mit der Klasse SnowflakeFile eines Python-UDF-Handlers.
Im folgenden Beispiel wird eine permanente gespeicherte Prozedur erstellt, die mit SnowflakeFile
eine Datei aus einem Stagingbereich liest und die Dateilänge zurückgibt.
Erstellen Sie die gespeicherte Prozedur:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
Rufen Sie die gespeicherte Prozedur auf:
file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")