Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Scala¶
Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Scala erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.
Unter diesem Thema:
Einführung¶
Sie können Snowpark-APIs aufrufen, um benutzerdefinierte Funktionen (UDFs) für Ihre kundenspezifischen Lambdas und Funktionen in Scala zu erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.
Wenn Sie die Snowpark-API verwenden, um einen UDF zu erstellen, serialisiert die Snowpark-Bibliothek den Code für Ihre UDF und lädt ihn in einen internen Stagingbereich hoch. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, auf dem sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.
In Ihrem kundenspezifischen Code können Sie auch Code aufrufen, der in JAR-Dateien gepackt ist (z. B. Java-Klassen für die Bibliothek eines Drittanbieters).
Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:
Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.
// Create and register an anonymous UDF (doubleUdf). val doubleUdf = udf((x: Int) => x + x) // Call the anonymous UDF. val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.
// Create and register a permanent named UDF ("doubleUdf"). session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage") // Call the named UDF. val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Die folgenden Abschnitte enthalten wichtige Informationen zur Erstellung von UDFs in Snowpark:
Im restlichen Teil dieses Themas wird erläutert, wie UDFs erstellt werden.
Bemerkung
Wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION
definiert haben, können Sie diese UDF in Snowpark aufrufen.
Weitere Details dazu finden Sie unter Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs).
Unterstützte Datentypen für Argumente und Rückgabewerte¶
Um eine UDF für eine Scala-Funktion oder ein Lambda zu erstellen, müssen Sie die unten aufgeführten unterstützten Datentypen für die Argumente und den Rückgabewert Ihrer Funktion oder Ihres Lambdas verwenden:
SQL-Datentyp |
Scala-Datentyp |
Anmerkungen |
---|---|---|
Die folgenden Typen werden unterstützt:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
Es werden veränderbare Maps der folgenden Typen unterstützt:
|
|
Einschränkungen beim Erstellen von UDFs in einem Objekt mit der Eigenschaft „App Trait“¶
Scala bietet eine App-Eigenschaft, die Sie erweitern können, um Ihr Scala-Objekt in ein ausführbares Programm umzuwandeln. Die App
-Eigenschaft bietet eine main
-Methode, die automatisch den gesamten Codetext Ihrer Objektdefinition ausführt. (Der Code in Ihrer Objektdefinition wird faktisch zur main
-Methode).
Eine mögliche Auswirkung der Erweiterung der App
-Eigenschaft besteht darin, dass die Felder in Ihrem Objekt erst bei Aufruf der main
-Methode initialisiert werden. Wenn Ihr Objekt App
erweitert und Sie eine UDF definieren, die ein Objektfeld verwendet, das Sie zuvor initialisiert haben, enthält die auf den Server hochgeladene UDF-Definition nicht den initialisierten Wert des Objektfelds.
Angenommen, Sie haben ein Feld mit dem Namen myConst
im Objekt definiert und initialisiert und verwenden dieses Feld in einer UDF:
object Main extends App {
...
// Initialize a field.
val myConst = "Prefix "
// Use the field in a UDF.
// Because the App trait delays the initialization of the object fields,
// myConst in the UDF definition resolves to null.
val myUdf = udf((s : String) => myConst + s )
...
}
Wenn Snowpark die UDF-Definition serialisiert und zu Snowflake hochlädt, wird myConst
nicht initialisiert und daher zu null
aufgelöst. Als Ergebnis gibt der UDF-Aufruf null
für myConst
zurück.
Um dies zu umgehen, müssen Sie Ihr Objekt so ändern, dass es die App
-Eigenschaft nicht erweitert. Implementieren Sie dazu eine separate main
-Methode für Ihren Code:
object Main {
...
def main(args: Array[String]): Unit = {
... // Your code ...
}
...
}
Angeben von Abhängigkeiten für eine UDF¶
Um eine UDF über die Snowpark-API zu definieren, müssen Sie Session.addDependency()
für alle Dateien aufrufen, die Klassen und Ressourcen enthalten, von denen Ihre UDF abhängt (z. B. JAR-Dateien, Ressourcendateien usw.). (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien aus einer UDF.)
Die Snowpark-Bibliothek lädt diese Dateien in einen internen Stagingbereich hoch und fügt die Dateien beim Ausführen Ihrer UDF zum Klassenpfad hinzu.
Tipp
Wenn Sie nicht möchten, dass die Bibliothek die Datei jedes Mal hochlädt, wenn Sie Ihre Anwendung ausführen, laden Sie die Datei in einen Stagingbereich hoch. Übergeben Sie beim Aufruf von addDependency
den Pfad zu der Datei im Stagingbereich.
Wenn Sie die Scala REPL verwenden, müssen Sie das Verzeichnis der von der REPL generierten Klassen als Abhängigkeit hinzufügen. Wenn Sie z. B. das Skript run.sh
zum Starten von REPL verwendet haben, rufen Sie die folgende Methode auf, die das vom Skript erstellte Verzeichnis repl_classes
hinzufügt:
// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
Das folgende Beispiel zeigt, wie Sie eine JAR-Datei als Abhängigkeit zu einem Stagingbereich hinzufügen:
// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Die folgenden Beispiele zeigen, wie Sie Abhängigkeiten für JAR-Dateien und Ressourcendateien hinzufügen:
// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")
// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")
// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Die folgenden Abhängigkeiten sollten Sie nicht angeben müssen:
Ihre Scala-Laufzeitbibliotheken.
Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.
Die Snowpark-JAR-Datei.
Die Snowpark-Bibliothek versucht automatisch, die Snowpark-JAR-Datei zu erkennen und auf den Server hochzuladen.
So verhindern Sie, dass die Bibliothek die Snowpark-JAR-Datei wiederholt auf den Server hochlädt:
Laden Sie die Snowpark-JAR-Datei in einen Stagingbereich hoch.
Der folgende Befehl lädt z. B. die Snowpark-JAR-Datei in den Stagingbereich
@mystage
hoch. Der PUT-Befehl komprimiert die JAR-Datei und gibt der resultierenden Datei die Bezeichnung „snowpark-1.10.0.jar.gz“.-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark-1.10.0.jar @mystageRufen Sie
addDependency
auf, um die Snowpark-JAR-Datei als Abhängigkeit zum Stagingbereich hinzuzufügen.Um beispielsweise die mit dem vorherigen Befehl hochgeladene Snowpark-JAR-Datei hinzuzufügen:
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark-1.10.0.jar.gz")Beachten Sie, dass der angegebene Pfad zur JAR-Datei die Dateinamenerweiterung
.gz
enthält, die durch den PUT-Befehl hinzugefügt wurde.Die JAR-Datei oder das Verzeichnis mit der aktuell ausgeführten Anwendung.
Die Snowpark-Bibliothek versucht automatisch, diese Abhängigkeiten zu erkennen und hochzuladen.
Wenn die Snowpark-Bibliothek diese Abhängigkeiten nicht automatisch erkennen kann, meldet die Bibliothek einen Fehler, und Sie müssen
addDependency
aufrufen, um die Abhängigkeiten manuell hinzuzufügen.
Wenn das Hochladen der Abhängigkeiten in den Stagingbereich zu lange dauert, meldet die Snowpark-Bibliothek eine Timeout-Ausnahme. Um die maximale Zeitspanne zu konfigurieren, die die Snowpark-Bibliothek warten soll, legen Sie Erstellen der Sitzung den gewünschten Wert für die Eigenschaft snowpark_request_timeout_in_seconds fest.
Erstellen einer anonymen UDF¶
Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:
Rufen Sie die Funktion
udf
im Objektcom.snowflake.snowpark.functions
auf, und übergeben Sie die Definition der anonymen Funktion.Rufen Sie die Methode
registerTemporary
in der KlasseUDFRegistration
auf, und übergeben Sie die Definition der anonymen Funktion. Da Sie einen anonymen UDF registrieren, müssen Sie die Methodensignaturen verwenden, die keinenname
-Parameter haben.
Bemerkung
Wenn Sie Code mit mehreren Threads schreiben (z. B. bei Verwendung paralleler Collections), können Sie UDFs mit der Methode registerTemporary
registrieren, anstatt die Funktion udf
zu verwenden. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session
nicht gefunden werden kann.
Diese Methoden geben ein UserDefinedFunction
-Objekt zurück, das Sie zum Aufrufen der UDF verwenden können (siehe Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs)).
Im folgenden Beispiel wird eine anonyme UDF erstellt:
// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Bemerkung
Wenn Sie eine UDF in einem Jupyter-Notebook erstellen, müssen Sie das Notebook für die Verwendung mit Snowpark einrichten (siehe Einrichten eines Jupyter Notebook für Snowpark Scala) und die Richtlinien für das Schreiben von UDFs in ein Notebook befolgen (siehe Erstellen von UDFs in Jupyter Notebooks).
Im folgenden Beispiel wird eine anonyme UDF erstellt, der ein Array
von String
-Werten übergeben und die Zeichenfolge x
an jeden Wert anhängt wird:
// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
Im folgenden Beispiel wird eine anonyme UDF erstellt, die eine kundenspezifische Klasse verwendet (LanguageDetector
zum Erkennen der in Text verwendeten Sprache). Im Beispiel wird die anonyme UDF aufgerufen, um die Sprache in der Spalte text_data
eines DataFrame zu erkennen und ein neues DataFrame zu erstellen, das eine zusätzliche Spalte lang
mit der verwendeten Sprache enthält.
// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._
// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector
// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")
// Create a detector
val detector = new LanguageDetector()
// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
Option(detector.detect(s)).getOrElse("UNKNOWN"))
// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
dfEmails.withColumn("lang", langUdf(col("text_data")))
Erstellen und Registrieren einer benannten UDF¶
Wenn Sie eine UDF mit ihrem Namen aufrufen möchten (z. B. durch Verwendung der Funktion callUDF
im Objekt functions
) oder wenn Sie einen UDF in nachfolgenden Sitzungen verwenden müssen, können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie dazu eine der folgenden Methoden in der UDFRegistration
-Klasse:
registerTemporary
, wenn Sie die UDF nur in der aktuellen Sitzung verwenden möchtenregisterPermanent
, wenn Sie planen, die UDF in nachfolgenden Sitzungen zu verwenden
Um auf ein Objekt der Klasse UDFRegistration
zuzugreifen, rufen Sie die Methode udf
der Klasse Session
auf.
registerTemporary
erstellt eine temporäre UDF, die Sie in der aktuellen Sitzung verwenden können.
// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
registerPermanent
erstellt eine UDF, die Sie in der aktuellen und den folgenden Sitzungen verwenden können. Wenn Sie registerPermanent
aufrufen, müssen Sie auch einen Speicherort in einem internen Stagingbereich angeben, in den die JAR-Dateien für die UDF und seine Abhängigkeiten hochgeladen werden sollen.
Bemerkung
registerPermanent
unterstützt keine externen Stagingbereiche.
Beispiel:
// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Bemerkung
Wenn Sie eine UDF in einem Jupyter-Notebook erstellen, müssen Sie das Notebook für die Verwendung mit Snowpark einrichten (siehe Einrichten eines Jupyter Notebook für Snowpark Scala) und die Richtlinien für das Schreiben von UDFs in ein Notebook befolgen (siehe Erstellen von UDFs in Jupyter Notebooks).
Erstellen von UDFs in Jupyter Notebooks¶
Wenn Sie UDFs in einem Jupyter-Notebook erstellen, müssen Sie die folgenden zusätzlichen Schritte ausführen:
Einrichten eines Jupyter Notebook für Snowpark Scala (wenn Sie das Notebook nicht bereits für die Arbeit mit Snowpark eingerichtet haben)
Zugriff auf eine Variable, die in einer anderen Zelle definiert ist
Schreiben der Implementierung einer UDF¶
Definieren Sie die Implementierung Ihrer Funktion in einer Klasse, die Serializable
erweitert. Beispiel:
// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
val myUserDefinedFunc = (s: String) => {
...
}
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Zugriff auf eine Variable, die in einer anderen Zelle definiert ist¶
Wenn Sie eine in einer anderen Zelle definierte Variable in Ihrer UDF verwenden müssen, müssen Sie die Variable als Argument an den Klassenkonstruktor übergeben. Nehmen Sie zum Beispiel an, dass Sie in Zelle 1 eine Variable definiert haben:
In [1]:
val prefix = "Hello"
und Sie möchten diese Variable in einer UDF verwenden, die Sie in Zelle 2 definiert haben. Fügen Sie im Klassenkonstruktor der UDF ein Argument für diese Variable hinzu. Wenn Sie dann den Klassenkonstruktor zum Erstellen von UDF aufrufen, übergeben Sie die in Zelle 1 definierte Variable:
In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
val prependPrefixFunc = (s: String) => {
s"$prefix $s"
}
}
// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Verwenden von Objekten, die nicht serialisierbar sind¶
Wenn Sie eine UDF für ein Lambda oder eine Funktion ausführen, serialisiert die Snowpark-Bibliothek die Lambda-Closure und sendet sie zur Ausführung an den Server.
Wenn ein von der Lambda-Closure erfasstes Objekt nicht serialisierbar ist, löst die Snowpark-Bibliothek eine java.io.NotSerializableException
-Ausnahme aus.
Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Wenn dies der Fall ist, haben Sie folgende Optionen:
Sie können das Objekt serialisierbar machen oder
Sie können das Objekt als
lazy val
deklarieren, oder verwenden Sie die@transient
-Anmerkung, um die Serialisierung des Objekts zu vermeiden.Beispiel:
// Declare the detector object as lazy. lazy val detector = new LanguageDetector("en") // The detector object is not serialized but is instead reconstructed on the server. val langUdf = udf((s: String) => Option(detector.detect(s)).getOrElse("UNKNOWN"))
Schreiben von Initialisierungscode für eine UDF¶
Wenn Ihre UDF Initialisierungscode oder Kontext benötigt, können Sie diesen durch Werte bereitstellen, die als Teil der UDF-Closure erfasst werden.
Im folgenden Beispiel wird eine separate Klasse verwendet, um den von drei UDFs benötigten Kontext zu initialisieren.
Die erste UDF erstellt eine neue Instanz der Klasse innerhalb der Lambda-Funktion, sodass die Initialisierung bei jedem Aufruf der UDF durchgeführt wird.
Die zweite UDF erfasst eine Instanz der Klasse, die in Ihrem Clientprogramm generiert wurde. Der auf dem Client generierte Kontext wird serialisiert und von der UDF verwendet. Beachten Sie, dass die Kontextklasse serialisierbar sein muss, damit dieser Ansatz funktioniert.
Die dritte UDF erfasst einen
lazy val
, sodass der Kontext beim ersten UDF-Aufruf im Lazy-Modus instanziiert und bei nachfolgenden Aufrufen wiederverwendet wird. Dieser Ansatz funktioniert auch, wenn der Kontext nicht serialisierbar ist. Es gibt jedoch keine Garantie dafür, dass ALL ALLE UDF -Aufrufe innerhalb eines DataFrames denselben, im Lazy-Modus generierten Kontext verwenden.
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random
// Context needed for a UDF.
class Context {
val randomInt = Random.nextInt
}
// Serializable context needed for the UDF.
class SerContext extends Serializable {
val randomInt = Random.nextInt
}
object TestUdf {
def main(args: Array[String]): Unit = {
// Create the session.
val session = Session.builder.configFile("/<path>/profile.properties").create
import session.implicits._
session.range(1, 10, 2).show()
// Create a DataFrame with two columns ("c" and "d").
val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
dummy.show()
// Initialize the context once per invocation.
val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
dummy.select(udfRepeatedInit('c)).show()
// Initialize the serializable context only once,
// regardless of the number of times that the UDF is invoked.
val sC = new SerContext
val udfOnceInit = udf((i: Int) => sC.randomInt)
dummy.select(udfOnceInit('c)).show()
// Initialize the non-serializable context only once,
// regardless of the number of times that the UDF is invoked.
lazy val unserC = new Context
val udfOnceInitU = udf((i: Int) => unserC.randomInt)
dummy.select(udfOnceInitU('c)).show()
}
}
Lesen von Dateien aus einer UDF¶
Wie bereits erwähnt, lädt die Snowpark-Bibliothek die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF-Daten aus einer Datei gelesen werden, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.
Wenn außerdem der Inhalt der Datei zwischen den Aufrufen der UDF gleich bleibt, können Sie Ihren Code so schreiben, dass die Datei einmal beim ersten Aufruf geladen wird und bei den folgenden Aufrufen nicht mehr. Dies kann die Leistung Ihrer UDF-Aufrufe verbessern.
So richten Sie eine UDF zum Lesen einer Datei ein:
Fügen Sie die Datei zu einer JAR-Datei hinzu.
Wenn Ihre UDF z. B. eine Datei in einem
data/
-Unterverzeichnis (data/hello.txt
) verwenden muss, führen Sie den Befehljar
aus, um diese Datei zu einer JAR-Datei hinzuzufügen:# Create a new JAR file containing data/hello.txt. $ jar cvf <path>/myJar.jar data/hello.txt
Geben Sie an, dass die JAR-Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen und dem Klassenpfad hinzugefügt wird. Siehe Angeben von Abhängigkeiten für eine UDF.
Beispiel:
// Specify that myJar.jar contains files that your UDF depends on. session.addDependency("<path>/myJar.jar")
Rufen Sie in der UDF
Class.getResourceAsStream
auf, um die Datei im Klassenpfad zu finden und die Datei zu lesen.Um das Hinzufügen einer Abhängigkeit von
this
zu vermeiden, können SieclassOf[com.snowflake.snowpark.DataFrame]
(anstelle vongetClass
) verwenden, um das ObjektClass
zu erhalten.Beispiel für das Lesen der Datei
data/hello.txt
:// Read data/hello.txt from myJar.jar. val resourceName = "/data/hello.txt" val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
In diesem Beispiel beginnt der Ressourcenname mit einem
/
, was anzeigt, dass dies der vollständige Pfad der Datei in der JAR-Datei ist. (In diesem Fall ist der Speicherort der Datei nicht relativ zum Paket der Klasse.)
Bemerkung
Wenn Sie nicht erwarten, dass sich der Inhalt der Datei zwischen UDF-Aufrufen ändert, lesen Sie die Datei in einen
lazy val
-Wert. Dies bewirkt, dass der Dateiladecode nur beim ersten Aufruf der UDF und nicht bei nachfolgenden Aufrufen ausgeführt wird.
Im folgenden Beispiel wird ein Objekt (UDFCode
) mit einer Funktion definiert, die als UDF (readFileFunc
) verwendet wird. Die Funktion liest die Datei data/hello.txt
, die die Zeichenfolge hello,
enthalten soll. Die Funktion stellt diese Zeichenfolge der als Argument übergebenen Zeichenfolge voran.
// Create a function object that reads a file.
object UDFCode extends Serializable {
// The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
// the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
// first accessed.
lazy val prefix = {
import java.io._
val resourceName = "/data/hello.txt"
val inputStream = classOf[com.snowflake.snowpark.DataFrame]
.getResourceAsStream(resourceName)
if (inputStream == null) {
throw new Exception("Can't find file " + resourceName)
}
scala.io.Source.fromInputStream(inputStream).mkString
}
val readFileFunc = (s: String) => prefix + " : " + s
}
Im nächsten Teil des Beispiels wird die Funktion als anonyme UDF registriert. In dem Beispiel wird die UDF auf der Spalte NAME
in einem DataFrame aufgerufen. Im Beispiel wird davon ausgegangen, dass die Datei data/hello.txt
in der JAR-Datei myJar.jar
gepackt ist.
// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")
// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")
// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)
// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Erstellen von benutzerdefinierten Tabellenfunktionen (UDTFs)¶
So erstellen und registrieren Sie eine UDTF in Snowpark:
In den nächsten Abschnitten werden diese Schritte näher beschrieben.
Weitere Informationen zum Aufrufen von UDTFs finden Sie unter Aufrufen einer UDTF.
Definieren der UDTF-Klasse¶
Definieren Sie eine Klasse, die von einer der UDTFn
-Klassen (z. B. UDTF0
oder UDTF1
) im com.snowflake.snowpark.udtf package erbt, wobei n
die Anzahl der Eingabeargumente der UDTF angibt. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt, erweitern Sie die Klasse UDTF2
.
Überschreiben Sie in Ihrer Klasse die folgenden Methoden:
outputSchema() – Gibt ein
types.StructType
-Objekt zurück, das die Namen und Typen der Felder in den zurückgegebenen Zeilen beschreibt (das „Schema“ der Ausgabe).process() – Wird für jede Zeile in der Eingabepartition einmal aufgerufen (siehe Anmerkung unten).
endPartition() – Wird einmal für jede Partition aufgerufen wird, nachdem alle Zeilen an
process()
übergeben wurden.
Wenn eine UDTF aufgerufen wird, werden die Zeilen vor Übergabe an die UDTF in Partitionen gruppiert:
Wenn die Anweisung, die die UDTF aufruft, die PARTITION-Klausel (explizite Partitionen) enthält, bestimmt diese Klausel, wie die Zeilen partitioniert werden.
Wenn die Anweisung die PARTITION-Klausel nicht enthält (implizite Partitionen), bestimmt Snowflake, wie die Zeilen am besten zu partitionieren sind.
Weitere Informationen zu Partitionen finden Sie unter Tabellenfunktionen und Partitionen.
Ein Beispiel für eine UDTF-Klasse finden Sie unter Beispiel für eine UDTF-Klasse.
Überschreiben der outputSchema()-Methode¶
Überschreiben Sie die Methode outputSchema()
, um die Namen und Datentypen der Felder (das „Ausgabeschema“) der Zeilen zu definieren, die von den Methoden process()
und endPartition()
zurückgegeben werden.
def outputSchema(): StructType
In dieser Methode wird ein StructType-Objekt konstruiert und zurückgegeben, das ein Array
von StructField-Objekten verwendet, um den Snowflake-Datentyp jedes Feldes in einer zurückgegebenen Zeile anzugeben. Snowflake unterstützt die folgenden Typobjekte für das Ausgabeschema einer UDTF:
SQL-Datentyp |
Scala-Typ |
|
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
Wenn Ihre UDTF zum Beispiel eine Zeile mit einem einzigen Integer-Feld zurückgibt:
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Überschreiben der process()-Methode¶
Überschreiben Sie in Ihrer UDTF-Klasse die Methode process()
:
def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
wobei n
die Anzahl der Argumente ist, die an Ihre UDTF übergeben werden.
Die Anzahl der Argumente in der Signatur entspricht der Klasse, die Sie erweitert haben. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt und Sie die Klasse UDTF2
erweitern, hat die Methode process()
folgende Signatur:
def process(arg0: A0, arg1: A1): Iterable[Row]
Diese Methode wird einmal für jede Zeile der Eingabepartition aufgerufen.
Auswählen der Argumenttypen¶
Verwenden Sie für den Typ jedes Arguments der Methode process()
den Scala-Typ, der dem Snowflake-Datentyp des an die UDTF übergebenen Arguments entspricht.
Snowflake unterstützt die folgenden Datentypen für die Argumente einer UDTF:
SQL-Datentyp |
Scala-Datentyp |
Anmerkungen |
---|---|---|
Die folgenden Typen werden unterstützt:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
Es werden veränderbare Maps der folgenden Typen unterstützt:
|
Zurückgeben von Zeilen¶
Erstellen Sie in der Methode process()
ein Iterable
von Row
-Objekten, die die Daten enthalten, die von der UDTF für die gegebenen Eingabewerte zurückgegeben werden sollen, und geben Sie dieses zurück. Die Felder in der Zeile müssen die Typen verwenden, die Sie in der Methode outputSchema
angegeben haben. (siehe Überschreiben der outputSchema()-Methode).
Wenn Ihre UDTF beispielsweise Zeilen generiert, konstruieren Sie ein Iterable
von Row
-Objekten für die generierten Zeilen, und geben dieses zurück:
override def process(start: Int, count: Int): Iterable[Row] = (start until (start + count)).map(Row(_))
Überschreiben der endPartition()-Methode¶
Überschreiben Sie die Methode endPartition
, und fügen Sie Code hinzu, der ausgeführt werden soll, nachdem alle Zeilen in der Eingabepartition an die Methode process
übergeben wurden. Die Funktion endPartition
wird einmal für jede Eingabepartition aufgerufen.
def endPartition(): Iterable[Row]
Sie können diese Methode verwenden, wenn Sie Workload ausführen müssen, nachdem alle Zeilen in der Partition verarbeitet worden sind. Beispiele:
Zurückgeben von Zeilen auf der Grundlage von Zustandsinformationen, die Sie in jedem
process
-Methodenaufruf erfassen.Zurückgeben von Zeilen, die nicht an eine bestimmte Eingabezeile gebunden sind.
Zurückgeben von Zeilen, die die Ausgabezeilen zusammenfassen, die von der Methode
process
generiert wurden.
Die Felder in den zurückgegebenen Zeilen müssen den Typen entsprechen, die Sie in der Methode outputSchema
angegebenen haben. (siehe Überschreiben der outputSchema()-Methode).
Wenn Sie am Ende jeder Partition keine zusätzlichen Zeilen zurückgeben müssen, geben Sie z. B. ein leeres Iterable
von Row
-Objekten zurück:
override def endPartition(): Iterable[Row] = Array.empty[Row]
Bemerkung
Snowflake unterstützt zwar große Partitionen mit Timeouts, die so eingestellt sind, dass sie erfolgreich verarbeitet werden können, aber bei besonders großen Partitionen kann es zu Zeitüberschreitungen kommen (z. B. wenn endPartition
zu lange für den Abschluss braucht). Wenden Sie sich an den Snowflake-Support, wenn Sie den Timeout-Schwellenwert für bestimmte Nutzungsszenarios anpassen möchten.
Beispiel für eine UDTF-Klasse¶
Es folgt ein Beispiel für eine UDTF-Klasse, die einen Bereich von Zeilen generiert.
Da die UDTF 2 Argumente übergibt, erweitert die Klasse
UDTF2
.Die Argumente
start
undcount
geben die Startnummer für die Zeile und die Anzahl der zu generierenden Zeilen an.
class MyRangeUdtf extends UDTF2[Int, Int] {
override def process(start: Int, count: Int): Iterable[Row] =
(start until (start + count)).map(Row(_))
override def endPartition(): Iterable[Row] = Array.empty[Row]
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Registrieren der UDTF¶
Erstellen Sie dann eine Instanz der neuen Klasse, und registrieren Sie die Klasse durch Aufruf einer der UDTFRegistration-Methoden. Sie können die UDTF als temporär oder permanent registrieren.
Registrieren einer temporären UDTF¶
Um eine temporäre UDTF zu registrieren, rufen Sie UDTFRegistration.registerTemporary
auf:
Wenn Sie die UDTF ohne Namen aufrufen müssen, können Sie eine anonyme UDTF registrieren, indem Sie eine Instanz der Klasse übergeben:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show
Wenn Sie die UDTF mit einem Namen aufrufen müssen, geben Sie auch einen Namen für die UDTF an:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Registrieren einer permanenten UDTF¶
Wenn Sie die UDTF in späteren Sitzungen verwenden müssen, rufen Sie UDTFRegistration.registerPermanent
auf, um eine permanente UDTF zu registrieren.
Bei der Registrierung einer permanenten UDTF müssen Sie einen Stagingbereich angeben, in den die Registrierungsmethode die JAR-Dateien der UDTF und deren Abhängigkeiten hochlädt. Beispiel:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage") // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Aufrufen einer UDTF¶
Nach dem Registrieren der UDTF können Sie die UDTF aufrufen, indem Sie das zurückgegebene TableFunction
-Objekt an die tableFunction
-Methode des Session
-Objekts übergeben:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show()
Um eine UDTF-Methode namentlich aufzurufen, erstellen Sie ein TableFunction
-Objekt mit diesem Namen und übergeben es an die tableFunction
-Methode:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Sie können eine UDTF-Anweisung auch direkt über eine SELECT-Anweisung aufrufen:
session.sql("select * from table(myUdtf(10, 5))")