Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Python

Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Python erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.

Unter diesem Thema:

Einführung

Mit Snowpark können Sie benutzerdefinierte Funktionen (UDFs) für Ihre kundenspezifischen Lambdas und Funktionen erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.

Wenn Sie die Snowpark-API verwenden, um eine UDF zu erstellen, lädt die Snowpark-Bibliothek den Code Ihrer Funktion in einen internen Stagingbereich. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, wo sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.

In Ihrem kundenspezifischen Code können Sie auch Module aus Python-Dateien oder Pakete von Drittanbietern importieren.

Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:

  • Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.

  • Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.

In den nächsten Abschnitten wird erläutert, wie Sie diese UDFs mithilfe einer lokalen Entwicklungsumgebung oder mithilfe eines Python-Arbeitsblatts erstellen.

Beachten Sie, dass Sie, wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION definiert haben, diese UDF in Snowpark aufrufen können. Weitere Details dazu finden Sie unter Benutzerdefinierte Funktionen (UDFs).

Bemerkung

Vektorisierte Python-UDFs ermöglichen die Definition von Python-Funktionen, die Batches von Eingabezeilen als Pandas DataFrames empfangen. Dies führt zu einer wesentlichen Leistungssteigerung bei Szenarios mit Machine Learning-Inferenz. Weitere Informationen dazu finden Sie unter Verwenden vektorisierter UDFs.

Bemerkung

Wenn Sie ein Python-Arbeitsblatt nutzen, verwenden Sie diese Beispiele innerhalb der Handler-Funktion:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

def main(session: snowpark.Session):
   df_table = session.table("sample_product_data")
Copy

Wenn die Beispiele etwas anderes als einen DataFrame zurückgeben, z. B. eine Liste (list) von Row-Objekten, können Sie den Rückgabetyp ändern, sodass er dem Rückgabetyp des Beispiels entspricht.

Nachdem Sie ein Codebeispiel ausgeführt haben, verwenden Sie die Registerkarte Results, um alle zurückgegebenen Ausgaben anzuzeigen. Weitere Informationen dazu finden Sie unter Ausführen von Python-Arbeitsblättern.

Angeben von Abhängigkeiten für eine UDF

Um eine UDF über die Snowpark-API zu definieren, müssen Sie alle Dateien importieren, die Module enthalten, von denen Ihre UDF abhängt, wie Python-Dateien, ZIP-Dateien, Ressourcendateien usw.

Sie können auch ein Verzeichnis angeben, das von der Snowpark-Bibliothek automatisch komprimiert und als ZIP-Datei hochgeladen wird. (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien mit einer UDF.)

Wenn Sie Session.add_import() aufrufen, lädt die Snowpark-Bibliothek diese Dateien in einen internen Stagingbereich hoch und importiert die Dateien beim Ausführen Ihrer UDF.

Das folgende Beispiel zeigt, wie Sie eine in einem Stagingbereich befindliche ZIP-Datei als Abhängigkeit zu Ihrem Code hinzufügen:

# Add a zip file that you uploaded to a stage.
session.add_import("@my_stage/<path>/my_library.zip")
Copy

Die folgenden Beispiele zeigen, wie Sie eine Python-Datei von Ihrem lokalen Rechner hinzufügen:

# Import a Python file from your local machine.
session.add_import("/<path>/my_module.py")

# Import a Python file from your local machine and specify a relative Python import path.
session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
Copy

Die folgenden Beispiele zeigen, wie Sie andere Typen von Abhängigkeiten hinzufügen:

# Add a directory of resource files.
session.add_import("/<path>/my-resource-dir/")

# Add a resource file.
session.add_import("/<path>/my-resource.xml")
Copy

Bemerkung

Die Python-Snowpark-Bibliothek wird nicht automatisch hochgeladen.

Die folgenden Abhängigkeiten müssen nicht angegeben werden:

  • Ihre in Python integrierten Bibliotheken.

    Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.

Verwenden von Paketen des Drittanbieters Anaconda in einer UDF

Sie können Pakete von Drittanbietern aus dem Snowflake-Anaconda-Kanal in einer UDF verwenden.

  • Wenn Sie eine Python-UDF in einem Python-Arbeitsblatt erstellen, sind die Anaconda-Pakete bereits für Ihr Arbeitsblatt verfügbar. Weitere Informationen dazu finden Sie unter Python-Datei aus einem Stagingbereich zu einem Arbeitsblatt hinzufügen.

  • Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF erstellen, können Sie angeben, welche Anaconda-Pakete installiert werden sollen.

Wenn Sie dann in einem Snowflake-Warehouse Abfragen ausführen, die gespeicherte Python-UDFs aufrufen, werden die Anaconda-Pakete nahtlos installiert und in dem virtuellen Warehouse für Sie zwischengespeichert.

Weitere Informationen zu Best Practices, zum Anzeigen der verfügbaren Pakete und zum Einrichten einer lokalen Entwicklungsumgebung finden Sie unter Verwenden von Drittanbieter-Paketen.

Wenn Sie in Ihrer lokalen Entwicklungsumgebung eine Python-UDF schreiben, verwenden Sie session.add_packages, um Pakete auf Sitzungsebene hinzuzufügen.

Das folgende Codebeispiel zeigt, wie Pakete importiert und deren Versionen zurückgegeben werden.

import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf

session.add_packages("numpy", "pandas", "xgboost==1.5.0")

@udf
def compute() -> list:
   return [np.__version__, pd.__version__, xgb.__version__]
Copy

Sie können auch session.add_requirements verwenden, um Pakete mit einer Anforderungsdatei anzugeben.

session.add_requirements("mydir/requirements.txt")
Copy

Sie können die Pakete auf UDF-Ebene hinzufügen, um Pakete auf Sitzungsebene zu überschreiben, die Sie zuvor hinzugefügt haben.

import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf

@udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
   def compute() -> list:
   return [np.__version__, pd.__version__, xgb.__version__]
Copy

Wichtig

Wenn Sie keine Paketversion angeben, verwendet Snowflake beim Auflösen der Abhängigkeiten die neueste Version. Wenn Sie die UDF dann in der Produktionsumgebung bereitstellen, müssen Sie sicherstellen, dass Ihr Code immer dieselben Abhängigkeitsversionen verwendet. Sie können dies sowohl für permanente als auch für temporäre UDFs tun.

  • Wenn Sie eine permanente UDF erstellen, wird die UDF nur einmal erstellt und registriert. Abhängigkeiten werden dabei einmalig aufgelöst, und die ausgewählte Version wird für Produktions-Workloads verwendet. Wenn die UDF ausgeführt wird, verwendet sie immer dieselben Abhängigkeitsversionen.

  • Wenn Sie eine temporäre UDF erstellen, geben Sie die Abhängigkeitsversionen als Teil der Versionsspezifikation an. Auf diese Weise verwendet die Paketauflösung bei der Registrierung der UDF die angegebene Version. Wenn Sie die Version nicht angeben und dann eine neue Version verfügbar ist, wird die Abhängigkeit möglicherweise aktualisiert.

Erstellen einer anonymen UDF

Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:

  • Rufen Sie die Funktion udf im Objekt snowflake.snowpark.functions auf, und übergeben Sie die Definition der anonymen Funktion.

  • Rufen Sie die Methode register in der Klasse UDFRegistration auf, und übergeben Sie die Definition der anonymen Funktion.

Das folgende Beispiel zeigt eine anonyme UDF:

from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf

add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Bemerkung

Wenn Sie Code schreiben, der in mehreren Sitzungen ausgeführt werden kann, verwenden Sie zum Registrieren der UDFs die Methode register und nicht die Funktion udf. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session nicht gefunden werden kann.

Erstellen und Registrieren einer benannten UDF

Wenn Sie eine UDF über den Namen aufrufen möchten (z. B. durch Verwendung der Funktion call_udf im Objekt functions), können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie eine der folgenden Optionen:

  • Die Methode register der Klasse UDFRegistration mit dem Argument name.

  • Die Funktion udf im Modul snowflake.snowpark.functions mit dem Argument name.

Um auf ein Attribut oder eine Methode der Klasse UDFRegistration zuzugreifen, rufen Sie die Eigenschaft udf der Klasse Session auf.

Durch Aufrufen von register oder udf wird eine temporäre UDF erstellt, die Sie in der aktuellen Sitzung verwenden können.

Um eine permanente UDF zu erstellen, rufen Sie die Methode register oder die Funktion udf auf, und setzen Sie das Argument is_permanent auf True. Wenn Sie eine permanente UDF erstellen, müssen Sie auch das Argument stage_location auf den Speicherort des Stagingbereichs setzen, in den die Python-Datei für die UDF und deren Abhängigkeiten hochgeladen wird.

Das folgende Beispiel zeigt die Registrierung einer benannten temporären UDF:

from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf

add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
Copy

Das folgende Beispiel zeigt, wie eine benannte permanente UDF registriert wird, indem das Argument is_permanent auf True gesetzt wird:

@udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
def minus_one(x: int) -> int:
   return x-1
Copy

Das folgende Beispiel zeigt den Aufruf dieser UDFs:

df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), minus_one("b")).collect()
Copy
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]

Sie können die UDF auch mit SQL aufrufen:

session.sql("select minus_one(1)").collect()
Copy
[Row(MINUS_ONE(1)=0)]

Erstellen einer UDF aus einer Python-Quelldatei

Wenn Sie die UDF in Ihrer lokalen Entwicklungsumgebung erstellen, können Sie Ihren UDF-Handler auch in einer Python-Datei definieren und dann die Methode register_from_file der Klasse UDFRegistration verwenden, um einen UDF zu erstellen.

Bemerkung

Sie können diese Methode nicht in einem Python-Arbeitsblatt verwenden.

Hier sind Beispiele für die Verwendung von register_from_file.

Angenommen, Sie haben eine Python-Datei test_udf_file.py, die Folgendes enthält:

def mod5(x: int) -> int:
    return x % 5
Copy

Dann können Sie eine UDF über diese Funktion der Datei test_udf_file.py erstellen.

# mod5() in that file has type hints
mod5_udf = session.udf.register_from_file(
      file_path="tests/resources/test_udf_dir/test_udf_file.py",
      func_name="mod5",
   )
   session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
Copy
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Sie können die Datei auch an den Speicherort eines Stagingbereichs hochladen und dann zur Erstellung der UDF verwenden.

from snowflake.snowpark.types import IntegerType
# suppose you have uploaded test_udf_file.py to stage location @mystage.
mod5_udf = session.udf.register_from_file(
   file_path="@mystage/test_udf_file.py",
   func_name="mod5",
   return_type=IntegerType(),
   input_types=[IntegerType()],
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
Copy
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

Lesen von Dateien mit einer UDF

Um den Inhalt einer Datei zu lesen, kann Ihr Python-Code Folgendes tun:

Lesen von statisch spezifizierten Dateien

Die Snowpark-Bibliothek lädt die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF Daten aus einer Datei lesen soll, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.

Bemerkung

Wenn Sie Ihre UDF in ein Python-Arbeitsblatt schreiben, kann die UDF nur Dateien aus einem Stagingbereich lesen.

So richten Sie eine UDF zum Lesen einer Datei ein:

  1. Geben Sie an, dass die Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen wird. Weitere Informationen dazu finden Sie unter Angeben von Abhängigkeiten für eine UDF.

    Beispiel:

     # Import a file from your local machine as a dependency.
     session.add_import("/<path>/my_file.txt")
    
    # Or import a file that you uploaded to a stage as a dependency.
     session.add_import("@my_stage/<path>/my_file.txt")
    
    Copy
  2. Lesen Sie der Datei in der UDF. Im folgenden Beispiel wird die Datei nur einmal beim Erstellen der UDF gelesen und nicht noch einmal bei Ausführung der UDF. Dies wird mit der Drittanbieter-Bibliothek cachetools erreicht.

    import sys
    import os
    import cachetools
    from snowflake.snowpark.types import StringType
    @cachetools.cached(cache={})
    def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
          if import_dir:
             with open(os.path.join(import_dir, filename), "r") as f:
                return f.read()
    
       # create a temporary text file for test
    temp_file_name = "/tmp/temp.txt"
    with open(temp_file_name, "w") as t:
       _ = t.write("snowpark")
    session.add_import(temp_file_name)
    session.add_packages("cachetools")
    
    def add_suffix(s):
       return f"{read_file(os.path.basename(temp_file_name))}-{s}"
    
    concat_file_content_with_str_udf = session.udf.register(
          add_suffix,
          return_type=StringType(),
          input_types=[StringType()]
       )
    
    df = session.create_dataframe(["snowflake", "python"], schema=["a"])
    df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
    
    Copy
    [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
    
    os.remove(temp_file_name)
    session.clear_imports()
    
    Copy

Lesen von dynamisch spezifizierten Dateien mit SnowflakeFile

Mit der Klasse SnowflakeFile des Snowpark-Moduls snowflake.snowpark.files können Sie eine Datei aus einem Stagingbereich lesen. Die Klasse SnowflakeFile bietet einen dynamischen Dateizugriff, mit dem Sie Dateien beliebiger Größe streamen können. Der dynamische Dateizugriff ist auch nützlich, wenn Sie über mehrere Dateien iterieren möchten. Ein Beispiel dazu finden Sie unter Verarbeitung mehrerer Dateien.

Weitere Informationen und Beispiele zum Lesen von Dateien mit SnowflakeFile finden Sie unter Lesen einer Datei mit der Klasse SnowflakeFile eines Python-UDF-Handlers.

Im folgenden Beispiel wird eine temporäre UDF registriert, die mit SnowflakeFile eine Textdatei aus einem Stagingbereich liest und die Dateilänge zurückgibt.

Registrieren der UDF:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

Aufrufen der UDF:

session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
Copy

Schreiben von Dateien aus Snowpark Python UDFs und UDTFs

Mit Snowpark Python können Sie Dateien in Stagingbereiche mit benutzerdefinierten Funktionen (UDFs), vektorisierten UDFs, benutzerdefinierten Tabellenfunktionen (UDTFs) und vektorisierten UDTFs schreiben. Im Funktionshandler verwenden Sie die SnowflakeFile API, um Dateien zu öffnen und zu schreiben. Wenn Sie die Datei aus der Funktion zurückgeben, wird die Datei zusammen mit den Abfrageergebnissen geschrieben.

Eine einfache UDF zum Schreiben einer Datei könnte so aussehen:

CREATE OR REPLACE FUNCTION write_file()
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'write_file'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

def write_file():
  file = SnowflakeFile.open_new_result("w") # Open a new result file
  file.write("Hello world")                 # Write data
  return file               # File must be returned
$$;
Copy

Wenn Sie diese UDF ausführen, erhalten Sie eine Bereichs-URL, die auf die Ergebnisdatei verweist.

Zugriff auf die Ergebnisdateien

Ein Dateihandler wird als Bereichs-URL an die Abfrage zurückgegeben, die UDF aufruft. Sie können diese spezielle URL verwenden, um von Snowflake aus auf Dateien zuzugreifen (über eine andere UDF oder den Befehl COPY FILES), aber nicht von außerhalb von Snowflake als vorsignierte URL. Die Bereichs-URL ist 24 Stunden lang gültig.

Nachdem eine Datei von einem UDF zurückgegeben wurde, können Sie je nach Anwendungsfall mit einem der folgenden Speichertools darauf zugreifen:

  • COPY FILES: Kopieren Sie die Datei in einen anderen Stagingbereich. Nachdem die Datei kopiert wurde, können Sie sie wie eine typische Stagingdatei verwenden, z. B. mit den folgenden Tools:

  • UDF: Lesen Sie die Datei in einer anderen Abfrage.

Weitere Informationen dazu finden Sie unter COPY FILES.

Einschränkungen

  • Dieses Feature ist nicht für Java oder Scala verfügbar.

  • Gespeicherte Prozeduren unterstützen auch das Schreiben von Dateien, können aber nicht einfach mit einem COPY FILES-Befehl verkettet werden. Daher empfehlen wir für das Schreiben von Dateien mithilfe von gespeicherten Prozeduren den Datei-Staging-Befehl PUT zu verwenden.

Beispiele

Dieser Abschnitt enthält Codebeispiele, die zeigen, wie Sie Dateien für verschiedene Anwendungsfälle in den Stagingbereich schreiben.

Dateitransformation

Im Folgenden finden Sie ein Beispiel für einen UDF-Handler, der eine Datei umwandelt. Sie können dieses Beispiel abändern, um verschiedene Arten von Dateiumwandlungen vorzunehmen, wie z. B.:

  • Konvertieren von einem Dateiformat in ein anderes Format.

  • Ein Bild neu skalieren.

  • Transformieren von Dateien in einen „goldenen Zustand“ in einem Ordner mit Zeitstempel im selben oder einem anderen Bucket.

CREATE OR REPLACE FUNCTION convert_to_foo(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'convert_to_foo'
AS
$$
from snowflake.snowpark.files import SnowflakeFile

def convert_to_foo(filename):
  input_file = SnowflakeFile.open(filename, "r")
  converted_file = SnowflakeFile.open_new_result("w")

  # Foo-type is just adding foo at the end of every line
  for line in input_file.readlines():
    converted_file.write(line[:-1] + 'foo' + '\n')
  return converted_file
$$;
Copy

Sie können diese UDF in einer Abfrage aufrufen und dann auf die converted_file-Ergebnisdatei zugreifen, die von der UDF geschrieben wurde.

Die folgenden SQL-Beispiele zeigen, was Sie mit den von UDFs zurückgegebenen Ergebnisdateien tun können, z. B. sie in einen Stagingbereich kopieren oder sie in einer nachfolgenden Abfrage oder einer anderen UDF konsumieren. Diese grundlegenden SQL-Muster sind auf alle UDF Schreibbeispiele anwendbar, die in diesem Thema enthalten sind. Sie können zum Beispiel die vorzeichenbehaftete Abfrage-URL für jedes der folgenden UDF-Beispiele verwenden, indem Sie sie anstelle einer anderen SQL-Anweisung einsetzen.

Beispiel 1: Eine einzelne Datei konvertieren und in einen endgültigen Stagingbereich kopieren
COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, 'in.txt')), 'out.foo.txt');
Copy
Beispiel 2: Eine Tabelle mit Dateien konvertieren und in einen endgültigen Stagingbereich kopieren
CREATE TABLE files_to_convert(file string);
-- Populate files_to_convert with input files:
INSERT INTO files_to_convert VALUES ('file1.txt');
INSERT INTO files_to_convert VALUES ('file2.txt');

COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)),
      REPLACE(file, '.txt', '.foo.txt') FROM files_to_convert);
Copy
Beispiel 3: Alle Dateien konvertieren und in einen endgültigen Stagingbereich kopieren
COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, RELATIVE_PATH)),
      REPLACE(RELATIVE_PATH, 'format1', 'format2') FROM DIRECTORY(@input_stage));
Copy
Beispiel 4: Alle Dateien aus einer Tabelle konvertieren und ohne Kopieren lesen
-- A basic UDF to read a file:
CREATE OR REPLACE FUNCTION read_udf(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'read'
AS
'
from snowflake.snowpark.files import SnowflakeFile

def read(filename):
  return SnowflakeFile.open(filename, "r").read()
';
Copy
-- Create files_to_convert as in Example 2.

SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) as new_file
  FROM files_to_convert;
-- The following query must be run within 24 hours from the prior one
SELECT read_udf(new_file) FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
Copy
Beispiel 5: Konvertieren Sie alle Dateien aus einer Tabelle und lesen Sie sie sofort über eine UDF
-- Set up files_to_convert as in Example 2.
-- Set up read_udf as in Example 4.

SELECT read_udf(
    convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file))) FROM files_to_convert;
Copy
Beispiel 6: Lesen mit vorzeichenbehafteten URLs

Dieses Beispiel gilt nur für Stagingbereiche mit serverseitiger Verschlüsselung. Interne Stagingbereiche verfügen standardmäßig über eine clientseitige Verschlüsselung.

COPY FILES INTO @output_stage FROM
  (SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) FROM files_to_convert);

-- Refresh the directory to get new files in output_stage.
ALTER STAGE output_stage REFRESH;
Copy

Eine PDF aus einer Partition von Tabellendaten erstellen und an einen endgültigen Speicherort kopieren

Das folgende UDF-Handler-Beispiel partitioniert die Eingabedaten und schreibt einen PDF-Bericht für jede Partition der Daten. In diesem Beispiel werden Berichte nach der Zeichenfolge location partitioniert.

Sie können dieses Beispiel auch ändern, um andere Dateitypen wie ML-Modelle und andere benutzerdefinierte Formate für jede Partition zu schreiben.

-- Create a stage that includes the font (for PDF creation)

CREATE OR REPLACE STAGE fonts
URL = 's3://sfquickstarts/misc/';

-- UDF to write the data
CREATE OR REPLACE FUNCTION create_report_pdf(data string)
RETURNS TABLE (file string)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER='CreateReport'
PACKAGES = ('snowflake-snowpark-python', 'fpdf')
IMPORTS  = ('@fonts/DejaVuSans.ttf')
AS $$
from snowflake.snowpark.files import SnowflakeFile
from fpdf import FPDF
import shutil
import sys
import uuid
import_dir = sys._xoptions["snowflake_import_directory"]

class CreateReport:
  def __init__(self):
      self.pdf = None

  def process(self, data):
      if self.pdf == None:
        # PDF library edits this file, make sure it's unique.
        font_file = f'/tmp/DejaVuSans-{uuid.uuid4()}.ttf'
        shutil.copy(f'{import_dir}/DejaVuSans.ttf', font_file)
        self.pdf = FPDF()
        self.pdf.add_page()
        self.pdf.add_font('DejaVu', '', font_file, uni=True)
        self.pdf.set_font('DejaVu', '', 14)
      self.pdf.write(8, data)
      self.pdf.ln(8)

  def end_partition(self):
      f = SnowflakeFile.open_new_result("wb")
      f.write(self.pdf.output(dest='S').encode('latin-1'))
      yield f,
$$;
Copy

Im folgenden SQL-Beispiel wird zunächst die Tabelle reportData mit fiktiven Daten eingerichtet und dann der Stagingbereich output_stage erstellt. Dann wird create_report_pdf UDF aufgerufen, um eine PDF-Datei mit Daten zu erstellen, die aus der Tabelle reportData abfragt werden. In der gleichen SQL-Anweisung kopiert der Befehl COPY FILES die Ergebnisdatei von UDF nach output_stage.

Bemerkung

Wir verwenden einen serverseitig verschlüsselten (SSE) Stagingbereich, da die vorzeichenbehaftete URL zu einer Datei auf einem SSE-Stagingbereich eine unverschlüsselte Datei herunterlädt. Im Allgemeinen empfehlen wir die Standardverschlüsselung im Stagingbereich, da die Datei clientseitig verschlüsselt wird und dies sicherer ist. Dateien im normalen Stagingbereich können nach wie vor über UDFs oder GET/PUT gelesen werden – nur nicht über vorsignierte URLs. Stellen Sie sicher, dass Sie die Auswirkungen auf die Sicherheit verstehen, bevor Sie einen SSE-Stagingbereich in einer Produktionsumgebung verwenden.

 -- Fictitious data
 CREATE OR REPLACE TABLE reportData(location string, item string);
 INSERT INTO reportData VALUES ('SanMateo' ,'Item A');
 INSERT INTO reportData VALUES ('SanMateo' ,'Item Z');
 INSERT INTO reportData VALUES ('SanMateo' ,'Item X');
 INSERT INTO reportData VALUES ('Bellevue' ,'Item B');
 INSERT INTO reportData VALUES ('Bellevue' ,'Item Q');

 -- Presigned URLs only work with SSE stages, see note above.
 CREATE OR REPLACE STAGE output_stage ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');

 COPY FILES INTO @output_stage
   FROM (SELECT reports.file, location || '.pdf'
           FROM reportData, TABLE(create_report_pdf(item)
           OVER (partition BY location)) AS reports);

 -- Check the results
LIST @output_stage;
SELECT GET_PRESIGNED_URL(@output_stage, 'SanMateo.pdf');
Copy

Dateien aufteilen und in mehrere Tabellen entladen

Das folgende UDF-Handler-Beispiel teilt eine CSV-Datei zeilenweise anhand des ersten Zeichens jeder Zeile auf. Die UDF entlädt dann die aufgeteilten Dateien in mehrere Tabellen.

CREATE OR REPLACE FUNCTION split_file(path string)
RETURNS TABLE(file string, name string)
LANGUAGE PYTHON
VOLATILE
PACKAGES = ('snowflake-snowpark-python')
RUNTIME_VERSION = 3.9
HANDLER = 'SplitCsvFile'
AS $$
import csv
from snowflake.snowpark.files import SnowflakeFile

class SplitCsvFile:
    def process(self, file):
      toTable1 = SnowflakeFile.open_new_result("w")
      toTable1Csv = csv.writer(toTable1)
      toTable2 = SnowflakeFile.open_new_result("w")
      toTable2Csv = csv.writer(toTable2)
      toTable3 = SnowflakeFile.open_new_result("w")
      toTable3Csv = csv.writer(toTable3)
      with SnowflakeFile.open(file, 'r') as file:
        # File is of the format 1:itemA \n 2:itemB \n [...]
        for line in file.readlines():
          forTable = line[0]
          if (forTable == "1"):
            toTable1Csv.writerow([line[2:-1]])
          if (forTable == "2"):
            toTable2Csv.writerow([line[2:-1]])
          if (forTable == "3"):
            toTable3Csv.writerow([line[2:-1]])
      yield toTable1, 'table1.csv'
      yield toTable2, 'table2.csv'
      yield toTable3, 'table3.csv'
$$;
-- Create a stage with access to an import file.
CREATE OR REPLACE STAGE staged_files url="s3://sfquickstarts/misc/";

-- Add the files to be split into a table - we just add one.
CREATE OR REPLACE TABLE filesToSplit(path string);
INSERT INTO filesToSplit VALUES ( 'items.txt');

-- Create output tables
CREATE OR REPLACE TABLE table1(item string);
CREATE OR REPLACE TABLE table2(item string);
CREATE OR REPLACE TABLE table3(item string);

-- Create output stage
CREATE OR REPLACE stage output_stage;

-- Creates files named path-tableX.csv
COPY FILES INTO @output_stage FROM
  (SELECT file, path || '-' || name FROM filesToSplit, TABLE(split_file(build_scoped_file_url(@staged_files, path))));

-- We use pattern and COPY INTO (not COPY FILES INTO) to upload to a final table.
COPY INTO table1 FROM @output_stage PATTERN = '.*.table1.csv';
COPY INTO table2 FROM @output_stage PATTERN = '.*.table2.csv';
COPY INTO table3 FROM @output_stage PATTERN = '.*.table3.csv';

-- See results
SELECT * from table1;
SELECT * from table2;
SELECT * from table3;
Copy

Verwenden vektorisierter UDFs

Mit vektorisierten Python-UDFs können Sie Python-Funktionen definieren, mit denen Batches von Eingabezeilen als Pandas DataFrames empfangen und Batches von Ergebnissen als Pandas-Arrays oder Pandas Series zurückgeben werden. Die Spalte im Snowpark-dataframe wird als Pandas Series innerhalb der UDF vektorisiert.

Im Folgenden finden Sie ein Beispiel für die Verwendung der Batchschnittstelle:

from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)

@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
    # The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
    df.columns = ["col1", "col2", "col3", "col4"]
    return model.predict(df)
Copy

Vektorisierte Python-UDFs werden genauso aufgerufen wie andere Python-UDFs. Weitere Informationen dazu finden Sie in unter Vektorisierte Python-UDFs, wo erklärt wird, wie eine vektorisierte UDF mithilfe einer SQL-Anweisung erstellt wird. So können Sie zum Beispiel beim Angeben des Python-Codes in der SQL-Anweisung das Decorator-Element vectorized verwenden. Bei Verwendung der in diesem Dokument beschriebenen Snowpark Python-API verwenden Sie zum Erstellen einer vektorisierten UDF keine SQL-Anweisung. Sie verwenden also nicht das Decorator-Element vectorized.

Die Anzahl der Zeilen pro Batch kann begrenzt werden. Weitere Informationen dazu finden Sie unter Festlegen einer Zielbatchgröße.

Weitere Erläuterungen und Beispiele zur Verwendung der Snowpark Python-API bei der Erstellung von vektorisierten UDFs finden Sie im UDFs-Abschnitt der Snowpark-API-Referenz.