Créer des fonctions définies par l’utilisateur (UDFs) pour DataFrames

L’API de Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction définie par l’utilisateur à partir d’une fonction Lambda ou d’une fonction Scala. Cette rubrique explique comment créer ces types de fonctions.

Dans ce chapitre :

Avec Snowpark, vous pouvez créer des fonctions définies par l’utilisateur (UDFs) pour vos Lambdas et fonctions personnalisées, et vous pouvez appeler ces UDFs pour traiter les données dans votre DataFrame.

Lorsque vous utilisez l’API de Snowpark pour créer une UDF, la bibliothèque Snowpark télécharge le code de votre fonction dans une zone de préparation interne. Lorsque vous appelez l’UDF, la bibliothèque Snowpark exécute votre fonction sur le serveur, où se trouvent les données. Par conséquent, les données ne doivent pas être transférées au client pour que la fonction puisse les traiter.

Dans votre code personnalisé, vous pouvez également appeler du code qui est contenu dans des fichiers JAR (par exemple, des classes Java pour une bibliothèque tierce).

Vous pouvez créer une UDF pour votre code personnalisé de deux façons :

  • Vous pouvez créer une UDF anonyme et affecter la fonction à une variable. Tant que cette variable est dans le scope, vous pouvez l’utiliser pour appeler l’UDF.

  • Vous pouvez créer une UDF nommée et appeler l’UDF par son nom. Vous pouvez l’utiliser si, par exemple, vous devez appeler une UDF par son nom ou utiliser l’UDF dans une session ultérieure.

Les sections suivantes expliquent comment créer ces UDFs.

Notez que si vous avez défini une UDF en exécutant la commande CREATE FUNCTION, vous pouvez appeler cette UDF dans Snowpark. Pour plus de détails, voir Appel de fonctions définies par l’utilisateur (UDFs).

Mise en garde concernant la création d’UDFs dans un objet avec l’App Trait

Scala fournit un App Trait que vous pouvez étendre afin de transformer votre objet Scala en un programme exécutable. L” App Trait fournit une méthode main qui exécute automatiquement tout le code dans le corps de votre définition d’objet. (Le code dans votre définition d’objet devient effectivement la méthode main).

Un effet de l’extension de l” App Trait est que les champs de votre objet ne seront pas initialisés avant l’appel de la méthode main. Si votre objet étend App et que vous définissez une UDF qui utilise un champ d’objet que vous avez initialisé précédemment, la définition de l’UDF téléchargée vers le serveur n’inclura pas la valeur initialisée du champ d’objet.

Par exemple, supposons que vous définissiez et initialisiez un champ nommé myConst dans l’objet et que vous utilisiez ce champ dans une UDF :

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}

Lorsque Snowpark sérialise et télécharge la définition de l’UDF vers Snowflake, myConst n’est pas initialisé et se résout en null. Par conséquent, l’appel de l’UDF renvoie null pour myConst.

Pour contourner ce problème, modifiez votre objet afin qu’il n’étende pas l” App Trait et implémentez une méthode main distincte pour votre code :

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}

Spécifier des dépendances pour une UDF

Afin de définir une UDF par le biais de l” API de Snowpark, vous devez appeler Session.addDependency() pour tous les fichiers qui contiennent des classes et des ressources dont dépend votre UDF (par exemple, les fichiers JAR, les fichiers de ressources, etc.) (Pour plus de détails sur la lecture des ressources d’une UDF, voir Lecture de fichiers à partir d’une UDF).

La bibliothèque Snowpark télécharge ces fichiers dans une zone de préparation interne et ajoute les fichiers au chemin de classe lors de l’exécution de votre UDF.

Astuce

Si vous ne voulez pas que la bibliothèque télécharge le fichier à chaque fois que vous exécutez votre application, chargez le fichier dans une zone de préparation. Lorsque vous appelez addDependency, transmettez le chemin d’accès au fichier dans la zone de préparation.

Si vous utilisez Scala REPL, vous devez ajouter le répertoire des classes générées par REPL comme dépendance. Par exemple, si vous avez utilisé le script run.sh pour démarrer le REPL, appelez la méthode suivante, qui ajoute le répertoire repl_classes créé par le script :

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")

L’exemple suivant montre comment ajouter un fichier JAR dans une zone de préparation en tant que dépendance :

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")

Les exemples suivants montrent comment ajouter des dépendances pour les fichiers JAR et les fichiers de ressources :

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")

Vous ne devriez pas avoir besoin de spécifier les dépendances suivantes :

  • Vos bibliothèques d’exécution Scala.

    Ces bibliothèques sont déjà disponibles dans l’environnement d’exécution sur le serveur où vos UDFs sont exécutées.

  • Le fichier JAR Snowpark.

    La bibliothèque Snowpark tente automatiquement de détecter et de charger le fichier JAR de Snowpark sur le serveur.

    Pour empêcher la bibliothèque de charger de façon répétée le fichier JAR de Snowpark vers le serveur :

    1. Chargez le fichier JAR Snowpark dans une zone de préparation.

      Par exemple, la commande suivante télécharge le fichier JAR de Snowpark dans la zone de préparation @mystage. La commande PUT compresse le fichier JAR et nomme le fichier résultant snowpark-0.5.0.jar.gz.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-0.5.0.jar @mystage
      
    2. Appelez addDependency pour ajouter le fichier JAR de Snowpark dans la zone de préparation comme une dépendance.

      Par exemple, pour ajouter le fichier JAR de Snowpark chargé par la commande précédente :

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-0.5.0.jar.gz")
      

      Notez que le chemin d’accès spécifié dans le fichier JAR comprend l’extension de nom de fichier .gz, qui a été ajoutée par la commande PUT.

  • Le fichier ou répertoire JAR contenant l’application en cours d’exécution.

    La bibliothèque Snowpark tente automatiquement de détecter et de charger ces dépendances.

    Si la bibliothèque Snowpark n’est pas en mesure de détecter ces dépendances automatiquement, la bibliothèque signale une erreur, et vous devez appeler addDependency pour ajouter ces dépendances manuellement.

Création d’une UDF anonyme

Pour créer une UDF anonyme, appelez la fonction udf dans l’objet com.snowflake.snowpark.functions, et transmettez la définition de la fonction anonyme.

L’exemple suivant crée une UDF anonyme qui utilise une classe personnalisée (LanguageDetector, qui détecte la langue utilisée dans le texte). L’exemple appelle l’UDF anonyme pour détecter la langue dans la colonne text_data d’un DataFrame et crée un nouveau DataFrame qui inclut une colonne lang supplémentaire avec la langue utilisée.

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))

Note

Si vous créez une UDF dans un carnet Jupyter, vous devez configurer le carnet pour qu’il fonctionne avec Snowpark (voir Configuration d’un carnet Jupyter pour Snowpark) et suivre les directives pour écrire des UDFs dans un carnet (voir Création d’UDFs dans des carnets Jupyter).

Lorsque vous exécutez votre code client, la bibliothèque Snowpark sérialise la fermeture Lambda et l’envoie au serveur pour exécution. Si un objet capturé par la fermeture Lambda n’est pas sérialisable, vous pouvez soit :

  • Rendre l’objet sérialisable, soit

  • Déclarer l’objet comme un lazy val ou utiliser l’annotation @transient pour éviter de sérialiser l’objet.

Par exemple :

// Declare the detector object as lazy.
lazy val detector = new LanguageDetector("en")
// The detector object is not serialized but is instead reconstructed on the server.
val langUdf = udf(...)
val dfEmailsWithLangCol = ...

Création et enregistrement d’une UDF nommée

Si vous voulez appeler une UDF par son nom (par exemple, en utilisant la fonction callUDF dans l’objet functions) ou si vous devez utiliser une UDF dans des sessions ultérieures, vous pouvez créer et enregistrer une UDF nommée. Pour ce faire, utilisez l’une des méthodes suivantes dans la classe UDFRegistration :

  • registerTemporary, si vous ne prévoyez d’utiliser l’UDF que dans la session en cours

  • registerPermanent, si vous prévoyez d’utiliser l’UDF dans les sessions suivantes

Pour accéder à un objet de la classe UDFRegistration, appelez la méthode udf de la classe Session.

registerTemporary crée une UDF temporaire que vous pouvez utiliser dans la session en cours.

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

registerPermanent crée une UDF que vous pouvez utiliser dans la session actuelle et les suivantes. Lorsque vous appelez registerPermanent, vous spécifiez un emplacement de zone de préparation où les fichiers JAR pour l’UDF et ses dépendances seront chargés.

// Create and register a permanent named UDF. Specify that the UDF and dependent JAR files should be uploaded to the stage
// named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the num column and returning the result in a new column named doubleNum.
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))

Note

Si vous créez une UDF dans un carnet Jupyter, vous devez configurer le carnet pour qu’il fonctionne avec Snowpark (voir Configuration d’un carnet Jupyter pour Snowpark) et suivre les directives pour écrire des UDFs dans un carnet (voir Création d’UDFs dans des carnets Jupyter).

Création d’UDFs dans des carnets Jupyter

Si vous créez des UDFs dans un carnet Jupyter, vous devez suivre ces étapes supplémentaires :

Rédaction de l’implémentation d’une UDF

Définissez l’implémentation de votre fonction dans une classe qui étend Serializable. Par exemple :

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)

Accès à une variable définie dans une autre cellule

Si vous devez utiliser une variable définie dans une autre cellule de votre UDF, vous devez transmettre la variable comme argument au constructeur de la classe. Par exemple, supposons que dans la cellule 1, vous ayez défini une variable :

In [1]:
val prefix = "Hello"

et vous voulez utiliser cette variable dans une UDF que vous avez définie dans la cellule 2. Dans le constructeur de la classe de l’UDF, ajoutez un argument pour cette variable. Ensuite, lorsque vous appelez le constructeur de la classe pour créer l’UDF, transmettez la variable définie dans la cellule 1 :

In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()

Écriture du code d’initialisation d’une UDF

Si votre UDF nécessite un code d’initialisation ou un contexte, vous pouvez le fournir par le biais de valeurs capturées dans le cadre de la fermeture de l’UDF.

L’exemple suivant utilise une classe distincte pour initialiser le contexte nécessaire aux trois UDFs.

  • La première UDF crée une nouvelle instance de la classe au sein de la Lambda. Ainsi, l’initialisation est effectuée chaque fois que l’UDF est appelée.

  • La seconde UDF capture une instance de la classe générée dans votre programme client. Le contexte généré sur le client est sérialisé et est utilisé par l’UDF. Notez que la classe de contexte doit être sérialisable pour que cette approche fonctionne.

  • La troisième UDF capture un lazy val, donc le contexte est instancié de façon « lazy » lors du premier appel de l’UDF et est réutilisé dans les appels suivants. Cette approche fonctionne même lorsque le contexte n’est pas sérialisable. Cependant, il n’y a aucune garantie que les appels de ALL UDF dans un cadre de données utiliseront le même contexte généré de façon « lazy ».

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}

Lecture de fichiers à partir d’une UDF

Comme mentionné précédemment, la bibliothèque Snowpark télécharge et exécute les UDFs sur le serveur. Si votre UDF doit lire des données à partir d’un fichier, vous devez vous assurer que le fichier est chargé avec l’UDF.

En outre, si le contenu du fichier reste le même entre les appels à l” UDF, vous pouvez écrire votre code pour charger le fichier une fois lors du premier appel et pas lors des appels suivants. Cela peut améliorer les performances de vos appels UDF.

Pour configurer une UDF pour lire un fichier :

  1. Ajoutez le fichier à un fichier JAR.

    Par exemple, si votre UDF doit utiliser un fichier dans un sous-répertoire data/ (data/hello.txt), exécutez la commande jar pour ajouter ce fichier à un fichier JAR :

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
  2. Spécifiez que le fichier JAR est une dépendance, ce qui charge le fichier sur le serveur et ajoute le fichier au classpath. Voir Spécifier des dépendances pour une UDF.

    Par exemple :

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
  3. Dans l’UDF, appelez Class.getResourceAsStream pour trouver le fichier dans le classpath et lire le fichier.

    Pour éviter d’ajouter une dépendance sur this. vous pouvez utiliser classOf[com.snowflake.snowpark.DataFrame] (plutôt que getClass) pour obtenir l’objet Class,

    Par exemple, pour lire le fichier data/hello.txt :

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    

    Dans cet exemple, le nom de la ressource commence par un /, ce qui indique qu’il s’agit du chemin complet du fichier dans le fichier JAR. (Dans ce cas, l’emplacement du fichier n’est pas relatif au package de la classe).

Note

Si vous ne pensez pas que le contenu du fichier va changer entre les appels UDF, lisez le fichier dans un lazy val. Ainsi, le code de chargement du fichier ne s’exécute que lors du premier appel à l” UDF et non lors des appels suivants.

L’exemple suivant définit un objet (UDFCode) avec une fonction qui sera utilisée comme une UDF (readFileFunc). La fonction lit le fichier data/hello.txt, qui doit contenir la chaîne hello,. La fonction ajoute cette chaîne à la chaîne transmise en argument.

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}

La partie suivante de l’exemple enregistre la fonction comme une UDF anonyme. L’exemple appelle l’UDF de la colonne NAME dans un DataFrame. L’exemple suppose que le fichier data/hello.txt est contenu dans un package dans le fichier JAR myJar.jar.

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()