Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Python¶
Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Python erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.
Unter diesem Thema:
Einführung¶
Mit Snowpark können Sie benutzerdefinierte Funktionen (UDFs) für Ihre kundenspezifischen Lambdas und Funktionen erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.
Wenn Sie die Snowpark-API verwenden, um eine UDF zu erstellen, lädt die Snowpark-Bibliothek den Code Ihrer Funktion in einen internen Stagingbereich. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, wo sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.
In Ihrem kundenspezifischen Code können Sie auch Module aus Python-Dateien oder Pakete von Drittanbietern importieren.
Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:
Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.
Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.
In den nächsten Abschnitten wird erläutert, wie Sie diese UDFs mithilfe einer lokalen Entwicklungsumgebung oder mithilfe eines Python-Arbeitsblatts erstellen.
Beachten Sie, dass Sie, wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION
definiert haben, diese UDF in Snowpark aufrufen können. Weitere Details dazu finden Sie unter Benutzerdefinierte Funktionen (UDFs).
Bemerkung
Vektorisierte Python-UDFs ermöglichen die Definition von Python-Funktionen, die Batches von Eingabezeilen als Pandas DataFrames empfangen. Dies führt zu einer wesentlichen Leistungssteigerung bei Szenarios mit Machine Learning-Inferenz. Weitere Informationen dazu finden Sie unter Verwenden vektorisierter UDFs.
Bemerkung
Wenn Sie ein Python-Arbeitsblatt nutzen, verwenden Sie diese Beispiele innerhalb der Handler-Funktion:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
Wenn die Beispiele etwas anderes als einen DataFrame zurückgeben, z. B. eine Liste (list
) von Row
-Objekten, können Sie den Rückgabetyp ändern, sodass er dem Rückgabetyp des Beispiels entspricht.
Nachdem Sie ein Codebeispiel ausgeführt haben, verwenden Sie die Registerkarte Results, um alle zurückgegebenen Ausgaben anzuzeigen. Weitere Informationen dazu finden Sie unter Ausführen von Python-Arbeitsblättern.
Angeben von Abhängigkeiten für eine UDF¶
Um eine UDF über die Snowpark-API zu definieren, müssen Sie alle Dateien importieren, die Module enthalten, von denen Ihre UDF abhängt, wie Python-Dateien, ZIP-Dateien, Ressourcendateien usw.
Weitere Informationen zum Import mithilfe von Python-Arbeitsblättern finden Sie unter Python-Datei aus einem Stagingbereich zu einem Arbeitsblatt hinzufügen.
Wenn Sie für den Import Ihre lokale Entwicklungsumgebung verwenden möchten, müssen Sie
Session.add_import()
in Ihrem Code aufrufen.
Sie können auch ein Verzeichnis angeben, das von der Snowpark-Bibliothek automatisch komprimiert und als ZIP-Datei hochgeladen wird. (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien mit einer UDF.)
Wenn Sie Session.add_import()
aufrufen, lädt die Snowpark-Bibliothek diese Dateien in einen internen Stagingbereich hoch und importiert die Dateien beim Ausführen Ihrer UDF.
Das folgende Beispiel zeigt, wie Sie eine in einem Stagingbereich befindliche ZIP-Datei als Abhängigkeit zu Ihrem Code hinzufügen:
# Add a zip file that you uploaded to a stage.
session.add_import("@my_stage/<path>/my_library.zip")
Die folgenden Beispiele zeigen, wie Sie eine Python-Datei von Ihrem lokalen Rechner hinzufügen:
# Import a Python file from your local machine.
session.add_import("/<path>/my_module.py")
# Import a Python file from your local machine and specify a relative Python import path.
session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
Die folgenden Beispiele zeigen, wie Sie andere Typen von Abhängigkeiten hinzufügen:
# Add a directory of resource files.
session.add_import("/<path>/my-resource-dir/")
# Add a resource file.
session.add_import("/<path>/my-resource.xml")
Bemerkung
Die Python-Snowpark-Bibliothek wird nicht automatisch hochgeladen.
Die folgenden Abhängigkeiten müssen nicht angegeben werden:
Ihre in Python integrierten Bibliotheken.
Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.
Verwenden von Paketen des Drittanbieters Anaconda in einer UDF¶
Sie können Pakete von Drittanbietern aus dem Snowflake-Anaconda-Kanal in einer UDF verwenden.
Wenn Sie eine Python-UDF in einem Python-Arbeitsblatt erstellen, sind die Anaconda-Pakete bereits für Ihr Arbeitsblatt verfügbar. Weitere Informationen dazu finden Sie unter Python-Datei aus einem Stagingbereich zu einem Arbeitsblatt hinzufügen.
Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF erstellen, können Sie angeben, welche Anaconda-Pakete installiert werden sollen.
Wenn Sie dann in einem Snowflake-Warehouse Abfragen ausführen, die gespeicherte Python-UDFs aufrufen, werden die Anaconda-Pakete nahtlos installiert und in dem virtuellen Warehouse für Sie zwischengespeichert.
Weitere Informationen zu Best Practices, zum Anzeigen der verfügbaren Pakete und zum Einrichten einer lokalen Entwicklungsumgebung finden Sie unter Verwenden von Drittanbieter-Paketen.
Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF schreiben, verwenden Sie session.add_packages
, um Pakete auf Sitzungsebene hinzuzufügen.
Das folgende Codebeispiel zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.
import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf
session.add_packages("numpy", "pandas", "xgboost==1.5.0")
@udf
def compute() -> list:
return [np.__version__, pd.__version__, xgb.__version__]
Sie können auch session.add_requirements
verwenden, um Pakete mit einer Anforderungsdatei anzugeben.
session.add_requirements("mydir/requirements.txt")
Sie können die Pakete auf UDF-Ebene hinzufügen, um Pakete auf Sitzungsebene zu überschreiben, die Sie zuvor hinzugefügt haben.
import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf
@udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
def compute() -> list:
return [np.__version__, pd.__version__, xgb.__version__]
Wichtig
Wenn Sie keine Paketversion angeben, verwendet Snowflake beim Auflösen der Abhängigkeiten die neueste Version. Wenn Sie die UDF dann in der Produktionsumgebung bereitstellen, müssen Sie sicherstellen, dass Ihr Code immer dieselben Abhängigkeitsversionen verwendet. Sie können dies sowohl für permanente als auch für temporäre UDFs tun.
Wenn Sie eine permanente UDF erstellen, wird die UDF nur einmal erstellt und registriert. Abhängigkeiten werden dabei einmalig aufgelöst, und die ausgewählte Version wird für Produktions-Workloads verwendet. Wenn die UDF ausgeführt wird, verwendet sie immer dieselben Abhängigkeitsversionen.
Wenn Sie eine temporäre UDF erstellen, geben Sie die Abhängigkeitsversionen als Teil der Versionsspezifikation an. Auf diese Weise verwendet die Paketauflösung bei der Registrierung der UDF die angegebene Version. Wenn Sie die Version nicht angeben und dann eine neue Version verfügbar ist, wird die Abhängigkeit möglicherweise aktualisiert.
Erstellen einer anonymen UDF¶
Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:
Rufen Sie die Funktion
udf
im Objektsnowflake.snowpark.functions
auf, und übergeben Sie die Definition der anonymen Funktion.Rufen Sie die Methode
register
in der KlasseUDFRegistration
auf, und übergeben Sie die Definition der anonymen Funktion.
Das folgende Beispiel zeigt eine anonyme UDF:
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf
add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Bemerkung
Wenn Sie Code schreiben, der in mehreren Sitzungen ausgeführt werden kann, verwenden Sie zum Registrieren der UDFs die Methode register
und nicht die Funktion udf
. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session
nicht gefunden werden kann.
Erstellen und Registrieren einer benannten UDF¶
Wenn Sie eine UDF über den Namen aufrufen möchten (z. B. durch Verwendung der Funktion call_udf
im Objekt functions
), können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie eine der folgenden Optionen:
Die Methode
register
der KlasseUDFRegistration
mit dem Argumentname
.Die Funktion
udf
im Modulsnowflake.snowpark.functions
mit dem Argumentname
.
Um auf ein Attribut oder eine Methode der Klasse UDFRegistration
zuzugreifen, rufen Sie die Eigenschaft udf
der Klasse Session
auf.
Durch Aufrufen von register
oder udf
wird eine temporäre UDF erstellt, die Sie in der aktuellen Sitzung verwenden können.
Um eine permanente UDF zu erstellen, rufen Sie die Methode register
oder die Funktion udf
auf, und setzen Sie das Argument is_permanent
auf True
. Wenn Sie eine permanente UDF erstellen, müssen Sie auch das Argument stage_location
auf den Speicherort des Stagingbereichs setzen, in den die Python-Datei für die UDF und deren Abhängigkeiten hochgeladen wird.
Das folgende Beispiel zeigt die Registrierung einer benannten temporären UDF:
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf
add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Das folgende Beispiel zeigt, wie eine benannte permanente UDF registriert wird, indem das Argument is_permanent
auf True
gesetzt wird:
@udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
def minus_one(x: int) -> int:
return x-1
Das folgende Beispiel zeigt den Aufruf dieser UDFs:
df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
Sie können die UDF auch mit SQL aufrufen:
session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Erstellen einer UDF aus einer Python-Quelldatei¶
Wenn Sie die UDF in Ihrer lokalen Entwicklungsumgebung erstellen, können Sie Ihren UDF-Handler auch in einer Python-Datei definieren und dann die Methode register_from_file
der Klasse UDFRegistration
verwenden, um einen UDF zu erstellen.
Bemerkung
Sie können diese Methode nicht in einem Python-Arbeitsblatt verwenden.
Hier sind Beispiele für die Verwendung von register_from_file
.
Angenommen, Sie haben eine Python-Datei test_udf_file.py
, die Folgendes enthält:
def mod5(x: int) -> int:
return x % 5
Dann können Sie eine UDF über diese Funktion der Datei test_udf_file.py
erstellen.
# mod5() in that file has type hints
mod5_udf = session.udf.register_from_file(
file_path="tests/resources/test_udf_dir/test_udf_file.py",
func_name="mod5",
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Sie können die Datei auch an den Speicherort eines Stagingbereichs hochladen und dann zur Erstellung der UDF verwenden.
from snowflake.snowpark.types import IntegerType
# suppose you have uploaded test_udf_file.py to stage location @mystage.
mod5_udf = session.udf.register_from_file(
file_path="@mystage/test_udf_file.py",
func_name="mod5",
return_type=IntegerType(),
input_types=[IntegerType()],
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
Lesen von Dateien mit einer UDF¶
Um den Inhalt einer Datei zu lesen, kann Ihr Python-Code Folgendes tun:
Lesen einer statisch spezifizierten Datei durch Importieren einer Datei und anschließendes Lesen aus dem Basisverzeichnis der UDF.
Lesen einer dynamisch spezifizierten Datei mit SnowflakeFile. Diese Vorgehensweise kann verwendet werden, wenn Sie während der Verarbeitung auf eine Datei zugreifen müssen.
Lesen von statisch spezifizierten Dateien¶
Die Snowpark-Bibliothek lädt die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF Daten aus einer Datei lesen soll, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.
Bemerkung
Wenn Sie Ihre UDF in ein Python-Arbeitsblatt schreiben, kann die UDF nur Dateien aus einem Stagingbereich lesen.
So richten Sie eine UDF zum Lesen einer Datei ein:
Geben Sie an, dass die Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen wird. Weitere Informationen dazu finden Sie unter Angeben von Abhängigkeiten für eine UDF.
Beispiel:
# Import a file from your local machine as a dependency. session.add_import("/<path>/my_file.txt") # Or import a file that you uploaded to a stage as a dependency. session.add_import("@my_stage/<path>/my_file.txt")
Lesen Sie der Datei in der UDF. Im folgenden Beispiel wird die Datei nur einmal beim Erstellen der UDF gelesen und nicht noch einmal bei Ausführung der UDF. Dies wird mit der Drittanbieter-Bibliothek cachetools erreicht.
import sys import os import cachetools from snowflake.snowpark.types import StringType @cachetools.cached(cache={}) def read_file(filename): import_dir = sys._xoptions.get("snowflake_import_directory") if import_dir: with open(os.path.join(import_dir, filename), "r") as f: return f.read() # create a temporary text file for test temp_file_name = "/tmp/temp.txt" with open(temp_file_name, "w") as t: _ = t.write("snowpark") session.add_import(temp_file_name) session.add_packages("cachetools") def add_suffix(s): return f"{read_file(os.path.basename(temp_file_name))}-{s}" concat_file_content_with_str_udf = session.udf.register( add_suffix, return_type=StringType(), input_types=[StringType()] ) df = session.create_dataframe(["snowflake", "python"], schema=["a"]) df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
[Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
os.remove(temp_file_name) session.clear_imports()
Lesen von dynamisch spezifizierten Dateien mit SnowflakeFile
¶
Mit der Klasse SnowflakeFile
des Snowpark-Moduls snowflake.snowpark.files
können Sie eine Datei aus einem Stagingbereich lesen. Die Klasse SnowflakeFile
bietet einen dynamischen Dateizugriff, mit dem Sie Dateien beliebiger Größe streamen können. Der dynamische Dateizugriff ist auch nützlich, wenn Sie über mehrere Dateien iterieren möchten. Ein Beispiel dazu finden Sie unter Verarbeitung mehrerer Dateien.
Weitere Informationen und Beispiele zum Lesen von Dateien mit SnowflakeFile
finden Sie unter Lesen einer Datei mit der Klasse SnowflakeFile eines Python-UDF-Handlers.
Im folgenden Beispiel wird eine temporäre UDF registriert, die mit SnowflakeFile
eine Textdatei aus einem Stagingbereich liest und die Dateilänge zurückgibt.
Registrieren der UDF:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
Aufrufen der UDF:
session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Schreiben von Dateien aus Snowpark Python UDFs und UDTFs¶
Mit Snowpark Python können Sie Dateien in Stagingbereiche mit benutzerdefinierten Funktionen (UDFs), vektorisierten UDFs, benutzerdefinierten Tabellenfunktionen (UDTFs) und vektorisierten UDTFs schreiben. Im Funktionshandler verwenden Sie die SnowflakeFile API, um Dateien zu öffnen und zu schreiben. Wenn Sie die Datei aus der Funktion zurückgeben, wird die Datei zusammen mit den Abfrageergebnissen geschrieben.
Eine einfache UDF zum Schreiben einer Datei könnte so aussehen:
CREATE OR REPLACE FUNCTION write_file()
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'write_file'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
def write_file():
file = SnowflakeFile.open_new_result("w") # Open a new result file
file.write("Hello world") # Write data
return file # File must be returned
$$;
Wenn Sie diese UDF ausführen, erhalten Sie eine Bereichs-URL, die auf die Ergebnisdatei verweist.
Zugriff auf die Ergebnisdateien¶
Ein Dateihandler wird als Bereichs-URL an die Abfrage zurückgegeben, die UDF aufruft. Sie können diese spezielle URL verwenden, um von Snowflake aus auf Dateien zuzugreifen (über eine andere UDF oder den Befehl COPY FILES), aber nicht von außerhalb von Snowflake als vorsignierte URL. Die Bereichs-URL ist 24 Stunden lang gültig.
Nachdem eine Datei von einem UDF zurückgegeben wurde, können Sie je nach Anwendungsfall mit einem der folgenden Speichertools darauf zugreifen:
COPY FILES: Kopieren Sie die Datei in einen anderen Stagingbereich. Nachdem die Datei kopiert wurde, können Sie sie wie eine typische Stagingdatei verwenden, z. B. mit den folgenden Tools:
Verzeichnistabellen: Abfrage einer Liste von Dateien in einem Stagingbereich mit einer WHERE-Klausel, um bei Bedarf zu filtern.
GET_PRESIGNED_URL: Erzeugen Sie eine URL zu @stage/file.
Externe Stagingbereiche: Greifen Sie außerhalb von Snowflake über Cloudanbieter-APIs auf die Datei zu.
UDF: Lesen Sie die Datei in einer anderen Abfrage.
Weitere Informationen dazu finden Sie unter COPY FILES.
Einschränkungen¶
Dieses Feature ist nicht für Java oder Scala verfügbar.
Gespeicherte Prozeduren unterstützen auch das Schreiben von Dateien, können aber nicht einfach mit einem COPY FILES-Befehl verkettet werden. Daher empfehlen wir für das Schreiben von Dateien mithilfe von gespeicherten Prozeduren den Datei-Staging-Befehl PUT zu verwenden.
Beispiele¶
Dieser Abschnitt enthält Codebeispiele, die zeigen, wie Sie Dateien für verschiedene Anwendungsfälle in den Stagingbereich schreiben.
Dateitransformation¶
Im Folgenden finden Sie ein Beispiel für einen UDF-Handler, der eine Datei umwandelt. Sie können dieses Beispiel abändern, um verschiedene Arten von Dateiumwandlungen vorzunehmen, wie z. B.:
Konvertieren von einem Dateiformat in ein anderes Format.
Ein Bild neu skalieren.
Transformieren von Dateien in einen „goldenen Zustand“ in einem Ordner mit Zeitstempel im selben oder einem anderen Bucket.
CREATE OR REPLACE FUNCTION convert_to_foo(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'convert_to_foo'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
def convert_to_foo(filename):
input_file = SnowflakeFile.open(filename, "r")
converted_file = SnowflakeFile.open_new_result("w")
# Foo-type is just adding foo at the end of every line
for line in input_file.readlines():
converted_file.write(line[:-1] + 'foo' + '\n')
return converted_file
$$;
Sie können diese UDF in einer Abfrage aufrufen und dann auf die converted_file
-Ergebnisdatei zugreifen, die von der UDF geschrieben wurde.
Die folgenden SQL-Beispiele zeigen, was Sie mit den von UDFs zurückgegebenen Ergebnisdateien tun können, z. B. sie in einen Stagingbereich kopieren oder sie in einer nachfolgenden Abfrage oder einer anderen UDF konsumieren. Diese grundlegenden SQL-Muster sind auf alle UDF Schreibbeispiele anwendbar, die in diesem Thema enthalten sind. Sie können zum Beispiel die vorzeichenbehaftete Abfrage-URL für jedes der folgenden UDF-Beispiele verwenden, indem Sie sie anstelle einer anderen SQL-Anweisung einsetzen.
Beispiel 1: Eine einzelne Datei konvertieren und in einen endgültigen Stagingbereich kopieren¶
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, 'in.txt')), 'out.foo.txt');
Beispiel 2: Eine Tabelle mit Dateien konvertieren und in einen endgültigen Stagingbereich kopieren¶
CREATE TABLE files_to_convert(file string);
-- Populate files_to_convert with input files:
INSERT INTO files_to_convert VALUES ('file1.txt');
INSERT INTO files_to_convert VALUES ('file2.txt');
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)),
REPLACE(file, '.txt', '.foo.txt') FROM files_to_convert);
Beispiel 3: Alle Dateien konvertieren und in einen endgültigen Stagingbereich kopieren¶
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, RELATIVE_PATH)),
REPLACE(RELATIVE_PATH, 'format1', 'format2') FROM DIRECTORY(@input_stage));
Beispiel 4: Alle Dateien aus einer Tabelle konvertieren und ohne Kopieren lesen¶
-- A basic UDF to read a file:
CREATE OR REPLACE FUNCTION read_udf(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'read'
AS
'
from snowflake.snowpark.files import SnowflakeFile
def read(filename):
return SnowflakeFile.open(filename, "r").read()
';
-- Create files_to_convert as in Example 2.
SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) as new_file
FROM files_to_convert;
-- The following query must be run within 24 hours from the prior one
SELECT read_udf(new_file) FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
Beispiel 5: Konvertieren Sie alle Dateien aus einer Tabelle und lesen Sie sie sofort über eine UDF¶
-- Set up files_to_convert as in Example 2.
-- Set up read_udf as in Example 4.
SELECT read_udf(
convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file))) FROM files_to_convert;
Beispiel 6: Lesen mit vorzeichenbehafteten URLs¶
Dieses Beispiel gilt nur für Stagingbereiche mit serverseitiger Verschlüsselung. Interne Stagingbereiche verfügen standardmäßig über eine clientseitige Verschlüsselung.
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) FROM files_to_convert);
-- Refresh the directory to get new files in output_stage.
ALTER STAGE output_stage REFRESH;
Eine PDF aus einer Partition von Tabellendaten erstellen und an einen endgültigen Speicherort kopieren¶
Das folgende UDF-Handler-Beispiel partitioniert die Eingabedaten und schreibt einen PDF-Bericht für jede Partition der Daten. In diesem Beispiel werden Berichte nach der Zeichenfolge location
partitioniert.
Sie können dieses Beispiel auch ändern, um andere Dateitypen wie ML-Modelle und andere benutzerdefinierte Formate für jede Partition zu schreiben.
-- Create a stage that includes the font (for PDF creation)
CREATE OR REPLACE STAGE fonts
URL = 's3://sfquickstarts/misc/';
-- UDF to write the data
CREATE OR REPLACE FUNCTION create_report_pdf(data string)
RETURNS TABLE (file string)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER='CreateReport'
PACKAGES = ('snowflake-snowpark-python', 'fpdf')
IMPORTS = ('@fonts/DejaVuSans.ttf')
AS $$
from snowflake.snowpark.files import SnowflakeFile
from fpdf import FPDF
import shutil
import sys
import uuid
import_dir = sys._xoptions["snowflake_import_directory"]
class CreateReport:
def __init__(self):
self.pdf = None
def process(self, data):
if self.pdf == None:
# PDF library edits this file, make sure it's unique.
font_file = f'/tmp/DejaVuSans-{uuid.uuid4()}.ttf'
shutil.copy(f'{import_dir}/DejaVuSans.ttf', font_file)
self.pdf = FPDF()
self.pdf.add_page()
self.pdf.add_font('DejaVu', '', font_file, uni=True)
self.pdf.set_font('DejaVu', '', 14)
self.pdf.write(8, data)
self.pdf.ln(8)
def end_partition(self):
f = SnowflakeFile.open_new_result("wb")
f.write(self.pdf.output(dest='S').encode('latin-1'))
yield f,
$$;
Im folgenden SQL-Beispiel wird zunächst die Tabelle reportData
mit fiktiven Daten eingerichtet und dann der Stagingbereich output_stage
erstellt. Dann wird create_report_pdf
UDF aufgerufen, um eine PDF-Datei mit Daten zu erstellen, die aus der Tabelle reportData
abfragt werden. In der gleichen SQL-Anweisung kopiert der Befehl COPY FILES die Ergebnisdatei von UDF nach output_stage
.
Bemerkung
Wir verwenden einen serverseitig verschlüsselten (SSE) Stagingbereich, da die vorzeichenbehaftete URL zu einer Datei auf einem SSE-Stagingbereich eine unverschlüsselte Datei herunterlädt. Im Allgemeinen empfehlen wir die Standardverschlüsselung im Stagingbereich, da die Datei clientseitig verschlüsselt wird und dies sicherer ist. Dateien im normalen Stagingbereich können nach wie vor über UDFs oder GET/PUT gelesen werden – nur nicht über vorsignierte URLs. Stellen Sie sicher, dass Sie die Auswirkungen auf die Sicherheit verstehen, bevor Sie einen SSE-Stagingbereich in einer Produktionsumgebung verwenden.
-- Fictitious data
CREATE OR REPLACE TABLE reportData(location string, item string);
INSERT INTO reportData VALUES ('SanMateo' ,'Item A');
INSERT INTO reportData VALUES ('SanMateo' ,'Item Z');
INSERT INTO reportData VALUES ('SanMateo' ,'Item X');
INSERT INTO reportData VALUES ('Bellevue' ,'Item B');
INSERT INTO reportData VALUES ('Bellevue' ,'Item Q');
-- Presigned URLs only work with SSE stages, see note above.
CREATE OR REPLACE STAGE output_stage ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
COPY FILES INTO @output_stage
FROM (SELECT reports.file, location || '.pdf'
FROM reportData, TABLE(create_report_pdf(item)
OVER (partition BY location)) AS reports);
-- Check the results
LIST @output_stage;
SELECT GET_PRESIGNED_URL(@output_stage, 'SanMateo.pdf');
Dateien aufteilen und in mehrere Tabellen entladen¶
Das folgende UDF-Handler-Beispiel teilt eine CSV-Datei zeilenweise anhand des ersten Zeichens jeder Zeile auf. Die UDF entlädt dann die aufgeteilten Dateien in mehrere Tabellen.
CREATE OR REPLACE FUNCTION split_file(path string)
RETURNS TABLE(file string, name string)
LANGUAGE PYTHON
VOLATILE
PACKAGES = ('snowflake-snowpark-python')
RUNTIME_VERSION = 3.9
HANDLER = 'SplitCsvFile'
AS $$
import csv
from snowflake.snowpark.files import SnowflakeFile
class SplitCsvFile:
def process(self, file):
toTable1 = SnowflakeFile.open_new_result("w")
toTable1Csv = csv.writer(toTable1)
toTable2 = SnowflakeFile.open_new_result("w")
toTable2Csv = csv.writer(toTable2)
toTable3 = SnowflakeFile.open_new_result("w")
toTable3Csv = csv.writer(toTable3)
with SnowflakeFile.open(file, 'r') as file:
# File is of the format 1:itemA \n 2:itemB \n [...]
for line in file.readlines():
forTable = line[0]
if (forTable == "1"):
toTable1Csv.writerow([line[2:-1]])
if (forTable == "2"):
toTable2Csv.writerow([line[2:-1]])
if (forTable == "3"):
toTable3Csv.writerow([line[2:-1]])
yield toTable1, 'table1.csv'
yield toTable2, 'table2.csv'
yield toTable3, 'table3.csv'
$$;
-- Create a stage with access to an import file.
CREATE OR REPLACE STAGE staged_files url="s3://sfquickstarts/misc/";
-- Add the files to be split into a table - we just add one.
CREATE OR REPLACE TABLE filesToSplit(path string);
INSERT INTO filesToSplit VALUES ( 'items.txt');
-- Create output tables
CREATE OR REPLACE TABLE table1(item string);
CREATE OR REPLACE TABLE table2(item string);
CREATE OR REPLACE TABLE table3(item string);
-- Create output stage
CREATE OR REPLACE stage output_stage;
-- Creates files named path-tableX.csv
COPY FILES INTO @output_stage FROM
(SELECT file, path || '-' || name FROM filesToSplit, TABLE(split_file(build_scoped_file_url(@staged_files, path))));
-- We use pattern and COPY INTO (not COPY FILES INTO) to upload to a final table.
COPY INTO table1 FROM @output_stage PATTERN = '.*.table1.csv';
COPY INTO table2 FROM @output_stage PATTERN = '.*.table2.csv';
COPY INTO table3 FROM @output_stage PATTERN = '.*.table3.csv';
-- See results
SELECT * from table1;
SELECT * from table2;
SELECT * from table3;
Verwenden vektorisierter UDFs¶
Mit vektorisierten Python-UDFs können Sie Python-Funktionen definieren, mit denen Batches von Eingabezeilen als Pandas DataFrames empfangen und Batches von Ergebnissen als Pandas-Arrays oder Pandas Series zurückgeben werden. Die Spalte im Snowpark-dataframe
wird als Pandas Series innerhalb der UDF vektorisiert.
Im Folgenden finden Sie ein Beispiel für die Verwendung der Batchschnittstelle:
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)
@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
# The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
df.columns = ["col1", "col2", "col3", "col4"]
return model.predict(df)
Vektorisierte Python-UDFs werden genauso aufgerufen wie andere Python-UDFs. Weitere Informationen dazu finden Sie in unter Vektorisierte Python-UDFs, wo erklärt wird, wie eine vektorisierte UDF mithilfe einer SQL-Anweisung erstellt wird. So können Sie zum Beispiel beim Angeben des Python-Codes in der SQL-Anweisung das Decorator-Element vectorized
verwenden. Bei Verwendung der in diesem Dokument beschriebenen Snowpark Python-API verwenden Sie zum Erstellen einer vektorisierten UDF keine SQL-Anweisung. Sie verwenden also nicht das Decorator-Element vectorized
.
Die Anzahl der Zeilen pro Batch kann begrenzt werden. Weitere Informationen dazu finden Sie unter Festlegen einer Zielbatchgröße.
Weitere Erläuterungen und Beispiele zur Verwendung der Snowpark Python-API bei der Erstellung von vektorisierten UDFs finden Sie im UDFs-Abschnitt der Snowpark-API-Referenz.