Erstellen von benutzerdefinierten Funktionen (UDFs) für DataFrames

Die Snowpark-API stellt Methoden zur Verfügung, mit denen Sie eine benutzerdefinierte Funktion aus einer Lambda-Funktion oder einer Funktion in Scala erstellen können. Unter diese Thema wird erklärt, wie diese Typen von Funktionen erstellt werden.

Unter diesem Thema:

Mit Snowpark können Sie benutzerdefinierte Funktionen (UDFs) für Ihre benutzerdefinierten Lambdas und Funktionen erstellen, und Sie können diese UDFs aufrufen, um die Daten in Ihren DataFrame zu verarbeiten.

Wenn Sie die Snowpark-API verwenden, um einen UDF zu erstellen, lädt die Snowpark-Bibliothek den Code Ihrer Funktion in einen internen Stagingbereich. Wenn Sie die UDF aufrufen, führt die Snowpark-Bibliothek Ihre Funktion auf dem Server aus, wo sich die Daten befinden. Dadurch müssen die Daten nicht an den Client übertragen werden, damit die Funktion die Daten verarbeiten kann.

In Ihrem benutzerdefinierten Code können Sie auch Code aufrufen, der in JAR-Dateien gepackt ist (z. B. Java-Klassen für die Bibliothek eines Drittanbieters).

Es gibt zwei Möglichkeiten, eine UDF für Ihren benutzerdefinierten Code zu erstellen:

  • Sie können eine anonyme UDF erstellen und die Funktion einer Variablen zuweisen. Solange sich die Variable im Gültigkeitsbereich befindet, können Sie sie zum Aufrufen der UDF verwenden.

  • Sie können eine benannte UDF erstellen und die UDF bei ihrem Namen aufrufen. Sie können diese Variante verwenden, wenn Sie z. B. eine UDF namentlich aufrufen müssen oder wenn die UDF in einer nachfolgenden Sitzung verwendet wird.

In den nächsten Abschnitten wird erklärt, wie Sie diese UDFs erstellen.

Beachten Sie, dass Sie, wenn Sie eine UDF durch Ausführen des Befehls CREATE FUNCTION definiert haben, diese UDF in Snowpark aufrufen können. Weitere Details dazu finden Sie unter Benutzerdefinierte Funktionen (UDFs).

Einschränkungen beim Erstellen von UDFs in einem Objekt mit der Eigenschaft „App Trait“

Scala bietet eine App-Eigenschaft, die Sie erweitern können, um Ihr Scala-Objekt in ein ausführbares Programm umzuwandeln. Die App-Eigenschaft bietet eine main-Methode, die automatisch den gesamten Codetext Ihrer Objektdefinition ausführt. (Der Code in Ihrer Objektdefinition wird faktisch zur main-Methode).

Eine mögliche Auswirkung der Erweiterung der App-Eigenschaft besteht darin, dass die Felder in Ihrem Objekt erst bei Aufruf der main-Methode initialisiert werden. Wenn Ihr Objekt App erweitert und Sie eine UDF definieren, die ein Objektfeld verwendet, das Sie zuvor initialisiert haben, enthält die auf den Server hochgeladene UDF-Definition nicht den initialisierten Wert des Objektfelds.

Angenommen, Sie haben ein Feld mit dem Namen myConst im Objekt definiert und initialisiert und verwenden dieses Feld in einer UDF:

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}

Wenn Snowpark die UDF-Definition serialisiert und zu Snowflake hochlädt, wird myConst nicht initialisiert und daher zu null aufgelöst. Als Ergebnis gibt der UDF-Aufruf null für myConst zurück.

Um dies zu umgehen, müssen Sie Ihr Objekt so ändern, dass es die App-Eigenschaft nicht erweitert. Implementieren Sie dazu eine separate main-Methode für Ihren Code:

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}

Angeben von Abhängigkeiten für eine UDF

Um eine UDF über die Snowpark-API zu definieren, müssen Sie Session.addDependency() für alle Dateien aufrufen, die Klassen und Ressourcen enthalten, von denen Ihre UDF abhängt (z. B. JAR-Dateien, Ressourcendateien usw.). (Weitere Details zu lesenden Ressourcen aus einer UDF finden Sie unter Lesen von Dateien aus einer UDF.)

Die Snowpark-Bibliothek lädt diese Dateien in einen internen Stagingbereich hoch und fügt die Dateien beim Ausführen Ihrer UDF zum Klassenpfad hinzu.

Tipp

Wenn Sie nicht möchten, dass die Bibliothek die Datei jedes Mal hochlädt, wenn Sie Ihre Anwendung ausführen, laden Sie die Datei in einen Stagingbereich hoch. Übergeben Sie beim Aufruf von addDependency den Pfad zu der Datei im Stagingbereich.

Wenn Sie die Scala REPL verwenden, müssen Sie das Verzeichnis der von der REPL generierten Klassen als Abhängigkeit hinzufügen. Wenn Sie z. B. das Skript run.sh zum Starten von REPL verwendet haben, rufen Sie die folgende Methode auf, die das vom Skript erstellte Verzeichnis repl_classes hinzufügt:

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")

Das folgende Beispiel zeigt, wie Sie eine JAR-Datei als Abhängigkeit zu einem Stagingbereich hinzufügen:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")

Die folgenden Beispiele zeigen, wie Sie Abhängigkeiten für JAR-Dateien und Ressourcendateien hinzufügen:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")

Die folgenden Abhängigkeiten sollten Sie nicht angeben müssen:

  • Ihre Scala-Laufzeitbibliotheken.

    Diese Bibliotheken sind bereits in der Laufzeitumgebung auf dem Server verfügbar, auf dem Ihre UDFs ausgeführt werden.

  • Die Snowpark-JAR-Datei.

    Die Snowpark-Bibliothek versucht automatisch, die Snowpark-JAR-Datei zu erkennen und auf den Server hochzuladen.

    So verhindern Sie, dass die Bibliothek die Snowpark-JAR-Datei wiederholt auf den Server hochlädt:

    1. Laden Sie die Snowpark-JAR-Datei in einen Stagingbereich hoch.

      Der folgende Befehl lädt z. B. die Snowpark-JAR-Datei in den Stagingbereich @mystage hoch. Der PUT-Befehl komprimiert die JAR-Datei und benennt die resultierende Datei snowpark-0.5.0.jar.gz.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-0.5.0.jar @mystage
      
    2. Rufen Sie addDependency auf, um die Snowpark-JAR-Datei als Abhängigkeit zum Stagingbereich hinzuzufügen.

      Um beispielsweise die mit dem vorherigen Befehl hochgeladene Snowpark-JAR-Datei hinzuzufügen:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-0.5.0.jar.gz")
      

      Beachten Sie, dass der angegebene Pfad zur JAR-Datei die Dateinamenerweiterung .gz enthält, die durch den PUT-Befehl hinzugefügt wurde.

  • Die JAR-Datei oder das Verzeichnis mit der aktuell ausgeführten Anwendung.

    Die Snowpark-Bibliothek versucht automatisch, diese Abhängigkeiten zu erkennen und hochzuladen.

    Wenn die Snowpark-Bibliothek diese Abhängigkeiten nicht automatisch erkennen kann, meldet die Bibliothek einen Fehler, und Sie müssen addDependency aufrufen, um die Abhängigkeiten manuell hinzuzufügen.

Erstellen einer anonymen UDF

Für das Erstellen einer anonymen UDF gibt es zwei Möglichkeiten:

  • Rufen Sie die Funktion udf im Objekt com.snowflake.snowpark.functions auf, und übergeben Sie die Definition der anonymen Funktion.

  • Rufen Sie die Methode registerTemporary in der Klasse UDFRegistration auf, und übergeben Sie die Definition der anonymen Funktion. Da Sie einen anonymen UDF registrieren, müssen Sie die Methodensignaturen verwenden, die keinen name-Parameter haben.

Bemerkung

Wenn Sie Code mit mehreren Threads schreiben (z. B. bei Verwendung paralleler Collections), können Sie UDFs mit der Methode registerTemporary registrieren, anstatt die Funktion udf zu verwenden. Dadurch können Fehler vermieden werden, bei denen das Snowflake-Standardobjekt Session nicht gefunden werden kann.

Im folgenden Beispiel wird eine anonyme UDF erstellt, die eine benutzerdefinierte Klasse verwendet (LanguageDetector zum Erkennen der in Text verwendeten Sprache). Im Beispiel wird die anonyme UDF aufgerufen, um die Sprache in der Spalte text_data eines DataFrame zu erkennen und ein neues DataFrame zu erstellen, das eine zusätzliche Spalte lang mit der verwendeten Sprache enthält.

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))

Bemerkung

Wenn Sie eine UDF in einem Jupyter-Notebook erstellen, müssen Sie das Notebook für die Verwendung mit Snowpark einrichten (siehe Einrichten eines Jupyter-Notebooks für Snowpark) und die Richtlinien für das Schreiben von UDFs in ein Notebook befolgen (siehe Erstellen von UDFs in Jupyter Notebooks).

Wenn Sie Ihren Clientcode ausführen, serialisiert die Snowpark-Bibliothek die Lambda-Closure und sendet sie zur Ausführung an den Server. Wenn ein von der Lambda-Closure erfasstes Objekt nicht serialisierbar ist, können Sie Folgendes tun:

  • Machen Sie das Objekt serialisierbar oder

  • Deklarieren Sie das Objekt als lazy val, oder verwenden Sie die @transient-Anmerkung, um die Serialisierung des Objekts zu vermeiden.

Beispiel:

// Declare the detector object as lazy.
lazy val detector = new LanguageDetector("en")
// The detector object is not serialized but is instead reconstructed on the server.
val langUdf = udf(...)
val dfEmailsWithLangCol = ...

Erstellen und Registrieren einer benannten UDF

Wenn Sie eine UDF mit ihrem Namen aufrufen möchten (z. B. durch Verwendung der Funktion callUDF im Objekt functions) oder wenn Sie einen UDF in nachfolgenden Sitzungen verwenden müssen, können Sie eine benannte UDF erstellen und registrieren. Verwenden Sie dazu eine der folgenden Methoden in der UDFRegistration-Klasse:

  • registerTemporary, wenn Sie die UDF nur in der aktuellen Sitzung verwenden möchten

  • registerPermanent, wenn Sie planen, die UDF in nachfolgenden Sitzungen zu verwenden

Um auf ein Objekt der Klasse UDFRegistration zuzugreifen, rufen Sie die Methode udf der Klasse Session auf.

registerTemporary erstellt eine temporäre UDF, die Sie in der aktuellen Sitzung verwenden können.

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

registerPermanent erstellt eine UDF, die Sie in der aktuellen und den folgenden Sitzungen verwenden können. Wenn Sie registerPermanent aufrufen, müssen Sie auch einen Speicherort in einem internen Stagingbereich angeben, in den die JAR-Dateien für die UDF und seine Abhängigkeiten hochgeladen werden sollen.

Bemerkung

registerPermanent unterstützt keine externen Stagingbereiche.

Beispiel:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

Bemerkung

Wenn Sie eine UDF in einem Jupyter-Notebook erstellen, müssen Sie das Notebook für die Verwendung mit Snowpark einrichten (siehe Einrichten eines Jupyter-Notebooks für Snowpark) und die Richtlinien für das Schreiben von UDFs in ein Notebook befolgen (siehe Erstellen von UDFs in Jupyter Notebooks).

Erstellen von UDFs in Jupyter Notebooks

Wenn Sie UDFs in einem Jupyter-Notebook erstellen, müssen Sie die folgenden zusätzlichen Schritte ausführen:

Schreiben der Implementierung einer UDF

Definieren Sie die Implementierung Ihrer Funktion in einer Klasse, die Serializable erweitert. Beispiel:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)

Zugriff auf eine Variable, die in einer anderen Zelle definiert ist

Wenn Sie eine in einer anderen Zelle definierte Variable in Ihrer UDF verwenden müssen, müssen Sie die Variable als Argument an den Klassenkonstruktor übergeben. Nehmen Sie zum Beispiel an, dass Sie in Zelle 1 eine Variable definiert haben:

In [1]:
val prefix = "Hello"

und Sie möchten diese Variable in einer UDF verwenden, die Sie in Zelle 2 definiert haben. Fügen Sie im Klassenkonstruktor der UDF ein Argument für diese Variable hinzu. Wenn Sie dann den Klassenkonstruktor zum Erstellen von UDF aufrufen, übergeben Sie die in Zelle 1 definierte Variable:

In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()

Schreiben von Initialisierungscode für eine UDF

Wenn Ihre UDF Initialisierungscode oder Kontext benötigt, können Sie diesen durch Werte bereitstellen, die als Teil der UDF-Closure erfasst werden.

Im folgenden Beispiel wird eine separate Klasse verwendet, um den von drei UDFs benötigten Kontext zu initialisieren.

  • Die erste UDF erstellt eine neue Instanz der Klasse innerhalb der Lambda-Funktion, sodass die Initialisierung bei jedem Aufruf der UDF durchgeführt wird.

  • Die zweite UDF erfasst eine Instanz der Klasse, die in Ihrem Clientprogramm generiert wurde. Der auf dem Client generierte Kontext wird serialisiert und von der UDF verwendet. Beachten Sie, dass die Kontextklasse serialisierbar sein muss, damit dieser Ansatz funktioniert.

  • Die dritte UDF erfasst einen lazy val, sodass der Kontext beim ersten UDF-Aufruf im Lazy-Modus instanziiert und bei nachfolgenden Aufrufen wiederverwendet wird. Dieser Ansatz funktioniert auch, wenn der Kontext nicht serialisierbar ist. Es gibt jedoch keine Garantie dafür, dass ALL ALLE UDF -Aufrufe innerhalb eines DataFrames denselben, im Lazy-Modus generierten Kontext verwenden.

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}

Lesen von Dateien aus einer UDF

Wie bereits erwähnt, lädt die Snowpark-Bibliothek die UDFs hoch und führt sie auf dem Server aus. Wenn Ihre UDF-Daten aus einer Datei gelesen werden, müssen Sie sicherstellen, dass die Datei mit der UDF hochgeladen wird.

Wenn außerdem der Inhalt der Datei zwischen den Aufrufen der UDF gleich bleibt, können Sie Ihren Code so schreiben, dass die Datei einmal beim ersten Aufruf geladen wird und bei den folgenden Aufrufen nicht mehr. Dies kann die Leistung Ihrer UDF-Aufrufe verbessern.

So richten Sie eine UDF zum Lesen einer Datei ein:

  1. Fügen Sie die Datei zu einer JAR-Datei hinzu.

    Wenn Ihre UDF z. B. eine Datei in einem data/-Unterverzeichnis (data/hello.txt) verwenden muss, führen Sie den Befehl jar aus, um diese Datei zu einer JAR-Datei hinzuzufügen:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
  2. Geben Sie an, dass die JAR-Datei eine Abhängigkeit ist, wodurch die Datei auf den Server hochgeladen und dem Klassenpfad hinzugefügt wird. Siehe Angeben von Abhängigkeiten für eine UDF.

    Beispiel:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
  3. Rufen Sie in der UDF Class.getResourceAsStream auf, um die Datei im Klassenpfad zu finden und die Datei zu lesen.

    Um eine Abhängigkeit von this. zu vermeiden, können Sie classOf[com.snowflake.snowpark.DataFrame] (anstelle von getClass) verwenden, um das Objekt Class zu erhalten.

    Beispiel für das Lesen der Datei data/hello.txt:

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    

    In diesem Beispiel beginnt der Ressourcenname mit einem /, was anzeigt, dass dies der vollständige Pfad der Datei in der JAR-Datei ist. (In diesem Fall ist der Speicherort der Datei nicht relativ zum Paket der Klasse.)

Bemerkung

Wenn Sie nicht erwarten, dass sich der Inhalt der Datei zwischen UDF-Aufrufen ändert, lesen Sie die Datei in einen lazy val-Wert. Dies bewirkt, dass der Dateiladecode nur beim ersten Aufruf der UDF und nicht bei nachfolgenden Aufrufen ausgeführt wird.

Im folgenden Beispiel wird ein Objekt (UDFCode) mit einer Funktion definiert, die als UDF (readFileFunc) verwendet wird. Die Funktion liest die Datei data/hello.txt, die die Zeichenfolge hello, enthalten soll. Die Funktion stellt diese Zeichenfolge der als Argument übergebenen Zeichenfolge voran.

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}

Im nächsten Teil des Beispiels wird die Funktion als anonyme UDF registriert. In dem Beispiel wird die UDF auf der Spalte NAME in einem DataFrame aufgerufen. Im Beispiel wird davon ausgegangen, dass die Datei data/hello.txt in der JAR-Datei myJar.jar gepackt ist.

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()