Gespeicherte Prozeduren für DataFrames in Python erstellen

Die Snowpark-API bietet Methoden, mit denen Sie gespeicherte Prozeduren in Python erstellen können. Unter diesem Thema wird erklärt, wie Sie gespeicherte Prozeduren erstellen.

Unter diesem Thema:

Einführung

Mit Snowpark können Sie gespeicherte Prozeduren für Ihre kundenspezifischen Lambdas und Funktionen erstellen, und Sie können diese aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.

Sie können gespeicherte Prozeduren erstellen, die nur innerhalb der aktuellen Sitzung existieren (temporäre gespeicherte Prozeduren), sowie gespeicherte Prozeduren, die Sie in anderen Sitzungen verwenden können (permanente gespeicherte Prozeduren).

Verwenden von Paketen des Drittanbieters Anaconda in gespeicherten Prozeduren

Sie können beim Erstellen von gespeicherten Prozeduren in Python angeben, dass Anaconda-Pakete installiert werden sollen. Wenn Sie dann die gespeicherte Python-Prozedur in einem Snowflake-Warehouse aufrufen, werden die Anaconda-Pakete nahtlos installiert und im virtuellen Warehouse für Sie zwischengespeichert. Weitere Informationen zu Best Practices, zum Anzeigen der verfügbaren Pakete und zum Einrichten einer lokalen Entwicklungsumgebung finden Sie unter Verwenden von Drittanbieter-Paketen.

Verwenden Sie session.add_packages, um Pakete auf Sitzungsebene hinzuzufügen.

Das folgende Codebeispiel zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")

>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
...   return [pd.__version__, xgb.__version__]
Copy

Sie können auch session.add_requirements verwenden, um Pakete mit einer Anforderungsdatei anzugeben.

>>> session.add_requirements("mydir/requirements.txt")
Copy

Sie können die Pakete auf Ebene der gespeicherten Prozedur hinzufügen, um die Pakete auf Sitzungsebene zu überschreiben, die Sie möglicherweise zuvor hinzugefügt haben.

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
...    return [pd.__version__, xgb.__version__]
Copy

Wichtig

Wenn Sie keine Paketversion angeben, verwendet Snowflake beim Auflösen von Abhängigkeiten die neueste Version. Wenn Sie die gespeicherte Prozedur in der Produktion einsetzen, sollten Sie jedoch sicherstellen, dass Ihr Code immer dieselben Abhängigkeitsversionen verwendet. Sie können dies sowohl für permanente als auch für temporäre gespeicherte Prozeduren tun.

  • Wenn Sie eine permanente gespeicherte Prozedur erstellen, wird die gespeicherte Prozedur nur einmal erstellt und registriert. Abhängigkeiten werden dabei einmalig aufgelöst, und die ausgewählte Version wird für Produktions-Workloads verwendet. Wenn die gespeicherte Prozedur ausgeführt wird, verwendet sie immer die gleichen Abhängigkeitsversionen.

  • Wenn Sie eine temporäre gespeicherte Prozedur erstellen, geben Sie die Abhängigkeitsversionen als Teil der Versionsspezifikation an. Auf diese Weise verwendet die Paketauflösung bei der Registrierung der gespeicherten Prozedur die angegebene Version. Wenn Sie die Version nicht angeben und dann eine neue Version verfügbar ist, wird die Abhängigkeit möglicherweise aktualisiert.

Erstellen einer anonymen gespeicherten Prozedur

Für das Erstellen einer anonymen gespeicherten Prozedur gibt es zwei Möglichkeiten:

  • Rufen Sie die Funktion sproc im Objekt snowflake.snowpark.functions auf, und übergeben Sie die Definition der anonymen Funktion.

  • Rufen Sie die Methode register in der Klasse StoredProcedureRegistration auf, und übergeben Sie die Definition der anonymen Funktion. Um auf ein Attribut oder eine Methode der Klasse StoredProcedureRegistration zuzugreifen, rufen Sie die Eigenschaft sproc der Klasse Session auf.

Das folgende Beispiel zeigt eine anonyme gespeicherte Prozedur:

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Copy

Bemerkung

Wenn Sie Code schreiben, der in mehreren Sitzungen ausgeführt werden kann, verwenden Sie zum Registrieren der gespeicherten Prozeduren die Methode register und nicht die Funktion sproc. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session nicht gefunden werden kann.

Erstellen und Registrieren von benannten gespeicherten Prozeduren

Wenn Sie eine gespeicherte Prozedur über den Namen aufrufen möchten (z. B. durch Verwendung der Funktion call im Objekt Session), können Sie eine benannte gespeicherte Prozedur erstellen und registrieren. Es gibt zwei Möglichkeiten, die zu tun:

  • Rufen Sie die Funktion sproc im Modul snowflake.snowpark.functions auf, und übergeben Sie das Argument name und die Definition der anonymen Funktion.

  • Rufen Sie die Methode register der Klasse StoredProcedureRegistration auf, und übergeben Sie das Argument name und die Definition der anonymen Funktion. Um auf ein Attribut oder eine Methode der Klasse StoredProcedureRegistration zuzugreifen, rufen Sie die Eigenschaft sproc der Klasse Session auf.

Durch Aufrufen von register oder sproc wird eine temporäre gespeicherte Prozedur erstellt, die Sie in der aktuellen Sitzung verwenden können.

Um eine permanente gespeicherte Prozedur zu erstellen, rufen Sie die Methode register oder die Funktion sproc auf, und setzen Sie das Argument is_permanent auf True. Wenn Sie eine permanente gespeicherte Prozedur erstellen, müssen Sie auch das Argument stage_location auf den Speicherort des Stagingbereichs setzen, in den der von Snowpark verwendete Python-Konnektor die Python-Datei für die gespeicherte Prozedur und deren Abhängigkeiten hochlädt.

Das folgende Beispiel zeigt die Registrierung einer benannten temporären gespeicherten Prozedur:

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Copy

Das folgende Beispiel zeigt, wie eine benannte permanente gespeicherte Prozedur registriert wird, indem das Argument is_permanent auf True gesetzt wird:

>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
...  return session.sql(f"select {x} - 1").collect()[0][0]
Copy

Das folgende Beispiel zeigt, wie diese gespeicherten Prozeduren aufgerufen werden:

>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

Lesen von Dateien mit einer gespeicherten Prozedur

Um den Inhalt einer Datei mit einer gespeicherten Prozedur zu lesen, können Sie Folgendes tun:

Lesen von statisch spezifizierten Dateien

  1. Geben Sie an, dass die Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen wird. Dies geschieht auf die gleiche Weise wie bei UDFs. Weitere Informationen dazu finden Sie unter Angeben von Abhängigkeiten für eine UDF.

    Beispiel:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")
    
    Copy
  2. Lesen der Datei in der gespeicherten Prozedur.

    >>> def read_file(name: str) -> str:
    ...    import sys
    ...    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    ...    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    ...
    ...    with open(import_dir + 'my_file.txt', 'r') as file:
    ...        return file.read()
    
    Copy

Lesen von dynamisch spezifizierten Dateien mit SnowflakeFile

Mit der Klasse SnowflakeFile des Snowpark-Moduls snowflake.snowpark.files können Sie eine Datei aus einem Stagingbereich lesen. Die Klasse SnowflakeFile bietet einen dynamischen Dateizugriff, mit dem Sie Dateien beliebiger Größe streamen können. Der dynamische Dateizugriff ist auch nützlich, wenn Sie über mehrere Dateien iterieren möchten. Ein Beispiel dazu finden Sie unter Verarbeitung mehrerer Dateien.

Weitere Informationen und Beispiele zum Lesen von Dateien mit SnowflakeFile finden Sie unter Lesen einer Datei mit der Klasse SnowflakeFile eines Python-UDF-Handlers.

Im folgenden Beispiel wird eine permanente gespeicherte Prozedur erstellt, die mit SnowflakeFile eine Datei aus einem Stagingbereich liest und die Dateilänge zurückgibt.

Erstellen Sie die gespeicherte Prozedur:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Rufen Sie die gespeicherte Prozedur auf:

file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")
Copy