Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames

Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion in Java erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.

Unter diesem Thema:

Einführung

Sie können Snowpark-APIs aufrufen, um benutzerdefinierte Funktionen (UDFs) für Lambda-Ausdrücke in Java zu erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihren DataFrame zu verarbeiten.

Wenn Sie die Snowpark-API verwenden, um eine UDF zu erstellen, serialisiert die Snowpark-Bibliothek den Code für Ihre UDF und lädt ihn in einen internen Stagingbereich hoch. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, auf dem sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.

In Ihrem kundenspezifischen Code können Sie auch Code aufrufen, der in JAR-Dateien gepackt ist (z. B. Java-Klassen für die Bibliothek eines Drittanbieters).

Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:

  • Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register an anonymous UDF (doubleUdf)
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
    // Call the anonymous UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy
  • Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register a permanent named UDF ("doubleUdf")
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      session
        .udf()
        .registerPermanent(
          "doubleUdf",
          (Integer x) -> x + x,
          DataTypes.IntegerType,
          DataTypes.IntegerType,
          "mystage");
    // Call the named UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy

Im restlichen Teil dieses Themas wird erläutert, wie UDFs erstellt werden.

Bemerkung

Wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION definiert haben, können Sie diese UDF in Snowpark aufrufen.

Weitere Details dazu finden Sie unter Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs).

Unterstützte Datentypen für Argumente und Rückgabewerte

Um eine UDF für ein Java-Lambda zu erstellen, müssen Sie die unten aufgeführten unterstützten Datentypen für die Argumente und den Rückgabewert Ihrer Funktion oder Ihres Lambdas verwenden:

SQL-Datentyp

Java-Datentyp

Anmerkungen

NUMBER

Die folgenden Typen werden unterstützt:

  • Integer

  • Long

  • java.math.BigDecimal oder java.math.BigInteger

FLOAT

Float

DOUBLE

Double

VARCHAR

String

BOOLEAN

Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] oder Variant[]

OBJECT

Map<String, String> oder Map<String, Variant>

GEOGRAPHY

com.snowflake.snowpark_java.types.Geography

Angeben von Abhängigkeiten für eine UDF

Um eine UDF über die Snowpark-API zu definieren, müssen Sie Session.addDependency() für alle Dateien aufrufen, die Klassen und Ressourcen enthalten, von denen Ihre UDF abhängt (z. B. JAR-Dateien, Ressourcendateien usw.). (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien aus einer UDF.)

Die Snowpark-Bibliothek lädt diese Dateien in einen internen Stagingbereich hoch und fügt die Dateien beim Ausführen Ihrer UDF zum Klassenpfad hinzu.

Tipp

Wenn Sie nicht möchten, dass die Bibliothek die Datei jedes Mal hochlädt, wenn Sie Ihre Anwendung ausführen, laden Sie die Datei in einen Stagingbereich hoch. Übergeben Sie beim Aufruf von addDependency den Pfad zu der Datei im Stagingbereich.

Das folgende Beispiel zeigt, wie Sie eine JAR-Datei als Abhängigkeit zu einem Stagingbereich hinzufügen:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
Copy

Die folgenden Beispiele zeigen, wie Sie Abhängigkeiten für JAR-Dateien und Ressourcendateien hinzufügen:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");

// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
Copy

Die folgenden Abhängigkeiten sollten Sie nicht angeben müssen:

  • Ihre Java-Laufzeitbibliotheken.

    Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.

  • Die Snowpark-JAR-Datei.

    Die Snowpark-Bibliothek versucht automatisch, die Snowpark-JAR-Datei zu erkennen und auf den Server hochzuladen.

    So verhindern Sie, dass die Bibliothek die Snowpark-JAR-Datei wiederholt auf den Server hochlädt:

    1. Laden Sie die Snowpark-JAR-Datei in einen Stagingbereich hoch.

      Der folgende Befehl lädt z. B. die Snowpark-JAR-Datei in den Stagingbereich @mystage hoch. Der PUT-Befehl komprimiert die JAR-Datei und gibt der resultierenden Datei die Bezeichnung „snowpark-1.9.0.jar.gz“.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. Rufen Sie addDependency auf, um die Snowpark-JAR-Datei als Abhängigkeit zum Stagingbereich hinzuzufügen.

      Um beispielsweise die mit dem vorherigen Befehl hochgeladene Snowpark-JAR-Datei hinzuzufügen:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz");

      Beachten Sie, dass der angegebene Pfad zur JAR-Datei die Dateinamenerweiterung .gz enthält, die durch den PUT-Befehl hinzugefügt wurde.

  • Die JAR-Datei oder das Verzeichnis mit der aktuell ausgeführten Anwendung.

    Die Snowpark-Bibliothek versucht automatisch, diese Abhängigkeiten zu erkennen und hochzuladen.

    Wenn die Snowpark-Bibliothek diese Abhängigkeiten nicht automatisch erkennen kann, meldet die Bibliothek einen Fehler, und Sie müssen addDependency aufrufen, um die Abhängigkeiten manuell hinzuzufügen.

Wenn das Hochladen der Abhängigkeiten in den Stagingbereich zu lange dauert, meldet die Snowpark-Bibliothek eine Timeout-Ausnahme. Um die maximale Zeitspanne zu konfigurieren, die die Snowpark-Bibliothek warten soll, legen Sie Erstellen der Sitzung den gewünschten Wert für die Eigenschaft snowpark_request_timeout_in_seconds fest.

Erstellen einer anonymen UDF

Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:

  • Rufen Sie die statische Methode Functions.udf auf, und übergeben Sie den Lambda-Ausdruck und die DataTypes-Felder (oder die von den Methoden dieser Klasse konstruierten Objekte), die die Datentypen für Eingabe und Ausgabe repräsentieren.

  • Rufen Sie die Methode registerTemporary der Klasse UDFRegistration auf, und übergeben Sie den Lambda-Ausdruck und die DataTypes-Felder (oder die von den Methoden dieser Klasse konstruierten Objekte), die die Datentypen für Eingabe und Ausgabe repräsentieren.

    Sie können auf eine Instanz der Klasse UDFRegistration zugreifen, indem Sie die Methode udf des Objekts Session aufrufen.

    Beim Aufruf von registerTemporary ist eine Methodensignatur zu verwenden, die keinen name-Parameter hat. (Da Sie eine anonyme UDF erstellen, geben Sie keinen Namen für die UDF an.)

Bemerkung

Wenn Sie Code mit mehreren Threads schreiben (z. B. bei Verwendung paralleler Collections), können Sie UDFs mit der Methode registerTemporary registrieren, anstatt die Methode udf zu verwenden. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session nicht gefunden werden kann.

Diese Methoden geben ein UserDefinedFunction-Objekt zurück, das Sie zum Aufrufen der UDF verwenden können (siehe Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs)).

Im folgenden Beispiel wird eine anonyme UDF erstellt:

import com.snowflake.snowpark_java.types.*;
...

// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

Im folgenden Beispiel wird eine anonyme UDF erstellt, die eine kundenspezifische Klasse verwendet (LanguageDetector zum Erkennen der in Text verwendeten Sprache). Im Beispiel wird die anonyme UDF aufgerufen, um die Sprache in der Spalte text_data eines DataFrame zu erkennen und ein neues DataFrame zu erstellen, das eine zusätzliche Spalte lang mit der verwendeten Sprache enthält.

import com.snowflake.snowpark_java.types.*;

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");

// Create a detector
LanguageDetector detector = new LanguageDetector();

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
  Functions.udf(
    (String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
    DataTypes.StringType,
    DataTypes.StringType);

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
Copy

Erstellen und Registrieren einer benannten UDF

Wenn Sie eine UDF mit ihrem Namen aufrufen möchten (z. B. durch Verwendung der statischen Methode Functions.callUDF), oder wenn Sie einen UDF in nachfolgenden Sitzungen verwenden müssen, können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie dazu eine der folgenden Methoden in der UDFRegistration-Klasse:

  • registerTemporary, wenn Sie die UDF nur in der aktuellen Sitzung verwenden möchten

  • registerPermanent, wenn Sie planen, die UDF in nachfolgenden Sitzungen zu verwenden

Um auf ein Objekt der Klasse UDFRegistration zuzugreifen, rufen Sie die Methode udf des Session-Objekts auf.

Beim Aufruf der Methode registerTemporary oder registerPermanent übergeben Sie den Lambda-Ausdruck und die DataTypes-Felder (oder die von den Methoden dieser Klasse konstruierten Objekte), die die Datentypen der Eingänge und Ausgänge darstellen.

Beispiel:

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

registerPermanent erstellt eine UDF, die Sie in der aktuellen und den folgenden Sitzungen verwenden können. Wenn Sie registerPermanent aufrufen, müssen Sie auch einen Speicherort in einem internen Stagingbereich angeben, in den die JAR-Dateien für die UDF und seine Abhängigkeiten hochgeladen werden sollen.

Bemerkung

registerPermanent unterstützt keine externen Stagingbereiche.

Beispiel:

import com.snowflake.snowpark_java.types.*;
...

// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerPermanent(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType,
      "mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

Verwenden von Objekten, die nicht serialisierbar sind

Wenn Sie eine UDF für einen Lambda-Ausdruck ausführen, serialisiert die Snowpark-Bibliothek die Lambda-Closure und sendet sie zur Ausführung an den Server.

Wenn ein von der Lambda-Closure erfasstes Objekt nicht serialisierbar ist, löst die Snowpark-Bibliothek eine java.io.NotSerializableException-Ausnahme aus.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

Wenn dies der Fall ist, müssen Sie das Objekt serialisierbar machen.

Schreiben von Initialisierungscode für eine UDF

Wenn Ihre UDF Initialisierungscode oder Kontext benötigt, können Sie diesen durch Werte bereitstellen, die als Teil der UDF-Closure erfasst werden.

Im folgenden Beispiel wird eine separate Klasse verwendet, um den von zwei UDFs benötigten Kontext zu initialisieren.

  • Die erste UDF erstellt eine neue Instanz der Klasse innerhalb der Lambda-Funktion, sodass die Initialisierung bei jedem Aufruf der UDF durchgeführt wird.

  • Die zweite UDF erfasst eine Instanz der Klasse, die in Ihrem Clientprogramm generiert wurde. Der auf dem Client generierte Kontext wird serialisiert und von der UDF verwendet. Beachten Sie, dass die Kontextklasse serialisierbar sein muss, damit dieser Ansatz funktioniert.

import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;

// Context needed for a UDF.
class Context {
  double randomInt = Math.random();
}

// Serializable context needed for the UDF.
class SerContext implements Serializable {
  double randomInt = Math.random();
}

class TestUdf {
  public static void main(String[] args) {
    // Create the session.
    Session session = Session.builder().configFile("/<path>/profile.properties").create();
    session.range(1, 10, 2).show();

    // Create a DataFrame with two columns ("c" and "d").
    DataFrame dummy =
      session.createDataFrame(
        new Row[]{
          Row.create(1, 1),
          Row.create(2, 2),
          Row.create(3, 3)
        },
        StructType.create(
          new StructField("c", DataTypes.IntegerType),
          new StructField("d", DataTypes.IntegerType))
        );
    dummy.show();

    // Initialize the context once per invocation.
    UserDefinedFunction udfRepeatedInit =
      Functions.udf(
        (Integer i) -> new Context().randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    SerContext sC = new SerContext();
    UserDefinedFunction udfOnceInit =
      Functions.udf(
        (Integer i) -> sC.randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
    UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
  }
}
Copy

Lesen von Dateien aus einer UDF

Wie bereits erwähnt, lädt die Snowpark-Bibliothek die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF-Daten aus einer Datei gelesen werden, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.

Wenn außerdem der Inhalt der Datei zwischen den Aufrufen der UDF gleich bleibt, können Sie Ihren Code so schreiben, dass die Datei einmal beim ersten Aufruf geladen wird und bei den folgenden Aufrufen nicht mehr. Dies kann die Leistung Ihrer UDF-Aufrufe verbessern.

So richten Sie eine UDF zum Lesen einer Datei ein:

  1. Fügen Sie die Datei zu einer JAR-Datei hinzu.

    Wenn Ihre UDF z. B. eine Datei in einem data/-Unterverzeichnis (data/hello.txt) verwenden muss, führen Sie den Befehl jar aus, um diese Datei zu einer JAR-Datei hinzuzufügen:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. Geben Sie an, dass die JAR-Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen und dem Klassenpfad hinzugefügt wird. Siehe Angeben von Abhängigkeiten für eine UDF.

    Beispiel:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar");
    
    Copy
  3. Rufen Sie in der UDF Class.forName().getResourceAsStream() auf, um die Datei im Klassenpfad zu finden und die Datei zu lesen.

    Um das Hinzufügen einer Abhängigkeit von this zu vermeiden, können Sie Class.forName("com.snowflake.snowpark_java.DataFrame") (anstelle von getClass()) verwenden, um das Objekt Class zu erhalten.

    Beispiel für das Lesen der Datei data/hello.txt:

    // Read data/hello.txt from myJar.jar.
    String resourceName = "/data/hello.txt";
    InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
    
    Copy

    In diesem Beispiel beginnt der Ressourcenname mit einem /, was anzeigt, dass dies der vollständige Pfad der Datei in der JAR-Datei ist. (In diesem Fall ist der Speicherort der Datei nicht relativ zum Paket der Klasse.)

Bemerkung

Wenn Sie nicht erwarten, dass sich der Inhalt der Datei zwischen den UDF-Aufrufen ändert, lesen Sie die Datei in ein statisches Feld Ihrer Klasse, und lesen Sie die Datei nur, wenn das Feld nicht gesetzt ist.

Im folgenden Beispiel wird ein Objekt (UDFCode) mit einer Funktion definiert, die als UDF (readFileFunc) verwendet wird. Die Funktion liest die Datei data/hello.txt, die die Zeichenfolge hello, enthalten soll. Die Funktion stellt diese Zeichenfolge der als Argument übergebenen Zeichenfolge voran.

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Create a function class that reads a file.
class UDFCode {
  private static String fileContent = null;
  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // The file content is cached in 'fileContent'.
  public static String readFile() {
    if (fileContent == null) {
      try {
        String resourceName = "/data/hello.txt";
        InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
          .getResourceAsStream(resourceName);
        fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
      } catch (Exception e) {
        fileContent = "Error while reading file";
      }
    }
    return fileContent;
  }
}
Copy

Im nächsten Teil des Beispiels wird die Funktion als anonyme UDF registriert. In dem Beispiel wird die UDF auf der Spalte NAME in einem DataFrame aufgerufen. Im Beispiel wird davon ausgegangen, dass die Datei data/hello.txt in der JAR-Datei myJar.jar gepackt ist.

import com.snowflake.snowpark_java.types.*;

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");

// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
  (String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
Copy

Erstellen von benutzerdefinierten Tabellenfunktionen (UDTFs)

So erstellen und registrieren Sie eine UDTF in Snowpark:

In den nächsten Abschnitten werden diese Schritte näher beschrieben.

Weitere Informationen zum Aufrufen von UDTFs finden Sie unter Aufrufen einer UDTF.

Definieren der UDTF-Klasse

Definieren Sie eine Klasse, die eine der JavaUDTFn-Schnittstellen (z. B. JavaUDTF0 oder JavaUDTF1) aus dem com.snowflake.snowpark_java.udtf package implementiert, wobei n die Anzahl der Eingabeargumente der UDTF angibt. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt, implementieren Sie die Klasse JavaUDTF2.

Implementieren Sie in Ihrer Klasse die folgenden Methoden:

  • outputSchema() – Gibt ein types.StructType-Objekt zurück, das die Namen und Typen der Felder in den zurückgegebenen Zeilen beschreibt (das „Schema“ der Ausgabe).

  • process() – Wird für jede Zeile in der Eingabepartition einmal aufgerufen (siehe Anmerkung unten).

  • inputSchema(), das ein types.StructType-Objekt zurückgibt, das die Typen der Eingabeparameter beschreibt.

    Wenn Ihre process()-Methode Map-Argumente übergibt, müssen Sie die inputSchema()-Methode implementieren. Ansonsten ist das Implementieren diese Methode optional.

  • endPartition() – Wird einmal für jede Partition aufgerufen wird, nachdem alle Zeilen an process() übergeben wurden.

Wenn eine UDTF aufgerufen wird, werden die Zeilen vor Übergabe an die UDTF in Partitionen gruppiert:

  • Wenn die Anweisung, die die UDTF aufruft, die PARTITION-Klausel (explizite Partitionen) enthält, bestimmt diese Klausel, wie die Zeilen partitioniert werden.

  • Wenn die Anweisung die PARTITION-Klausel nicht enthält (implizite Partitionen), bestimmt Snowflake, wie die Zeilen am besten zu partitionieren sind.

Weitere Informationen zu Partitionen finden Sie unter Tabellenfunktionen und Partitionen.

Ein Beispiel für eine UDTF-Klasse finden Sie unter Beispiel für eine UDTF-Klasse.

Implementieren der outputSchema()-Methode

Implementieren Sie die Methode outputSchema(), um die Namen und Datentypen der Felder (das „Ausgabeschema“) der Zeilen zu definieren, die von den Methoden process() und endPartition() zurückgegeben werden.

public StructType outputSchema()
Copy

Bei dieser Methode wird ein StructType-Objekt konstruiert und zurückgegeben, das StructField-Objekte enthält, die den Snowflake-Datentyp jedes Feldes in einer zurückgegebenen Zeile repräsentieren. Snowflake unterstützt die folgenden Typobjekte für das Ausgabeschema einer UDTF:

SQL-Datentyp

Java-Typ

com.snowflake.snowpark_java.types-Typ

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<Zeichenfolge, Zeichenfolge>

MapType(StringType, StringType)

OBJECT

java.util.Map<Zeichenfolge, Variant-Wert>

MapType(StringType, VariantType)

Wenn Ihre UDTF zum Beispiel eine Zeile mit einem einzigen Integer-Feld zurückgibt:

public StructType outputSchema() {
  return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
Copy

Implementieren der process()-Methode

Implementieren Sie in Ihrer UDTF-Klasse die Methode process():

Stream<Row> process(A0 arg0, ... A<n> arg<n>)
Copy

wobei n die Anzahl der Argumente ist, die an Ihre UDTF übergeben werden.

Die Anzahl der Argumente in der Signatur entspricht der Schnittstelle, die Sie implementiert haben. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt und Sie die Klasse JavaUDTF2 implementieren, hat die process()-Methode folgende Signatur:

Stream<Row> process(A0 arg0, A1 arg1)
Copy

Diese Methode wird einmal für jede Zeile der Eingabepartition aufgerufen.

Auswählen der Argumenttypen

Verwenden Sie für den Typ jedes Arguments der process()-Methode den Java-Typ, der dem Snowflake-Datentyp des an die UDTF übergebenen Arguments entspricht.

Snowflake unterstützt die folgenden Datentypen für die Argumente einer UDTF:

SQL-Datentyp

Java-Datentyp

Anmerkungen

NUMBER

Die folgenden Typen werden unterstützt:

  • java.lang.Short

  • java.lang.Integer

  • java.lang.Long

  • java.math.BigDecimal

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

VARCHAR

java.lang.String

BOOLEAN

java.lang.Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] oder Variant[]

OBJECT

Map<String, String> oder Map<String, Variant>

Bemerkung

Wenn Sie java.util.Map-Argumente übergeben, müssen Sie die inputSchema-Methode implementieren, um die Typen dieser Argumente zu beschreiben. Siehe Implementieren der inputSchema()-Methode.

Zurückgeben von Zeilen

Erstellen Sie in der process()-Methode einen java.util.stream.Stream von Row-Objekten, die die Daten enthalten, die von der UDTF für die gegebenen Eingabewerte zurückgegeben werden sollen, und geben Sie diesen zurück. Die Felder in der Zeile müssen die Typen verwenden, die Sie in der Methode outputSchema angegeben haben. (siehe Implementieren der outputSchema()-Methode).

Wenn Ihre UDTF beispielsweise Zeilen generiert, konstruieren Sie ein Iterable von Row-Objekten für die generierten Zeilen, und geben dieses zurück:

import java.util.stream.Stream;
...

public Stream<Row> process(Integer start, Integer count) {
  Stream.Builder<Row> builder = Stream.builder();
  for (int i = start; i < start + count ; i++) {
    builder.add(Row.create(i));
  }
  return builder.build();
}
Copy

Implementieren der inputSchema()-Methode

Wenn die process()-Methode ein java.util.Map-Argument übergibt, müssen Sie die inputSchema()-Methode implementieren, um die Typen der Eingabeargumente zu beschreiben.

Bemerkung

Wenn die process()-Methode keine Map-Argumente übergibt, muss die Methode inputSchema() nicht implementiert werden.

Bei dieser Methode wird ein StructType-Objekt konstruiert und zurückgegeben, das StructField-Objekte enthält, die Snowflake-Datentyp jedes Arguments repräsentieren, das an die process()-Methode übergeben wird. Snowflake unterstützt die folgenden Typobjekte für das Eingabeschema einer UDTF:

SQL-Datentyp

Java-Typ

com.snowflake.snowpark_java.types-Typ

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<Zeichenfolge, Zeichenfolge>

MapType(StringType, StringType)

OBJECT

java.util.Map<Zeichenfolge, Variant-Wert>

MapType(StringType, VariantType)

Angenommen, dass Ihre process()-Methode ein Map<Zeichenfolge, Zeichenfolge>-Argument und ein Map<Zeichenfolge, Variant-Wert>-Argument übergibt:

import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...

public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
  ...
}
Copy

Sie müssen die inputSchema()-Methode implementieren, um ein StructType-Objekt zurückzugeben, das die Typen dieser Eingabeargumente beschreibt:

import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...

public StructType inputSchema() {
  return StructType.create(
      new StructField(
          "string_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
      new StructField(
          "variant_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
Copy

Implementieren der endPartition()-Methode

Implementieren Sie die endPartition-Methode, und fügen Sie Code hinzu, der ausgeführt werden soll, nachdem alle Zeilen in der Eingabepartition an die process-Methode übergeben wurden. Die Funktion endPartition wird einmal für jede Eingabepartition aufgerufen.

public Stream<Row> endPartition()
Copy

Sie können diese Methode verwenden, wenn Sie Workload ausführen müssen, nachdem alle Zeilen in der Partition verarbeitet worden sind. Beispiele:

  • Zurückgeben von Zeilen auf der Grundlage von Zustandsinformationen, die Sie in jedem process-Methodenaufruf erfassen.

  • Zurückgeben von Zeilen, die nicht an eine bestimmte Eingabezeile gebunden sind.

  • Zurückgeben von Zeilen, die die Ausgabezeilen zusammenfassen, die von der Methode process generiert wurden.

Die Felder in den zurückgegebenen Zeilen müssen den Typen entsprechen, die Sie in der Methode outputSchema angegebenen haben. (siehe Implementieren der outputSchema()-Methode).

Wenn Sie am Ende jeder Partition keine zusätzlichen Zeilen zurückgeben müssen, geben Sie z. B. einen leeren Stream zurück. Beispiel:

public Stream<Row> endPartition() {
  return Stream.empty();
}
Copy

Bemerkung

Snowflake unterstützt zwar große Partitionen mit Timeouts, die so eingestellt sind, dass sie erfolgreich verarbeitet werden können, aber bei besonders großen Partitionen kann es zu Zeitüberschreitungen kommen (z. B. wenn endPartition zu lange für den Abschluss braucht). Wenden Sie sich an den Snowflake-Support, wenn Sie den Timeout-Schwellenwert für bestimmte Nutzungsszenarios anpassen möchten.

Beispiel für eine UDTF-Klasse

Es folgt ein Beispiel für eine UDTF-Klasse, die einen Bereich von Zeilen generiert.

  • Da die UDTF 2 Argumente übergibt, implementiert die Klasse JavaUDTF2.

  • Die Argumente start und count geben die Startnummer für die Zeile und die Anzahl der zu generierenden Zeilen an.

import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;

class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
  public StructType outputSchema() {
    return StructType.create(new StructField("C1", DataTypes.IntegerType));
  }

  // Because the process() method in this example does not pass in Map arguments,
  // implementing the inputSchema() method is optional.
  public StructType inputSchema() {
    return StructType.create(
            new StructField("start_value", DataTypes.IntegerType),
            new StructField("value_count", DataTypes.IntegerType));
  }

  public Stream<Row> endPartition() {
    return Stream.empty();
  }

  public Stream<Row> process(Integer start, Integer count) {
    Stream.Builder<Row> builder = Stream.builder();
    for (int i = start; i < start + count ; i++) {
      builder.add(Row.create(i));
    }
    return builder.build();
  }
}
Copy

Registrieren der UDTF

Erstellen Sie dann eine Instanz der neuen Klasse, und registrieren Sie die Klasse durch Aufruf einer der UDTFRegistration-Methoden. Sie können die UDTF als temporär oder permanent registrieren.

Registrieren einer temporären UDTF

Um eine temporäre UDTF zu registrieren, rufen Sie UDTFRegistration.registerTemporary auf:

  • Wenn Sie die UDTF ohne Namen aufrufen müssen, können Sie eine anonyme UDTF registrieren, indem Sie eine Instanz der Klasse übergeben:

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
    
    Copy
  • Wenn Sie die UDTF mit einem Namen aufrufen müssen, geben Sie auch einen Namen für die UDTF an:

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
    // Call the UDTF by name.
    session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
    
    Copy

Registrieren einer permanenten UDTF

Wenn Sie die UDTF in späteren Sitzungen verwenden müssen, rufen Sie UDTFRegistration.registerPermanent auf, um eine permanente UDTF zu registrieren.

Bei der Registrierung einer permanenten UDTF müssen Sie einen Stagingbereich angeben, in den die Registrierungsmethode die JAR-Dateien der UDTF und deren Abhängigkeiten hochlädt. Beispiel:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage");
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

Aufrufen einer UDTF

Nach dem Registrieren der UDTF können Sie die UDTF aufrufen, indem Sie das zurückgegebene TableFunction-Objekt an die tableFunction-Methode des Session-Objekts übergeben:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Copy

Um eine UDTF-Methode namentlich aufzurufen, erstellen Sie ein TableFunction-Objekt mit diesem Namen und übergeben es an die tableFunction-Methode:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

Sie können eine UDTF-Anweisung auch direkt über eine SELECT-Anweisung aufrufen:

session.sql("select * from table(myUdtf(10, 5))");
Copy