Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames¶
Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion in Java erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.
Unter diesem Thema:
Einführung¶
Sie können Snowpark-APIs aufrufen, um benutzerdefinierte Funktionen (UDFs) für Lambda-Ausdrücke in Java zu erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihren DataFrame zu verarbeiten.
Wenn Sie die Snowpark-API verwenden, um eine UDF zu erstellen, serialisiert die Snowpark-Bibliothek den Code für Ihre UDF und lädt ihn in einen internen Stagingbereich hoch. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, auf dem sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.
In Ihrem kundenspezifischen Code können Sie auch Code aufrufen, der in JAR-Dateien gepackt ist (z. B. Java-Klassen für die Bibliothek eines Drittanbieters).
Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:
Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.
import com.snowflake.snowpark_java.types.*; ... // Create and register an anonymous UDF (doubleUdf) // that takes in an integer argument and returns an integer value. UserDefinedFunction doubleUdf = Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType); // Call the anonymous UDF. DataFrame df = session.table("sample_product_data"); DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity"))); dfWithDoubleQuantity.show();
Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.
import com.snowflake.snowpark_java.types.*; ... // Create and register a permanent named UDF ("doubleUdf") // that takes in an integer argument and returns an integer value. UserDefinedFunction doubleUdf = session .udf() .registerPermanent( "doubleUdf", (Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType, "mystage"); // Call the named UDF. DataFrame df = session.table("sample_product_data"); DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity"))); dfWithDoubleQuantity.show();
Im restlichen Teil dieses Themas wird erläutert, wie UDFs erstellt werden.
Bemerkung
Wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION
definiert haben, können Sie diese UDF in Snowpark aufrufen.
Weitere Details dazu finden Sie unter Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs).
Unterstützte Datentypen für Argumente und Rückgabewerte¶
Um eine UDF für ein Java-Lambda zu erstellen, müssen Sie die unten aufgeführten unterstützten Datentypen für die Argumente und den Rückgabewert Ihrer Funktion oder Ihres Lambdas verwenden:
SQL-Datentyp |
Java-Datentyp |
Anmerkungen |
---|---|---|
Die folgenden Typen werden unterstützt:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
Angeben von Abhängigkeiten für eine UDF¶
Um eine UDF über die Snowpark-API zu definieren, müssen Sie Session.addDependency()
für alle Dateien aufrufen, die Klassen und Ressourcen enthalten, von denen Ihre UDF abhängt (z. B. JAR-Dateien, Ressourcendateien usw.). (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien aus einer UDF.)
Die Snowpark-Bibliothek lädt diese Dateien in einen internen Stagingbereich hoch und fügt die Dateien beim Ausführen Ihrer UDF zum Klassenpfad hinzu.
Tipp
Wenn Sie nicht möchten, dass die Bibliothek die Datei jedes Mal hochlädt, wenn Sie Ihre Anwendung ausführen, laden Sie die Datei in einen Stagingbereich hoch. Übergeben Sie beim Aufruf von addDependency
den Pfad zu der Datei im Stagingbereich.
Das folgende Beispiel zeigt, wie Sie eine JAR-Datei als Abhängigkeit zu einem Stagingbereich hinzufügen:
// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
Die folgenden Beispiele zeigen, wie Sie Abhängigkeiten für JAR-Dateien und Ressourcendateien hinzufügen:
// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");
// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");
// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
Die folgenden Abhängigkeiten sollten Sie nicht angeben müssen:
Ihre Java-Laufzeitbibliotheken.
Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.
Die Snowpark-JAR-Datei.
Die Snowpark-Bibliothek versucht automatisch, die Snowpark-JAR-Datei zu erkennen und auf den Server hochzuladen.
So verhindern Sie, dass die Bibliothek die Snowpark-JAR-Datei wiederholt auf den Server hochlädt:
Laden Sie die Snowpark-JAR-Datei in einen Stagingbereich hoch.
Der folgende Befehl lädt z. B. die Snowpark-JAR-Datei in den Stagingbereich
@mystage
hoch. Der PUT-Befehl komprimiert die JAR-Datei und gibt der resultierenden Datei die Bezeichnung „snowpark-1.10.0.jar.gz“.-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark-1.10.0.jar @mystageRufen Sie
addDependency
auf, um die Snowpark-JAR-Datei als Abhängigkeit zum Stagingbereich hinzuzufügen.Um beispielsweise die mit dem vorherigen Befehl hochgeladene Snowpark-JAR-Datei hinzuzufügen:
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark-1.10.0.jar.gz");Beachten Sie, dass der angegebene Pfad zur JAR-Datei die Dateinamenerweiterung
.gz
enthält, die durch den PUT-Befehl hinzugefügt wurde.Die JAR-Datei oder das Verzeichnis mit der aktuell ausgeführten Anwendung.
Die Snowpark-Bibliothek versucht automatisch, diese Abhängigkeiten zu erkennen und hochzuladen.
Wenn die Snowpark-Bibliothek diese Abhängigkeiten nicht automatisch erkennen kann, meldet die Bibliothek einen Fehler, und Sie müssen
addDependency
aufrufen, um die Abhängigkeiten manuell hinzuzufügen.
Wenn das Hochladen der Abhängigkeiten in den Stagingbereich zu lange dauert, meldet die Snowpark-Bibliothek eine Timeout-Ausnahme. Um die maximale Zeitspanne zu konfigurieren, die die Snowpark-Bibliothek warten soll, legen Sie Erstellen der Sitzung den gewünschten Wert für die Eigenschaft snowpark_request_timeout_in_seconds fest.
Erstellen einer anonymen UDF¶
Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:
Rufen Sie die statische Methode
Functions.udf
auf, und übergeben Sie den Lambda-Ausdruck und die DataTypes-Felder (oder die von den Methoden dieser Klasse konstruierten Objekte), die die Datentypen für Eingabe und Ausgabe repräsentieren.Rufen Sie die Methode
registerTemporary
der KlasseUDFRegistration
auf, und übergeben Sie den Lambda-Ausdruck und die DataTypes-Felder (oder die von den Methoden dieser Klasse konstruierten Objekte), die die Datentypen für Eingabe und Ausgabe repräsentieren.Sie können auf eine Instanz der Klasse
UDFRegistration
zugreifen, indem Sie die Methodeudf
des ObjektsSession
aufrufen.Beim Aufruf von
registerTemporary
ist eine Methodensignatur zu verwenden, die keinenname
-Parameter hat. (Da Sie eine anonyme UDF erstellen, geben Sie keinen Namen für die UDF an.)
Bemerkung
Wenn Sie Code mit mehreren Threads schreiben (z. B. bei Verwendung paralleler Collections), können Sie UDFs mit der Methode registerTemporary
registrieren, anstatt die Methode udf
zu verwenden. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session
nicht gefunden werden kann.
Diese Methoden geben ein UserDefinedFunction
-Objekt zurück, das Sie zum Aufrufen der UDF verwenden können (siehe Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs)).
Im folgenden Beispiel wird eine anonyme UDF erstellt:
import com.snowflake.snowpark_java.types.*;
...
// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Im folgenden Beispiel wird eine anonyme UDF erstellt, die eine kundenspezifische Klasse verwendet (LanguageDetector
zum Erkennen der in Text verwendeten Sprache). Im Beispiel wird die anonyme UDF aufgerufen, um die Sprache in der Spalte text_data
eines DataFrame zu erkennen und ein neues DataFrame zu erstellen, das eine zusätzliche Spalte lang
mit der verwendeten Sprache enthält.
import com.snowflake.snowpark_java.types.*;
// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;
// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");
// Create a detector
LanguageDetector detector = new LanguageDetector();
// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
Functions.udf(
(String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
DataTypes.StringType,
DataTypes.StringType);
// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
Erstellen und Registrieren einer benannten UDF¶
Wenn Sie eine UDF mit ihrem Namen aufrufen möchten (z. B. durch Verwendung der statischen Methode Functions.callUDF
), oder wenn Sie einen UDF in nachfolgenden Sitzungen verwenden müssen, können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie dazu eine der folgenden Methoden in der UDFRegistration
-Klasse:
registerTemporary
, wenn Sie die UDF nur in der aktuellen Sitzung verwenden möchtenregisterPermanent
, wenn Sie planen, die UDF in nachfolgenden Sitzungen zu verwenden
Um auf ein Objekt der Klasse UDFRegistration
zuzugreifen, rufen Sie die Methode udf
des Session
-Objekts auf.
Beim Aufruf der Methode registerTemporary
oder registerPermanent
übergeben Sie den Lambda-Ausdruck und die DataTypes-Felder (oder die von den Methoden dieser Klasse konstruierten Objekte), die die Datentypen der Eingänge und Ausgänge darstellen.
Beispiel:
import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
session
.udf()
.registerTemporary(
"doubleUdf",
(Integer x) -> x + x,
DataTypes.IntegerType,
DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
registerPermanent
erstellt eine UDF, die Sie in der aktuellen und den folgenden Sitzungen verwenden können. Wenn Sie registerPermanent
aufrufen, müssen Sie auch einen Speicherort in einem internen Stagingbereich angeben, in den die JAR-Dateien für die UDF und seine Abhängigkeiten hochgeladen werden sollen.
Bemerkung
registerPermanent
unterstützt keine externen Stagingbereiche.
Beispiel:
import com.snowflake.snowpark_java.types.*;
...
// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
session
.udf()
.registerPermanent(
"doubleUdf",
(Integer x) -> x + x,
DataTypes.IntegerType,
DataTypes.IntegerType,
"mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Verwenden von Objekten, die nicht serialisierbar sind¶
Wenn Sie eine UDF für einen Lambda-Ausdruck ausführen, serialisiert die Snowpark-Bibliothek die Lambda-Closure und sendet sie zur Ausführung an den Server.
Wenn ein von der Lambda-Closure erfasstes Objekt nicht serialisierbar ist, löst die Snowpark-Bibliothek eine java.io.NotSerializableException
-Ausnahme aus.
Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Wenn dies der Fall ist, müssen Sie das Objekt serialisierbar machen.
Schreiben von Initialisierungscode für eine UDF¶
Wenn Ihre UDF Initialisierungscode oder Kontext benötigt, können Sie diesen durch Werte bereitstellen, die als Teil der UDF-Closure erfasst werden.
Im folgenden Beispiel wird eine separate Klasse verwendet, um den von zwei UDFs benötigten Kontext zu initialisieren.
Die erste UDF erstellt eine neue Instanz der Klasse innerhalb der Lambda-Funktion, sodass die Initialisierung bei jedem Aufruf der UDF durchgeführt wird.
Die zweite UDF erfasst eine Instanz der Klasse, die in Ihrem Clientprogramm generiert wurde. Der auf dem Client generierte Kontext wird serialisiert und von der UDF verwendet. Beachten Sie, dass die Kontextklasse serialisierbar sein muss, damit dieser Ansatz funktioniert.
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;
// Context needed for a UDF.
class Context {
double randomInt = Math.random();
}
// Serializable context needed for the UDF.
class SerContext implements Serializable {
double randomInt = Math.random();
}
class TestUdf {
public static void main(String[] args) {
// Create the session.
Session session = Session.builder().configFile("/<path>/profile.properties").create();
session.range(1, 10, 2).show();
// Create a DataFrame with two columns ("c" and "d").
DataFrame dummy =
session.createDataFrame(
new Row[]{
Row.create(1, 1),
Row.create(2, 2),
Row.create(3, 3)
},
StructType.create(
new StructField("c", DataTypes.IntegerType),
new StructField("d", DataTypes.IntegerType))
);
dummy.show();
// Initialize the context once per invocation.
UserDefinedFunction udfRepeatedInit =
Functions.udf(
(Integer i) -> new Context().randomInt,
DataTypes.IntegerType,
DataTypes.DoubleType
);
dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();
// Initialize the serializable context only once,
// regardless of the number of times that the UDF is invoked.
SerContext sC = new SerContext();
UserDefinedFunction udfOnceInit =
Functions.udf(
(Integer i) -> sC.randomInt,
DataTypes.IntegerType,
DataTypes.DoubleType
);
dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
}
}
Lesen von Dateien aus einer UDF¶
Wie bereits erwähnt, lädt die Snowpark-Bibliothek die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF-Daten aus einer Datei gelesen werden, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.
Wenn außerdem der Inhalt der Datei zwischen den Aufrufen der UDF gleich bleibt, können Sie Ihren Code so schreiben, dass die Datei einmal beim ersten Aufruf geladen wird und bei den folgenden Aufrufen nicht mehr. Dies kann die Leistung Ihrer UDF-Aufrufe verbessern.
So richten Sie eine UDF zum Lesen einer Datei ein:
Fügen Sie die Datei zu einer JAR-Datei hinzu.
Wenn Ihre UDF z. B. eine Datei in einem
data/
-Unterverzeichnis (data/hello.txt
) verwenden muss, führen Sie den Befehljar
aus, um diese Datei zu einer JAR-Datei hinzuzufügen:# Create a new JAR file containing data/hello.txt. $ jar cvf <path>/myJar.jar data/hello.txt
Geben Sie an, dass die JAR-Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen und dem Klassenpfad hinzugefügt wird. Siehe Angeben von Abhängigkeiten für eine UDF.
Beispiel:
// Specify that myJar.jar contains files that your UDF depends on. session.addDependency("<path>/myJar.jar");
Rufen Sie in der UDF
Class.forName().getResourceAsStream()
auf, um die Datei im Klassenpfad zu finden und die Datei zu lesen.Um das Hinzufügen einer Abhängigkeit von
this
zu vermeiden, können SieClass.forName("com.snowflake.snowpark_java.DataFrame")
(anstelle vongetClass()
) verwenden, um das ObjektClass
zu erhalten.Beispiel für das Lesen der Datei
data/hello.txt
:// Read data/hello.txt from myJar.jar. String resourceName = "/data/hello.txt"; InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
In diesem Beispiel beginnt der Ressourcenname mit einem
/
, was anzeigt, dass dies der vollständige Pfad der Datei in der JAR-Datei ist. (In diesem Fall ist der Speicherort der Datei nicht relativ zum Paket der Klasse.)
Bemerkung
Wenn Sie nicht erwarten, dass sich der Inhalt der Datei zwischen den UDF-Aufrufen ändert, lesen Sie die Datei in ein statisches Feld Ihrer Klasse, und lesen Sie die Datei nur, wenn das Feld nicht gesetzt ist.
Im folgenden Beispiel wird ein Objekt (UDFCode
) mit einer Funktion definiert, die als UDF (readFileFunc
) verwendet wird. Die Funktion liest die Datei data/hello.txt
, die die Zeichenfolge hello,
enthalten soll. Die Funktion stellt diese Zeichenfolge der als Argument übergebenen Zeichenfolge voran.
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
// Create a function class that reads a file.
class UDFCode {
private static String fileContent = null;
// The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
// The file content is cached in 'fileContent'.
public static String readFile() {
if (fileContent == null) {
try {
String resourceName = "/data/hello.txt";
InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
.getResourceAsStream(resourceName);
fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
} catch (Exception e) {
fileContent = "Error while reading file";
}
}
return fileContent;
}
}
Im nächsten Teil des Beispiels wird die Funktion als anonyme UDF registriert. In dem Beispiel wird die UDF auf der Spalte NAME
in einem DataFrame aufgerufen. Im Beispiel wird davon ausgegangen, dass die Datei data/hello.txt
in der JAR-Datei myJar.jar
gepackt ist.
import com.snowflake.snowpark_java.types.*;
// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");
// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");
// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
(String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);
// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
Erstellen von benutzerdefinierten Tabellenfunktionen (UDTFs)¶
So erstellen und registrieren Sie eine UDTF in Snowpark:
In den nächsten Abschnitten werden diese Schritte näher beschrieben.
Weitere Informationen zum Aufrufen von UDTFs finden Sie unter Aufrufen einer UDTF.
Definieren der UDTF-Klasse¶
Definieren Sie eine Klasse, die eine der JavaUDTFn
-Schnittstellen (z. B. JavaUDTF0
oder JavaUDTF1
) aus dem com.snowflake.snowpark_java.udtf package implementiert, wobei n
die Anzahl der Eingabeargumente der UDTF angibt. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt, implementieren Sie die Klasse JavaUDTF2
.
Implementieren Sie in Ihrer Klasse die folgenden Methoden:
outputSchema() – Gibt ein
types.StructType
-Objekt zurück, das die Namen und Typen der Felder in den zurückgegebenen Zeilen beschreibt (das „Schema“ der Ausgabe).process() – Wird für jede Zeile in der Eingabepartition einmal aufgerufen (siehe Anmerkung unten).
inputSchema(), das ein
types.StructType
-Objekt zurückgibt, das die Typen der Eingabeparameter beschreibt.Wenn Ihre
process()
-MethodeMap
-Argumente übergibt, müssen Sie dieinputSchema()
-Methode implementieren. Ansonsten ist das Implementieren diese Methode optional.endPartition() – Wird einmal für jede Partition aufgerufen wird, nachdem alle Zeilen an
process()
übergeben wurden.
Wenn eine UDTF aufgerufen wird, werden die Zeilen vor Übergabe an die UDTF in Partitionen gruppiert:
Wenn die Anweisung, die die UDTF aufruft, die PARTITION-Klausel (explizite Partitionen) enthält, bestimmt diese Klausel, wie die Zeilen partitioniert werden.
Wenn die Anweisung die PARTITION-Klausel nicht enthält (implizite Partitionen), bestimmt Snowflake, wie die Zeilen am besten zu partitionieren sind.
Weitere Informationen zu Partitionen finden Sie unter Tabellenfunktionen und Partitionen.
Ein Beispiel für eine UDTF-Klasse finden Sie unter Beispiel für eine UDTF-Klasse.
Implementieren der outputSchema()-Methode¶
Implementieren Sie die Methode outputSchema()
, um die Namen und Datentypen der Felder (das „Ausgabeschema“) der Zeilen zu definieren, die von den Methoden process()
und endPartition()
zurückgegeben werden.
public StructType outputSchema()
Bei dieser Methode wird ein StructType-Objekt konstruiert und zurückgegeben, das StructField-Objekte enthält, die den Snowflake-Datentyp jedes Feldes in einer zurückgegebenen Zeile repräsentieren. Snowflake unterstützt die folgenden Typobjekte für das Ausgabeschema einer UDTF:
SQL-Datentyp |
Java-Typ |
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
Wenn Ihre UDTF zum Beispiel eine Zeile mit einem einzigen Integer-Feld zurückgibt:
public StructType outputSchema() { return StructType.create(new StructField("C1", DataTypes.IntegerType)); }
Implementieren der process()-Methode¶
Implementieren Sie in Ihrer UDTF-Klasse die Methode process()
:
Stream<Row> process(A0 arg0, ... A<n> arg<n>)
wobei n
die Anzahl der Argumente ist, die an Ihre UDTF übergeben werden.
Die Anzahl der Argumente in der Signatur entspricht der Schnittstelle, die Sie implementiert haben. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt und Sie die Klasse JavaUDTF2
implementieren, hat die process()
-Methode folgende Signatur:
Stream<Row> process(A0 arg0, A1 arg1)
Diese Methode wird einmal für jede Zeile der Eingabepartition aufgerufen.
Auswählen der Argumenttypen¶
Verwenden Sie für den Typ jedes Arguments der process()
-Methode den Java-Typ, der dem Snowflake-Datentyp des an die UDTF übergebenen Arguments entspricht.
Snowflake unterstützt die folgenden Datentypen für die Argumente einer UDTF:
SQL-Datentyp |
Java-Datentyp |
Anmerkungen |
---|---|---|
Die folgenden Typen werden unterstützt:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
Bemerkung
Wenn Sie java.util.Map
-Argumente übergeben, müssen Sie die inputSchema
-Methode implementieren, um die Typen dieser Argumente zu beschreiben. Siehe Implementieren der inputSchema()-Methode.
Zurückgeben von Zeilen¶
Erstellen Sie in der process()
-Methode einen java.util.stream.Stream von Row
-Objekten, die die Daten enthalten, die von der UDTF für die gegebenen Eingabewerte zurückgegeben werden sollen, und geben Sie diesen zurück. Die Felder in der Zeile müssen die Typen verwenden, die Sie in der Methode outputSchema
angegeben haben. (siehe Implementieren der outputSchema()-Methode).
Wenn Ihre UDTF beispielsweise Zeilen generiert, konstruieren Sie ein Iterable
von Row
-Objekten für die generierten Zeilen, und geben dieses zurück:
import java.util.stream.Stream; ... public Stream<Row> process(Integer start, Integer count) { Stream.Builder<Row> builder = Stream.builder(); for (int i = start; i < start + count ; i++) { builder.add(Row.create(i)); } return builder.build(); }
Implementieren der inputSchema()-Methode¶
Wenn die process()-Methode ein java.util.Map
-Argument übergibt, müssen Sie die inputSchema()
-Methode implementieren, um die Typen der Eingabeargumente zu beschreiben.
Bemerkung
Wenn die process()
-Methode keine Map
-Argumente übergibt, muss die Methode inputSchema()
nicht implementiert werden.
Bei dieser Methode wird ein StructType-Objekt konstruiert und zurückgegeben, das StructField-Objekte enthält, die Snowflake-Datentyp jedes Arguments repräsentieren, das an die process()
-Methode übergeben wird. Snowflake unterstützt die folgenden Typobjekte für das Eingabeschema einer UDTF:
SQL-Datentyp |
Java-Typ |
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
Angenommen, dass Ihre process()
-Methode ein Map<Zeichenfolge, Zeichenfolge>
-Argument und ein Map<Zeichenfolge, Variant-Wert>
-Argument übergibt:
import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...
public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
...
}
Sie müssen die inputSchema()
-Methode implementieren, um ein StructType
-Objekt zurückzugeben, das die Typen dieser Eingabeargumente beschreibt:
import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...
public StructType inputSchema() {
return StructType.create(
new StructField(
"string_map",
DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
new StructField(
"variant_map",
DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
Implementieren der endPartition()-Methode¶
Implementieren Sie die endPartition
-Methode, und fügen Sie Code hinzu, der ausgeführt werden soll, nachdem alle Zeilen in der Eingabepartition an die process
-Methode übergeben wurden. Die Funktion endPartition
wird einmal für jede Eingabepartition aufgerufen.
public Stream<Row> endPartition()
Sie können diese Methode verwenden, wenn Sie Workload ausführen müssen, nachdem alle Zeilen in der Partition verarbeitet worden sind. Beispiele:
Zurückgeben von Zeilen auf der Grundlage von Zustandsinformationen, die Sie in jedem
process
-Methodenaufruf erfassen.Zurückgeben von Zeilen, die nicht an eine bestimmte Eingabezeile gebunden sind.
Zurückgeben von Zeilen, die die Ausgabezeilen zusammenfassen, die von der Methode
process
generiert wurden.
Die Felder in den zurückgegebenen Zeilen müssen den Typen entsprechen, die Sie in der Methode outputSchema
angegebenen haben. (siehe Implementieren der outputSchema()-Methode).
Wenn Sie am Ende jeder Partition keine zusätzlichen Zeilen zurückgeben müssen, geben Sie z. B. einen leeren Stream
zurück. Beispiel:
public Stream<Row> endPartition() { return Stream.empty(); }
Bemerkung
Snowflake unterstützt zwar große Partitionen mit Timeouts, die so eingestellt sind, dass sie erfolgreich verarbeitet werden können, aber bei besonders großen Partitionen kann es zu Zeitüberschreitungen kommen (z. B. wenn endPartition
zu lange für den Abschluss braucht). Wenden Sie sich an den Snowflake-Support, wenn Sie den Timeout-Schwellenwert für bestimmte Nutzungsszenarios anpassen möchten.
Beispiel für eine UDTF-Klasse¶
Es folgt ein Beispiel für eine UDTF-Klasse, die einen Bereich von Zeilen generiert.
Da die UDTF 2 Argumente übergibt, implementiert die Klasse
JavaUDTF2
.Die Argumente
start
undcount
geben die Startnummer für die Zeile und die Anzahl der zu generierenden Zeilen an.
import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;
class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
public StructType outputSchema() {
return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
// Because the process() method in this example does not pass in Map arguments,
// implementing the inputSchema() method is optional.
public StructType inputSchema() {
return StructType.create(
new StructField("start_value", DataTypes.IntegerType),
new StructField("value_count", DataTypes.IntegerType));
}
public Stream<Row> endPartition() {
return Stream.empty();
}
public Stream<Row> process(Integer start, Integer count) {
Stream.Builder<Row> builder = Stream.builder();
for (int i = start; i < start + count ; i++) {
builder.add(Row.create(i));
}
return builder.build();
}
}
Registrieren der UDTF¶
Erstellen Sie dann eine Instanz der neuen Klasse, und registrieren Sie die Klasse durch Aufruf einer der UDTFRegistration-Methoden. Sie können die UDTF als temporär oder permanent registrieren.
Registrieren einer temporären UDTF¶
Um eine temporäre UDTF zu registrieren, rufen Sie UDTFRegistration.registerTemporary
auf:
Wenn Sie die UDTF ohne Namen aufrufen müssen, können Sie eine anonyme UDTF registrieren, indem Sie eine Instanz der Klasse übergeben:
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf()); // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Wenn Sie die UDTF mit einem Namen aufrufen müssen, geben Sie auch einen Namen für die UDTF an:
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf()); // Call the UDTF by name. session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Registrieren einer permanenten UDTF¶
Wenn Sie die UDTF in späteren Sitzungen verwenden müssen, rufen Sie UDTFRegistration.registerPermanent
auf, um eine permanente UDTF zu registrieren.
Bei der Registrierung einer permanenten UDTF müssen Sie einen Stagingbereich angeben, in den die Registrierungsmethode die JAR-Dateien der UDTF und deren Abhängigkeiten hochlädt. Beispiel:
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage"); // Call the UDTF by name. session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Aufrufen einer UDTF¶
Nach dem Registrieren der UDTF können Sie die UDTF aufrufen, indem Sie das zurückgegebene TableFunction
-Objekt an die tableFunction
-Methode des Session
-Objekts übergeben:
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf()); // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Um eine UDTF-Methode namentlich aufzurufen, erstellen Sie ein TableFunction
-Objekt mit diesem Namen und übergeben es an die tableFunction
-Methode:
// Register the MyRangeUdtf class that was defined in the previous example. TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf()); // Call the UDTF by name. session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Sie können eine UDTF-Anweisung auch direkt über eine SELECT-Anweisung aufrufen:
session.sql("select * from table(myUdtf(10, 5))");