Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Scala

L’API de Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction définie par l’utilisateur à partir d’une lambda ou d’une fonction Scala. Cette rubrique explique comment créer ces types de fonctions.

Dans ce chapitre :

Introduction

Vous pouvez appeler des APIs Snowpark pour créer des fonctions définies par l’utilisateur (UDFs) pour vos lambdas et fonctions Scala, et vous pouvez appeler ces UDFs pour traiter les données dans votre DataFrame.

Lorsque vous utilisez l’API de Snowpark pour créer une UDF, la bibliothèque Snowpark sérialise et charge le code de votre UDF dans une zone de préparation interne. Lorsque vous appelez l’UDF, la bibliothèque Snowpark exécute votre fonction sur le serveur, où se trouvent les données. Par conséquent, les données ne doivent pas être transférées au client pour que la fonction puisse les traiter.

Dans votre code personnalisé, vous pouvez également appeler du code qui est contenu dans des fichiers JAR (par exemple, des classes Java pour une bibliothèque tierce).

Vous pouvez créer une UDF pour votre code personnalisé de deux façons :

  • Vous pouvez créer une UDF anonyme et affecter la fonction à une variable. Tant que cette variable est dans le scope, vous pouvez l’utiliser pour appeler l’UDF.

    // Create and register an anonymous UDF (doubleUdf).
    val doubleUdf = udf((x: Int) => x + x)
    // Call the anonymous UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
    
    Copy
  • Vous pouvez créer une UDF nommée et appeler l’UDF par son nom. Vous pouvez l’utiliser si, par exemple, vous devez appeler une UDF par son nom ou utiliser l’UDF dans une session ultérieure.

    // Create and register a permanent named UDF ("doubleUdf").
    session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
    // Call the named UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
    
    Copy

Les sections suivantes fournissent des informations importantes sur la création d’UDFs dans Snowpark :

Le reste de ce chapitre explique comment créer des UDFs.

Note

Si vous avez défini une UDF en exécutant la commande CREATE FUNCTION vous pouvez appeler cette UDF dans Snowpark.

Pour plus de détails, voir Appel de fonctions scalaires définies par l’utilisateur (UDFs).

Types de données pris en charge pour les arguments et les valeurs de retour

Afin de créer une UDF pour une lambda ou une fonction Scala, vous devez utiliser les types de données pris en charge énumérés ci-dessous pour les arguments et la valeur de retour de votre fonction ou lambda :

Type de données SQL

Type de données Scala

Remarques

NUMBER

Les types suivants sont pris en charge :

  • Short ou Option[Short]

  • Int ou Option[Int]

  • Long ou Option[Long]

  • java.math.BigDecimal

FLOAT

Float ou Option[Float]

DOUBLE

Double ou Option[Double]

VARCHAR

String ou java.lang.String

BOOLEAN

Boolean ou Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] ou Array[Variant]

OBJECT

Map[String, String] ou Map[String, Variant]

Les cartes mutables des types suivants sont prises en charge :

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

GEOGRAPHY

com.snowflake.snowpark.types.Geography

Mise en garde concernant la création d’UDFs dans un objet avec l’App Trait

Scala fournit un App Trait que vous pouvez étendre afin de transformer votre objet Scala en un programme exécutable. L” App Trait fournit une méthode main qui exécute automatiquement tout le code dans le corps de votre définition d’objet. (Le code dans votre définition d’objet devient effectivement la méthode main).

Un effet de l’extension de l” App Trait est que les champs de votre objet ne seront pas initialisés avant l’appel de la méthode main. Si votre objet étend App et que vous définissez une UDF qui utilise un champ d’objet que vous avez initialisé précédemment, la définition de l’UDF téléchargée vers le serveur n’inclura pas la valeur initialisée du champ d’objet.

Par exemple, supposons que vous définissiez et initialisiez un champ nommé myConst dans l’objet et que vous utilisiez ce champ dans une UDF :

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}
Copy

Lorsque Snowpark sérialise et télécharge la définition de l’UDF vers Snowflake, myConst n’est pas initialisé et se résout en null. Par conséquent, l’appel de l’UDF renvoie null pour myConst.

Pour contourner ce problème, modifiez votre objet afin qu’il n’étende pas l” App Trait et implémentez une méthode main distincte pour votre code :

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}
Copy

Spécifier des dépendances pour une UDF

Afin de définir une UDF par le biais de l” API de Snowpark, vous devez appeler Session.addDependency() pour tous les fichiers qui contiennent des classes et des ressources dont dépend votre UDF (par exemple, les fichiers JAR, les fichiers de ressources, etc.) (Pour plus de détails sur la lecture des ressources d’une UDF, voir Lecture de fichiers à partir d’une UDF).

La bibliothèque Snowpark télécharge ces fichiers dans une zone de préparation interne et ajoute les fichiers au chemin de classe lors de l’exécution de votre UDF.

Astuce

Si vous ne voulez pas que la bibliothèque télécharge le fichier à chaque fois que vous exécutez votre application, chargez le fichier dans une zone de préparation. Lorsque vous appelez addDependency, transmettez le chemin d’accès au fichier dans la zone de préparation.

Si vous utilisez Scala REPL, vous devez ajouter le répertoire des classes générées par REPL comme dépendance. Par exemple, si vous avez utilisé le script run.sh pour démarrer le REPL, appelez la méthode suivante, qui ajoute le répertoire repl_classes créé par le script :

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
Copy

L’exemple suivant montre comment ajouter un fichier JAR dans une zone de préparation en tant que dépendance :

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Copy

Les exemples suivants montrent comment ajouter des dépendances pour les fichiers JAR et les fichiers de ressources :

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Copy

Vous ne devriez pas avoir besoin de spécifier les dépendances suivantes :

  • Vos bibliothèques d’exécution Scala.

    Ces bibliothèques sont déjà disponibles dans l’environnement d’exécution sur le serveur où vos UDFs sont exécutées.

  • Le fichier JAR Snowpark.

    La bibliothèque Snowpark tente automatiquement de détecter et de charger le fichier JAR de Snowpark sur le serveur.

    Pour empêcher la bibliothèque de charger de façon répétée le fichier JAR de Snowpark vers le serveur :

    1. Chargez le fichier JAR Snowpark dans une zone de préparation.

      Par exemple, la commande suivante charge le fichier JAR de Snowpark dans la zone de préparation @mystage. La commande PUT compresse le fichier JAR et nomme le fichier résultant snowpark-1.9.0.jar.gz.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. Appelez addDependency pour ajouter le fichier JAR de Snowpark dans la zone de préparation comme une dépendance.

      Par exemple, pour ajouter le fichier JAR de Snowpark chargé par la commande précédente :

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz")

      Notez que le chemin d’accès spécifié dans le fichier JAR comprend l’extension de nom de fichier .gz, qui a été ajoutée par la commande PUT.

  • Le fichier ou répertoire JAR contenant l’application en cours d’exécution.

    La bibliothèque Snowpark tente automatiquement de détecter et de charger ces dépendances.

    Si la bibliothèque Snowpark n’est pas en mesure de détecter ces dépendances automatiquement, la bibliothèque signale une erreur, et vous devez appeler addDependency pour ajouter ces dépendances manuellement.

Si le chargement des dépendances vers la zone de préparation prend trop de temps, la bibliothèque Snowpark signale une exception d’expiration. Pour configurer la durée maximale pendant laquelle la bibliothèque Snowpark doit patienter, définissez la propriété snowpark_request_timeout_in_seconds lors de la création de la session.

Création d’une UDF anonyme

Pour créer une UDF anonyme, vous pouvez soit :

  • Appeler la fonction udf dans l’objet com.snowflake.snowpark.functions, et transmettre la définition de la fonction anonyme.

  • Appeler la méthode registerTemporary dans la classe UDFRegistration, et transmettre la définition de la fonction anonyme. Comme vous enregistrez une UDF anonyme, vous devez utiliser les signatures de méthode qui ne comportent pas de paramètre name.

Note

Lorsque vous écrivez du code multithread (par exemple, lorsque vous utilisez des collections parallèles), utilisez la méthode registerTemporary pour enregistrer des UDFs, plutôt que d’utiliser la fonction udf. Cela peut éviter les erreurs dans lesquelles l’objet Snowflake Session par défaut ne peut être trouvé.

Ces méthodes renvoient un objet UserDefinedFunction que vous pouvez utiliser pour appeler les UDF. (Voir Appel de fonctions scalaires définies par l’utilisateur (UDFs).)

L’exemple suivant crée une UDF anonyme :

// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Copy

Note

Si vous créez une UDF dans un carnet Jupyter, vous devez configurer le carnet pour qu’il fonctionne avec Snowpark (voir Configuration d’un carnet Jupyter pour Snowpark Scala) et suivre les directives pour écrire des UDFs dans un carnet (voir Création d’UDFs dans des carnets Jupyter).

L’exemple suivant crée une UDF anonyme qui transmet un Array de valeurs String et ajoute la chaîne x à chaque valeur :

// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
Copy

L’exemple suivant crée une UDF anonyme qui utilise une classe personnalisée (LanguageDetector, qui détecte la langue utilisée dans le texte). L’exemple appelle l’UDF anonyme pour détecter la langue dans la colonne text_data d’un DataFrame et crée un nouveau DataFrame qui inclut une colonne lang supplémentaire avec la langue utilisée.

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))
Copy

Création et enregistrement d’une UDF nommée

Si vous voulez appeler une UDF par son nom (par exemple, en utilisant la fonction callUDF dans l’objet functions) ou si vous devez utiliser une UDF dans des sessions ultérieures, vous pouvez créer et enregistrer une UDF nommée. Pour ce faire, utilisez l’une des méthodes suivantes dans la classe UDFRegistration :

  • registerTemporary, si vous ne prévoyez d’utiliser l’UDF que dans la session en cours

  • registerPermanent, si vous prévoyez d’utiliser l’UDF dans les sessions suivantes

Pour accéder à un objet de la classe UDFRegistration, appelez la méthode udf de la classe Session.

registerTemporary crée une UDF temporaire que vous pouvez utiliser dans la session en cours.

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

registerPermanent crée une UDF que vous pouvez utiliser dans la session actuelle et les suivantes. Lorsque vous appelez registerPermanent, vous devez également spécifier un emplacement dans une zone de préparation interne où les fichiers JAR pour l’UDF et ses dépendances seront chargés.

Note

registerPermanent ne prend pas en charge les zones de préparation externes.

Par exemple :

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

Note

Si vous créez une UDF dans un carnet Jupyter, vous devez configurer le carnet pour qu’il fonctionne avec Snowpark (voir Configuration d’un carnet Jupyter pour Snowpark Scala) et suivre les directives pour écrire des UDFs dans un carnet (voir Création d’UDFs dans des carnets Jupyter).

Création d’UDFs dans des carnets Jupyter

Si vous créez des UDFs dans un carnet Jupyter, vous devez suivre ces étapes supplémentaires :

Rédaction de l’implémentation d’une UDF

Définissez l’implémentation de votre fonction dans une classe qui étend Serializable. Par exemple :

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Copy

Accès à une variable définie dans une autre cellule

Si vous devez utiliser une variable définie dans une autre cellule de votre UDF, vous devez transmettre la variable comme argument au constructeur de la classe. Par exemple, supposons que dans la cellule 1, vous ayez défini une variable :

In [1]:
Copy
val prefix = "Hello"
Copy

et vous voulez utiliser cette variable dans une UDF que vous avez définie dans la cellule 2. Dans le constructeur de la classe de l’UDF, ajoutez un argument pour cette variable. Ensuite, lorsque vous appelez le constructeur de la classe pour créer l’UDF, transmettez la variable définie dans la cellule 1 :

In [2]:
Copy
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Copy

Utilisation d’objets qui ne sont pas sérialisables

Lorsque vous créez une UDF pour une lambda ou une fonction, la bibliothèque Snowpark sérialise la fermeture lambda et l’envoie au serveur pour exécution.

Si un objet capturé par la fermeture lambda n’est pas sérialisable, la bibliothèque Snowpark lève une exception java.io.NotSerializableException.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

Si cela se produit, vous pouvez soit :

  • Rendre l’objet sérialisable, soit

  • Déclarer l’objet comme un lazy val ou utiliser l’annotation @transient pour éviter de sérialiser l’objet.

    Par exemple :

    // Declare the detector object as lazy.
    lazy val detector = new LanguageDetector("en")
    // The detector object is not serialized but is instead reconstructed on the server.
    val langUdf = udf((s: String) =>
         Option(detector.detect(s)).getOrElse("UNKNOWN"))
    
    Copy

Écriture du code d’initialisation d’une UDF

Si votre UDF nécessite un code d’initialisation ou un contexte, vous pouvez le fournir par le biais de valeurs capturées dans le cadre de la fermeture de l’UDF.

L’exemple suivant utilise une classe distincte pour initialiser le contexte nécessaire aux trois UDFs.

  • La première UDF crée une nouvelle instance de la classe au sein de la Lambda. Ainsi, l’initialisation est effectuée chaque fois que l’UDF est appelée.

  • La seconde UDF capture une instance de la classe générée dans votre programme client. Le contexte généré sur le client est sérialisé et est utilisé par l’UDF. Notez que la classe de contexte doit être sérialisable pour que cette approche fonctionne.

  • La troisième UDF capture un lazy val, donc le contexte est instancié de façon « lazy » lors du premier appel de l’UDF et est réutilisé dans les appels suivants. Cette approche fonctionne même lorsque le contexte n’est pas sérialisable. Cependant, il n’y a aucune garantie que les appels de ALL UDF dans un cadre de données utiliseront le même contexte généré de façon « lazy ».

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}
Copy

Lecture de fichiers à partir d’une UDF

Comme mentionné précédemment, la bibliothèque Snowpark télécharge et exécute les UDFs sur le serveur. Si votre UDF doit lire des données à partir d’un fichier, vous devez vous assurer que le fichier est chargé avec l’UDF.

En outre, si le contenu du fichier reste le même entre les appels à l” UDF, vous pouvez écrire votre code pour charger le fichier une fois lors du premier appel et pas lors des appels suivants. Cela peut améliorer les performances de vos appels UDF.

Pour configurer une UDF pour lire un fichier :

  1. Ajoutez le fichier à un fichier JAR.

    Par exemple, si votre UDF doit utiliser un fichier dans un sous-répertoire data/ (data/hello.txt), exécutez la commande jar pour ajouter ce fichier à un fichier JAR :

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. Spécifiez que le fichier JAR est une dépendance, ce qui charge le fichier sur le serveur et ajoute le fichier au classpath. Voir Spécifier des dépendances pour une UDF.

    Par exemple :

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
    Copy
  3. Dans l’UDF, appelez Class.getResourceAsStream pour trouver le fichier dans le classpath et lire le fichier.

    Pour éviter d’ajouter une dépendance sur this, vous pouvez utiliser classOf[com.snowflake.snowpark.DataFrame] (plutôt que getClass) pour obtenir l’objet Class.

    Par exemple, pour lire le fichier data/hello.txt :

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    
    Copy

    Dans cet exemple, le nom de la ressource commence par un /, ce qui indique qu’il s’agit du chemin complet du fichier dans le fichier JAR. (Dans ce cas, l’emplacement du fichier n’est pas relatif au paquet de la classe).

Note

Si vous ne pensez pas que le contenu du fichier va changer entre les appels UDF, lisez le fichier dans un lazy val. Ainsi, le code de chargement du fichier ne s’exécute que lors du premier appel à l” UDF et non lors des appels suivants.

L’exemple suivant définit un objet (UDFCode) avec une fonction qui sera utilisée comme une UDF (readFileFunc). La fonction lit le fichier data/hello.txt, qui doit contenir la chaîne hello,. La fonction ajoute cette chaîne à la chaîne transmise en argument.

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}
Copy

La partie suivante de l’exemple enregistre la fonction comme une UDF anonyme. L’exemple appelle l’UDF de la colonne NAME dans un DataFrame. L’exemple suppose que le fichier data/hello.txt est contenu dans un paquet dans le fichier JAR myJar.jar.

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Copy

Création de fonctions de table définies par l’utilisateur (UDTFs)

Pour créer et enregistrer une UDTF dans Snowpark, vous devez :

Les sections suivantes décrivent ces étapes plus en détail.

Pour plus d’informations sur l’appel d’une UDTF, voir Appel d’une UDTF.

Définition de la classe UDTF

Définissez une classe qui hérite d’une des UDTFn classes (par exemple UDTF0, UDTF1, etc.) du paquet com.snowflake.snowpark.udtf, où n spécifie le nombre d’arguments d’entrée pour votre UDTF. Par exemple, si votre UDTF transmet 2 arguments d’entrée, étendez la classe UDTF2.

Dans votre classe, remplacez les méthodes suivantes :

  • outputSchema(), qui renvoie un objet types.StructType décrivant les noms et les types des champs dans les lignes retournées (le « schéma » de la sortie).

  • process(), qui est appelé une fois pour chaque ligne de la partition d’entrée (voir la note ci-dessous).

  • endPartition(), qui est appelé une fois pour chaque partition après que toutes les lignes ont été transmises dans process().

Lorsqu’une UDTF est appelée, les lignes sont regroupées en partitions avant d’être transmises à l’UDTF :

  • Si l’instruction qui appelle l’UDTF spécifie la clause PARTITION (partitions explicites), cette clause détermine comment les lignes sont partitionnées.

  • Si l’instruction ne spécifie pas la clause PARTITION (partitions implicites), Snowflake détermine la meilleure façon de partitionner les lignes.

Pour une explication des partitions, voir Fonctions et partitions des tables.

Pour un exemple de classe UDTF, voir Exemple de classe UDTF.

Remplacement de la méthode outputSchema()

Remplacez la méthode outputSchema() pour définir les noms et les types de données des champs (le « schéma de sortie ») des lignes renvoyées par les méthodes process() et endPartition().

def outputSchema(): StructType
Copy

Dans cette méthode, construisez et renvoyez un objet StructType qui utilise un Array d’objets StructField pour spécifier le type de données Snowflake de chaque champ dans une ligne renvoyée. Snowflake prend en charge les objets de type suivants pour le schéma de sortie d’une UDTF :

Type de données SQL

Type Scala

Type com.snowflake.snowpark.types

NUMBER

Short ou Option[Short]

ShortType

NUMBER

Int ou Option[Int]

IntType

NUMBER

Long ou Option[Long]

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

Float ou Option[Float]

FloatType

DOUBLE

Double ou Option[Double]

DoubleType

VARCHAR

String ou java.lang.String

StringType

BOOLEAN

Boolean ou Option[Boolean]

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

Array[Byte]

BinaryType

VARIANT

com.snowflake.snowpark.types.Variant

VariantType

ARRAY

Array[String]

ArrayType(StringType)

ARRAY

Array[Variant]

ArrayType(VariantType)

OBJECT

Map[String, String]

MapType(StringType, StringType)

OBJECT

Map[String, Variant]

MapType(StringType, VariantType)

Par exemple, si votre UDTF renvoie une ligne avec un champ d’un entier seul :

override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Copy

Remplacer la méthode process()

Dans votre classe UDTF, remplacez la méthode process() :

def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
Copy

n est le nombre d’arguments transmis à votre UDTF.

Le nombre d’arguments dans la signature correspond à la classe que vous avez étendue. Par exemple, si votre UDTF transmet 2 arguments d’entrée et que vous étendez la classe UDTF2, la méthode process() a cette signature :

def process(arg0: A0, arg1: A1): Iterable[Row]
Copy

Cette méthode est appelée une fois pour chaque ligne de la partition d’entrée.

Choix des types d’arguments

Pour le type de chaque argument de la méthode process(), utilisez le type Scala qui correspond au type de données Snowflake de l’argument transmis à l’UDTF.

Snowflake prend en charge les types de données suivants pour les arguments d’une UDTF :

Type de données SQL

Type de données Scala

Remarques

NUMBER

Les types suivants sont pris en charge :

  • Short ou Option[Short]

  • Int ou Option[Int]

  • Long ou Option[Long]

  • java.math.BigDecimal

FLOAT

Float ou Option[Float]

DOUBLE

Double ou Option[Double]

VARCHAR

String ou java.lang.String

BOOLEAN

Boolean ou Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] ou Array[Variant]

OBJECT

Map[String, String] ou Map[String, Variant]

Les cartes mutables des types suivants sont prises en charge :

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

Renvoi de lignes

Dans la méthode process(), construisez et renvoyez un Iterable d’objets Row qui contiennent les données à renvoyer par l’UDTF pour les valeurs d’entrée données. Les champs de la ligne doivent utiliser les types que vous avez spécifiés dans la méthode outputSchema. (Voir Remplacement de la méthode outputSchema().)

Par exemple, si votre UDTF génère des lignes, construisez et renvoyez un Iterable d’objets Row pour les lignes générées :

override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
Copy

Remplacement de la méthode endPartition()

Remplacez la méthode endPartition et ajoutez le code qui doit être exécuté après que toutes les lignes de la partition d’entrée ont été transmises dans la méthode process. Cette méthode endPartition est appelée une fois pour chaque partition d’entrée.

def endPartition(): Iterable[Row]
Copy

Vous pouvez utiliser cette méthode si vous devez effectuer un travail après que toutes les lignes de la partition ont été traitées. Par exemple, vous pouvez :

  • Renvoyer des lignes basées sur les informations d’état que vous saisissez dans chaque appel de méthode process.

  • Renvoyer les lignes qui ne sont pas liées à une ligne d’entrée spécifique.

  • Renvoyer des lignes qui résument les lignes de sortie qui ont été générées par la méthode process.

Les champs des lignes que vous renvoyez doivent correspondre aux types que vous avez spécifiés dans la méthode outputSchema. (Voir Remplacement de la méthode outputSchema().)

Si vous n’avez pas besoin de renvoyer de lignes supplémentaires à la fin de chaque partition, renvoyez un Iterable vide d’objets Row. Par exemple :

override def endPartition(): Iterable[Row] = Array.empty[Row]
Copy

Note

Bien que Snowflake prenne en charge les grandes partitions avec des délais d’expiration définis pour les traiter avec succès, les partitions particulièrement grandes peuvent entraîner des expirations (par exemple lorsque endPartition prend trop de temps à se terminer). Veuillez contacter le support Snowflake si vous avez besoin d’ajuster le seuil d’expiration pour des scénarios d’utilisation spécifiques.

Exemple de classe UDTF

Voici un exemple de classe UDTF qui génère une plage de lignes.

  • Parce que l’UDTF se transmet dans 2 arguments, la classe étend UDTF2.

  • Les arguments start et count spécifient le numéro de départ de la ligne et le nombre de lignes à générer.

class MyRangeUdtf extends UDTF2[Int, Int] {
  override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
  override def endPartition(): Iterable[Row] = Array.empty[Row]
  override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Copy

Enregistrement de l’UDTF

Ensuite, créez une instance de la nouvelle classe, et enregistrez la classe en appelant l’une des méthodes UDTFRegistration. Vous pouvez enregistrer une UDTF temporaire ou permanente.

Enregistrement d’une UDTF temporaire

Pour enregistrer une UDTF temporaire, appelez UDTFRegistration.registerTemporary :

  • Si vous n’avez pas besoin d’appeler l’UDTF par son nom, vous pouvez enregistrer une UDTF anonyme en lui transmettant une instance de la classe :

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, lit(10), lit(5)).show
    
    Copy
  • Si vous devez appeler l’UDTF par son nom, transmettez également le nom de l’UDTF :

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
    // Call the UDTF by name.
    session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
    
    Copy

Enregistrement d’une UDTF permanente

Si vous devez utiliser l’UDTF lors de sessions ultérieures, appelez UDTFRegistration.registerPermanent pour enregistrer une UDTF permanente.

Lors de l’enregistrement d’une UDTF permanente, vous devez spécifier une zone de préparation où la méthode d’enregistrement téléchargera les fichiers JAR pour l’UDTF et ses dépendances. Par exemple :

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage")
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

Appel d’une UDTF

Après avoir enregistré l’UDTF, vous pouvez appeler l’UDTF en transmettant l’objet TableFunction retourné à la méthode tableFunction de l’objet Session :

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, lit(10), lit(5)).show()
Copy

Pour appeler une UDTF par son nom, il faut construire un objet TableFunction avec ce nom, et le transmettre dans la méthode tableFunction :

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

Vous pouvez également appeler une UDTF par une instruction SELECT directement :

session.sql("select * from table(myUdtf(10, 5))")
Copy