Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames in Scala

Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Scala erstellen können. Unter diesem Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.

Unter diesem Thema:

Einführung

Sie können Snowpark-APIs aufrufen, um benutzerdefinierte Funktionen (UDFs) für Ihre kundenspezifischen Lambdas und Funktionen in Scala zu erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihrem DataFrame zu verarbeiten.

Wenn Sie die Snowpark-API verwenden, um einen UDF zu erstellen, serialisiert die Snowpark-Bibliothek den Code für Ihre UDF und lädt ihn in einen internen Stagingbereich hoch. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, auf dem sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.

In Ihrem kundenspezifischen Code können Sie auch Code aufrufen, der in JAR-Dateien gepackt ist (z. B. Java-Klassen für die Bibliothek eines Drittanbieters).

Es gibt zwei Möglichkeiten, eine UDF für Ihren kundenspezifischen Code zu erstellen:

  • Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.

    // Create and register an anonymous UDF (doubleUdf).
    val doubleUdf = udf((x: Int) => x + x)
    // Call the anonymous UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
    
    Copy
  • Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.

    // Create and register a permanent named UDF ("doubleUdf").
    session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
    // Call the named UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
    
    Copy

Die folgenden Abschnitte enthalten wichtige Informationen zur Erstellung von UDFs in Snowpark:

Im restlichen Teil dieses Themas wird erläutert, wie UDFs erstellt werden.

Bemerkung

Wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION definiert haben, können Sie diese UDF in Snowpark aufrufen.

Weitere Details dazu finden Sie unter Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs).

Unterstützte Datentypen für Argumente und Rückgabewerte

Um eine UDF für eine Scala-Funktion oder ein Lambda zu erstellen, müssen Sie die unten aufgeführten unterstützten Datentypen für die Argumente und den Rückgabewert Ihrer Funktion oder Ihres Lambdas verwenden:

SQL-Datentyp

Scala-Datentyp

Anmerkungen

NUMBER

Die folgenden Typen werden unterstützt:

  • Short oder Option[Short]

  • Int oder Option[Int]

  • Long oder Option[Long]

  • java.math.BigDecimal

FLOAT

Float oder Option[Float]

DOUBLE

Double oder Option[Double]

VARCHAR

String oder java.lang.String

BOOLEAN

Boolean oder Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] oder Array[Variant]

OBJECT

Map[String, String] oder Map[String, Variant]

Es werden veränderbare Maps der folgenden Typen unterstützt:

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

GEOGRAPHY

com.snowflake.snowpark.types.Geography

Einschränkungen beim Erstellen von UDFs in einem Objekt mit der Eigenschaft „App Trait“

Scala bietet eine App-Eigenschaft, die Sie erweitern können, um Ihr Scala-Objekt in ein ausführbares Programm umzuwandeln. Die App-Eigenschaft bietet eine main-Methode, die automatisch den gesamten Codetext Ihrer Objektdefinition ausführt. (Der Code in Ihrer Objektdefinition wird faktisch zur main-Methode).

Eine mögliche Auswirkung der Erweiterung der App-Eigenschaft besteht darin, dass die Felder in Ihrem Objekt erst bei Aufruf der main-Methode initialisiert werden. Wenn Ihr Objekt App erweitert und Sie eine UDF definieren, die ein Objektfeld verwendet, das Sie zuvor initialisiert haben, enthält die auf den Server hochgeladene UDF-Definition nicht den initialisierten Wert des Objektfelds.

Angenommen, Sie haben ein Feld mit dem Namen myConst im Objekt definiert und initialisiert und verwenden dieses Feld in einer UDF:

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}
Copy

Wenn Snowpark die UDF-Definition serialisiert und zu Snowflake hochlädt, wird myConst nicht initialisiert und daher zu null aufgelöst. Als Ergebnis gibt der UDF-Aufruf null für myConst zurück.

Um dies zu umgehen, müssen Sie Ihr Objekt so ändern, dass es die App-Eigenschaft nicht erweitert. Implementieren Sie dazu eine separate main-Methode für Ihren Code:

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}
Copy

Angeben von Abhängigkeiten für eine UDF

Um eine UDF über die Snowpark-API zu definieren, müssen Sie Session.addDependency() für alle Dateien aufrufen, die Klassen und Ressourcen enthalten, von denen Ihre UDF abhängt (z. B. JAR-Dateien, Ressourcendateien usw.). (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien aus einer UDF.)

Die Snowpark-Bibliothek lädt diese Dateien in einen internen Stagingbereich hoch und fügt die Dateien beim Ausführen Ihrer UDF zum Klassenpfad hinzu.

Tipp

Wenn Sie nicht möchten, dass die Bibliothek die Datei jedes Mal hochlädt, wenn Sie Ihre Anwendung ausführen, laden Sie die Datei in einen Stagingbereich hoch. Übergeben Sie beim Aufruf von addDependency den Pfad zu der Datei im Stagingbereich.

Wenn Sie die Scala REPL verwenden, müssen Sie das Verzeichnis der von der REPL generierten Klassen als Abhängigkeit hinzufügen. Wenn Sie z. B. das Skript run.sh zum Starten von REPL verwendet haben, rufen Sie die folgende Methode auf, die das vom Skript erstellte Verzeichnis repl_classes hinzufügt:

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
Copy

Das folgende Beispiel zeigt, wie Sie eine JAR-Datei als Abhängigkeit zu einem Stagingbereich hinzufügen:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Copy

Die folgenden Beispiele zeigen, wie Sie Abhängigkeiten für JAR-Dateien und Ressourcendateien hinzufügen:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Copy

Die folgenden Abhängigkeiten sollten Sie nicht angeben müssen:

  • Ihre Scala-Laufzeitbibliotheken.

    Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.

  • Die Snowpark-JAR-Datei.

    Die Snowpark-Bibliothek versucht automatisch, die Snowpark-JAR-Datei zu erkennen und auf den Server hochzuladen.

    So verhindern Sie, dass die Bibliothek die Snowpark-JAR-Datei wiederholt auf den Server hochlädt:

    1. Laden Sie die Snowpark-JAR-Datei in einen Stagingbereich hoch.

      Der folgende Befehl lädt z. B. die Snowpark-JAR-Datei in den Stagingbereich @mystage hoch. Der PUT-Befehl komprimiert die JAR-Datei und gibt der resultierenden Datei die Bezeichnung „snowpark-1.9.0.jar.gz“.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. Rufen Sie addDependency auf, um die Snowpark-JAR-Datei als Abhängigkeit zum Stagingbereich hinzuzufügen.

      Um beispielsweise die mit dem vorherigen Befehl hochgeladene Snowpark-JAR-Datei hinzuzufügen:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz")

      Beachten Sie, dass der angegebene Pfad zur JAR-Datei die Dateinamenerweiterung .gz enthält, die durch den PUT-Befehl hinzugefügt wurde.

  • Die JAR-Datei oder das Verzeichnis mit der aktuell ausgeführten Anwendung.

    Die Snowpark-Bibliothek versucht automatisch, diese Abhängigkeiten zu erkennen und hochzuladen.

    Wenn die Snowpark-Bibliothek diese Abhängigkeiten nicht automatisch erkennen kann, meldet die Bibliothek einen Fehler, und Sie müssen addDependency aufrufen, um die Abhängigkeiten manuell hinzuzufügen.

Wenn das Hochladen der Abhängigkeiten in den Stagingbereich zu lange dauert, meldet die Snowpark-Bibliothek eine Timeout-Ausnahme. Um die maximale Zeitspanne zu konfigurieren, die die Snowpark-Bibliothek warten soll, legen Sie Erstellen der Sitzung den gewünschten Wert für die Eigenschaft snowpark_request_timeout_in_seconds fest.

Erstellen einer anonymen UDF

Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:

  • Rufen Sie die Funktion udf im Objekt com.snowflake.snowpark.functions auf, und übergeben Sie die Definition der anonymen Funktion.

  • Rufen Sie die Methode registerTemporary in der Klasse UDFRegistration auf, und übergeben Sie die Definition der anonymen Funktion. Da Sie einen anonymen UDF registrieren, müssen Sie die Methodensignaturen verwenden, die keinen name-Parameter haben.

Bemerkung

Wenn Sie Code mit mehreren Threads schreiben (z. B. bei Verwendung paralleler Collections), können Sie UDFs mit der Methode registerTemporary registrieren, anstatt die Funktion udf zu verwenden. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session nicht gefunden werden kann.

Diese Methoden geben ein UserDefinedFunction-Objekt zurück, das Sie zum Aufrufen der UDF verwenden können (siehe Aufrufen von skalaren benutzerdefinierten Funktionen (UDFs)).

Im folgenden Beispiel wird eine anonyme UDF erstellt:

// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Copy

Bemerkung

Wenn Sie eine UDF in einem Jupyter-Notebook erstellen, müssen Sie das Notebook für die Verwendung mit Snowpark einrichten (siehe Einrichten eines Jupyter Notebook für Snowpark Scala) und die Richtlinien für das Schreiben von UDFs in ein Notebook befolgen (siehe Erstellen von UDFs in Jupyter Notebooks).

Im folgenden Beispiel wird eine anonyme UDF erstellt, der ein Array von String-Werten übergeben und die Zeichenfolge x an jeden Wert anhängt wird:

// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
Copy

Im folgenden Beispiel wird eine anonyme UDF erstellt, die eine kundenspezifische Klasse verwendet (LanguageDetector zum Erkennen der in Text verwendeten Sprache). Im Beispiel wird die anonyme UDF aufgerufen, um die Sprache in der Spalte text_data eines DataFrame zu erkennen und ein neues DataFrame zu erstellen, das eine zusätzliche Spalte lang mit der verwendeten Sprache enthält.

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))
Copy

Erstellen und Registrieren einer benannten UDF

Wenn Sie eine UDF mit ihrem Namen aufrufen möchten (z. B. durch Verwendung der Funktion callUDF im Objekt functions) oder wenn Sie einen UDF in nachfolgenden Sitzungen verwenden müssen, können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie dazu eine der folgenden Methoden in der UDFRegistration-Klasse:

  • registerTemporary, wenn Sie die UDF nur in der aktuellen Sitzung verwenden möchten

  • registerPermanent, wenn Sie planen, die UDF in nachfolgenden Sitzungen zu verwenden

Um auf ein Objekt der Klasse UDFRegistration zuzugreifen, rufen Sie die Methode udf der Klasse Session auf.

registerTemporary erstellt eine temporäre UDF, die Sie in der aktuellen Sitzung verwenden können.

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

registerPermanent erstellt eine UDF, die Sie in der aktuellen und den folgenden Sitzungen verwenden können. Wenn Sie registerPermanent aufrufen, müssen Sie auch einen Speicherort in einem internen Stagingbereich angeben, in den die JAR-Dateien für die UDF und seine Abhängigkeiten hochgeladen werden sollen.

Bemerkung

registerPermanent unterstützt keine externen Stagingbereiche.

Beispiel:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

Bemerkung

Wenn Sie eine UDF in einem Jupyter-Notebook erstellen, müssen Sie das Notebook für die Verwendung mit Snowpark einrichten (siehe Einrichten eines Jupyter Notebook für Snowpark Scala) und die Richtlinien für das Schreiben von UDFs in ein Notebook befolgen (siehe Erstellen von UDFs in Jupyter Notebooks).

Erstellen von UDFs in Jupyter Notebooks

Wenn Sie UDFs in einem Jupyter-Notebook erstellen, müssen Sie die folgenden zusätzlichen Schritte ausführen:

Schreiben der Implementierung einer UDF

Definieren Sie die Implementierung Ihrer Funktion in einer Klasse, die Serializable erweitert. Beispiel:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Copy

Zugriff auf eine Variable, die in einer anderen Zelle definiert ist

Wenn Sie eine in einer anderen Zelle definierte Variable in Ihrer UDF verwenden müssen, müssen Sie die Variable als Argument an den Klassenkonstruktor übergeben. Nehmen Sie zum Beispiel an, dass Sie in Zelle 1 eine Variable definiert haben:

In [1]:
Copy
val prefix = "Hello"
Copy

und Sie möchten diese Variable in einer UDF verwenden, die Sie in Zelle 2 definiert haben. Fügen Sie im Klassenkonstruktor der UDF ein Argument für diese Variable hinzu. Wenn Sie dann den Klassenkonstruktor zum Erstellen von UDF aufrufen, übergeben Sie die in Zelle 1 definierte Variable:

In [2]:
Copy
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Copy

Verwenden von Objekten, die nicht serialisierbar sind

Wenn Sie eine UDF für ein Lambda oder eine Funktion ausführen, serialisiert die Snowpark-Bibliothek die Lambda-Closure und sendet sie zur Ausführung an den Server.

Wenn ein von der Lambda-Closure erfasstes Objekt nicht serialisierbar ist, löst die Snowpark-Bibliothek eine java.io.NotSerializableException-Ausnahme aus.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

Wenn dies der Fall ist, haben Sie folgende Optionen:

  • Sie können das Objekt serialisierbar machen oder

  • Sie können das Objekt als lazy val deklarieren, oder verwenden Sie die @transient-Anmerkung, um die Serialisierung des Objekts zu vermeiden.

    Beispiel:

    // Declare the detector object as lazy.
    lazy val detector = new LanguageDetector("en")
    // The detector object is not serialized but is instead reconstructed on the server.
    val langUdf = udf((s: String) =>
         Option(detector.detect(s)).getOrElse("UNKNOWN"))
    
    Copy

Schreiben von Initialisierungscode für eine UDF

Wenn Ihre UDF Initialisierungscode oder Kontext benötigt, können Sie diesen durch Werte bereitstellen, die als Teil der UDF-Closure erfasst werden.

Im folgenden Beispiel wird eine separate Klasse verwendet, um den von drei UDFs benötigten Kontext zu initialisieren.

  • Die erste UDF erstellt eine neue Instanz der Klasse innerhalb der Lambda-Funktion, sodass die Initialisierung bei jedem Aufruf der UDF durchgeführt wird.

  • Die zweite UDF erfasst eine Instanz der Klasse, die in Ihrem Clientprogramm generiert wurde. Der auf dem Client generierte Kontext wird serialisiert und von der UDF verwendet. Beachten Sie, dass die Kontextklasse serialisierbar sein muss, damit dieser Ansatz funktioniert.

  • Die dritte UDF erfasst einen lazy val, sodass der Kontext beim ersten UDF-Aufruf im Lazy-Modus instanziiert und bei nachfolgenden Aufrufen wiederverwendet wird. Dieser Ansatz funktioniert auch, wenn der Kontext nicht serialisierbar ist. Es gibt jedoch keine Garantie dafür, dass ALL ALLE UDF -Aufrufe innerhalb eines DataFrames denselben, im Lazy-Modus generierten Kontext verwenden.

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}
Copy

Lesen von Dateien aus einer UDF

Wie bereits erwähnt, lädt die Snowpark-Bibliothek die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF-Daten aus einer Datei gelesen werden, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.

Wenn außerdem der Inhalt der Datei zwischen den Aufrufen der UDF gleich bleibt, können Sie Ihren Code so schreiben, dass die Datei einmal beim ersten Aufruf geladen wird und bei den folgenden Aufrufen nicht mehr. Dies kann die Leistung Ihrer UDF-Aufrufe verbessern.

So richten Sie eine UDF zum Lesen einer Datei ein:

  1. Fügen Sie die Datei zu einer JAR-Datei hinzu.

    Wenn Ihre UDF z. B. eine Datei in einem data/-Unterverzeichnis (data/hello.txt) verwenden muss, führen Sie den Befehl jar aus, um diese Datei zu einer JAR-Datei hinzuzufügen:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. Geben Sie an, dass die JAR-Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen und dem Klassenpfad hinzugefügt wird. Siehe Angeben von Abhängigkeiten für eine UDF.

    Beispiel:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
    Copy
  3. Rufen Sie in der UDF Class.getResourceAsStream auf, um die Datei im Klassenpfad zu finden und die Datei zu lesen.

    Um das Hinzufügen einer Abhängigkeit von this zu vermeiden, können Sie classOf[com.snowflake.snowpark.DataFrame] (anstelle von getClass) verwenden, um das Objekt Class zu erhalten.

    Beispiel für das Lesen der Datei data/hello.txt:

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    
    Copy

    In diesem Beispiel beginnt der Ressourcenname mit einem /, was anzeigt, dass dies der vollständige Pfad der Datei in der JAR-Datei ist. (In diesem Fall ist der Speicherort der Datei nicht relativ zum Paket der Klasse.)

Bemerkung

Wenn Sie nicht erwarten, dass sich der Inhalt der Datei zwischen UDF-Aufrufen ändert, lesen Sie die Datei in einen lazy val-Wert. Dies bewirkt, dass der Dateiladecode nur beim ersten Aufruf der UDF und nicht bei nachfolgenden Aufrufen ausgeführt wird.

Im folgenden Beispiel wird ein Objekt (UDFCode) mit einer Funktion definiert, die als UDF (readFileFunc) verwendet wird. Die Funktion liest die Datei data/hello.txt, die die Zeichenfolge hello, enthalten soll. Die Funktion stellt diese Zeichenfolge der als Argument übergebenen Zeichenfolge voran.

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}
Copy

Im nächsten Teil des Beispiels wird die Funktion als anonyme UDF registriert. In dem Beispiel wird die UDF auf der Spalte NAME in einem DataFrame aufgerufen. Im Beispiel wird davon ausgegangen, dass die Datei data/hello.txt in der JAR-Datei myJar.jar gepackt ist.

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Copy

Erstellen von benutzerdefinierten Tabellenfunktionen (UDTFs)

So erstellen und registrieren Sie eine UDTF in Snowpark:

In den nächsten Abschnitten werden diese Schritte näher beschrieben.

Weitere Informationen zum Aufrufen von UDTFs finden Sie unter Aufrufen einer UDTF.

Definieren der UDTF-Klasse

Definieren Sie eine Klasse, die von einer der UDTFn-Klassen (z. B. UDTF0 oder UDTF1) im com.snowflake.snowpark.udtf package erbt, wobei n die Anzahl der Eingabeargumente der UDTF angibt. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt, erweitern Sie die Klasse UDTF2.

Überschreiben Sie in Ihrer Klasse die folgenden Methoden:

  • outputSchema() – Gibt ein types.StructType-Objekt zurück, das die Namen und Typen der Felder in den zurückgegebenen Zeilen beschreibt (das „Schema“ der Ausgabe).

  • process() – Wird für jede Zeile in der Eingabepartition einmal aufgerufen (siehe Anmerkung unten).

  • endPartition() – Wird einmal für jede Partition aufgerufen wird, nachdem alle Zeilen an process() übergeben wurden.

Wenn eine UDTF aufgerufen wird, werden die Zeilen vor Übergabe an die UDTF in Partitionen gruppiert:

  • Wenn die Anweisung, die die UDTF aufruft, die PARTITION-Klausel (explizite Partitionen) enthält, bestimmt diese Klausel, wie die Zeilen partitioniert werden.

  • Wenn die Anweisung die PARTITION-Klausel nicht enthält (implizite Partitionen), bestimmt Snowflake, wie die Zeilen am besten zu partitionieren sind.

Weitere Informationen zu Partitionen finden Sie unter Tabellenfunktionen und Partitionen.

Ein Beispiel für eine UDTF-Klasse finden Sie unter Beispiel für eine UDTF-Klasse.

Überschreiben der outputSchema()-Methode

Überschreiben Sie die Methode outputSchema(), um die Namen und Datentypen der Felder (das „Ausgabeschema“) der Zeilen zu definieren, die von den Methoden process() und endPartition() zurückgegeben werden.

def outputSchema(): StructType
Copy

In dieser Methode wird ein StructType-Objekt konstruiert und zurückgegeben, das ein Array von StructField-Objekten verwendet, um den Snowflake-Datentyp jedes Feldes in einer zurückgegebenen Zeile anzugeben. Snowflake unterstützt die folgenden Typobjekte für das Ausgabeschema einer UDTF:

SQL-Datentyp

Scala-Typ

com.snowflake.snowpark.types-Typ

NUMBER

Short oder Option[Short]

ShortType

NUMBER

Int oder Option[Int]

IntType

NUMBER

Long oder Option[Long]

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

Float oder Option[Float]

FloatType

DOUBLE

Double oder Option[Double]

DoubleType

VARCHAR

String oder java.lang.String

StringType

BOOLEAN

Boolean oder Option[Boolean]

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

Array[Byte]

BinaryType

VARIANT

com.snowflake.snowpark.types.Variant

VariantType

ARRAY

Array[String]

ArrayType(StringType)

ARRAY

Array[Variant]

ArrayType(VariantType)

OBJECT

Map[String, String]

MapType(StringType, StringType)

OBJECT

Map[String, Variant]

MapType(StringType, VariantType)

Wenn Ihre UDTF zum Beispiel eine Zeile mit einem einzigen Integer-Feld zurückgibt:

override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Copy

Überschreiben der process()-Methode

Überschreiben Sie in Ihrer UDTF-Klasse die Methode process():

def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
Copy

wobei n die Anzahl der Argumente ist, die an Ihre UDTF übergeben werden.

Die Anzahl der Argumente in der Signatur entspricht der Klasse, die Sie erweitert haben. Wenn Ihre UDTF zum Beispiel 2 Eingabeargumente übergibt und Sie die Klasse UDTF2 erweitern, hat die Methode process() folgende Signatur:

def process(arg0: A0, arg1: A1): Iterable[Row]
Copy

Diese Methode wird einmal für jede Zeile der Eingabepartition aufgerufen.

Auswählen der Argumenttypen

Verwenden Sie für den Typ jedes Arguments der Methode process() den Scala-Typ, der dem Snowflake-Datentyp des an die UDTF übergebenen Arguments entspricht.

Snowflake unterstützt die folgenden Datentypen für die Argumente einer UDTF:

SQL-Datentyp

Scala-Datentyp

Anmerkungen

NUMBER

Die folgenden Typen werden unterstützt:

  • Short oder Option[Short]

  • Int oder Option[Int]

  • Long oder Option[Long]

  • java.math.BigDecimal

FLOAT

Float oder Option[Float]

DOUBLE

Double oder Option[Double]

VARCHAR

String oder java.lang.String

BOOLEAN

Boolean oder Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] oder Array[Variant]

OBJECT

Map[String, String] oder Map[String, Variant]

Es werden veränderbare Maps der folgenden Typen unterstützt:

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

Zurückgeben von Zeilen

Erstellen Sie in der Methode process() ein Iterable von Row-Objekten, die die Daten enthalten, die von der UDTF für die gegebenen Eingabewerte zurückgegeben werden sollen, und geben Sie dieses zurück. Die Felder in der Zeile müssen die Typen verwenden, die Sie in der Methode outputSchema angegeben haben. (siehe Überschreiben der outputSchema()-Methode).

Wenn Ihre UDTF beispielsweise Zeilen generiert, konstruieren Sie ein Iterable von Row-Objekten für die generierten Zeilen, und geben dieses zurück:

override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
Copy

Überschreiben der endPartition()-Methode

Überschreiben Sie die Methode endPartition, und fügen Sie Code hinzu, der ausgeführt werden soll, nachdem alle Zeilen in der Eingabepartition an die Methode process übergeben wurden. Die Funktion endPartition wird einmal für jede Eingabepartition aufgerufen.

def endPartition(): Iterable[Row]
Copy

Sie können diese Methode verwenden, wenn Sie Workload ausführen müssen, nachdem alle Zeilen in der Partition verarbeitet worden sind. Beispiele:

  • Zurückgeben von Zeilen auf der Grundlage von Zustandsinformationen, die Sie in jedem process-Methodenaufruf erfassen.

  • Zurückgeben von Zeilen, die nicht an eine bestimmte Eingabezeile gebunden sind.

  • Zurückgeben von Zeilen, die die Ausgabezeilen zusammenfassen, die von der Methode process generiert wurden.

Die Felder in den zurückgegebenen Zeilen müssen den Typen entsprechen, die Sie in der Methode outputSchema angegebenen haben. (siehe Überschreiben der outputSchema()-Methode).

Wenn Sie am Ende jeder Partition keine zusätzlichen Zeilen zurückgeben müssen, geben Sie z. B. ein leeres Iterable von Row-Objekten zurück:

override def endPartition(): Iterable[Row] = Array.empty[Row]
Copy

Bemerkung

Snowflake unterstützt zwar große Partitionen mit Timeouts, die so eingestellt sind, dass sie erfolgreich verarbeitet werden können, aber bei besonders großen Partitionen kann es zu Zeitüberschreitungen kommen (z. B. wenn endPartition zu lange für den Abschluss braucht). Wenden Sie sich an den Snowflake-Support, wenn Sie den Timeout-Schwellenwert für bestimmte Nutzungsszenarios anpassen möchten.

Beispiel für eine UDTF-Klasse

Es folgt ein Beispiel für eine UDTF-Klasse, die einen Bereich von Zeilen generiert.

  • Da die UDTF 2 Argumente übergibt, erweitert die Klasse UDTF2.

  • Die Argumente start und count geben die Startnummer für die Zeile und die Anzahl der zu generierenden Zeilen an.

class MyRangeUdtf extends UDTF2[Int, Int] {
  override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
  override def endPartition(): Iterable[Row] = Array.empty[Row]
  override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Copy

Registrieren der UDTF

Erstellen Sie dann eine Instanz der neuen Klasse, und registrieren Sie die Klasse durch Aufruf einer der UDTFRegistration-Methoden. Sie können die UDTF als temporär oder permanent registrieren.

Registrieren einer temporären UDTF

Um eine temporäre UDTF zu registrieren, rufen Sie UDTFRegistration.registerTemporary auf:

  • Wenn Sie die UDTF ohne Namen aufrufen müssen, können Sie eine anonyme UDTF registrieren, indem Sie eine Instanz der Klasse übergeben:

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, lit(10), lit(5)).show
    
    Copy
  • Wenn Sie die UDTF mit einem Namen aufrufen müssen, geben Sie auch einen Namen für die UDTF an:

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
    // Call the UDTF by name.
    session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
    
    Copy

Registrieren einer permanenten UDTF

Wenn Sie die UDTF in späteren Sitzungen verwenden müssen, rufen Sie UDTFRegistration.registerPermanent auf, um eine permanente UDTF zu registrieren.

Bei der Registrierung einer permanenten UDTF müssen Sie einen Stagingbereich angeben, in den die Registrierungsmethode die JAR-Dateien der UDTF und deren Abhängigkeiten hochlädt. Beispiel:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage")
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

Aufrufen einer UDTF

Nach dem Registrieren der UDTF können Sie die UDTF aufrufen, indem Sie das zurückgegebene TableFunction-Objekt an die tableFunction-Methode des Session-Objekts übergeben:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, lit(10), lit(5)).show()
Copy

Um eine UDTF-Methode namentlich aufzurufen, erstellen Sie ein TableFunction-Objekt mit diesem Namen und übergeben es an die tableFunction-Methode:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

Sie können eine UDTF-Anweisung auch direkt über eine SELECT-Anweisung aufrufen:

session.sql("select * from table(myUdtf(10, 5))")
Copy