Création de fonctions définies par l’utilisateur (UDFs) pour DataFrames dans Java

L’API de Snowpark fournit des méthodes que vous pouvez utiliser pour créer une fonction définie par l’utilisateur à partir d’une expression lambda dans Java. Cette rubrique explique comment créer ces types de fonctions.

Dans ce chapitre :

Introduction

Vous pouvez appeler des APIs Snowpark pour créer des fonctions définies par l’utilisateur (UDFs) pour vos expressions lambdas dans Java, et vous pouvez appeler ces UDFs pour traiter les données dans votre DataFrame.

Lorsque vous utilisez l’API de Snowpark pour créer une UDF, la bibliothèque Snowpark sérialise et charge le code de votre UDF dans une zone de préparation. Lorsque vous appelez l’UDF, la bibliothèque Snowpark exécute votre fonction sur le serveur, où se trouvent les données. Par conséquent, les données ne doivent pas être transférées au client pour que la fonction puisse les traiter.

Dans votre code personnalisé, vous pouvez également appeler du code qui est contenu dans des fichiers JAR (par exemple, des classes Java pour une bibliothèque tierce).

Vous pouvez créer une UDF pour votre code personnalisé de deux façons :

  • Vous pouvez créer une UDF anonyme et affecter la fonction à une variable. Tant que cette variable est dans le scope, vous pouvez l’utiliser pour appeler l’UDF.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register an anonymous UDF (doubleUdf)
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
    // Call the anonymous UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy
  • Vous pouvez créer une UDF nommée et appeler l’UDF par son nom. Vous pouvez l’utiliser si, par exemple, vous devez appeler une UDF par son nom ou utiliser l’UDF dans une session ultérieure.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register a permanent named UDF ("doubleUdf")
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      session
        .udf()
        .registerPermanent(
          "doubleUdf",
          (Integer x) -> x + x,
          DataTypes.IntegerType,
          DataTypes.IntegerType,
          "mystage");
    // Call the named UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy

Le reste de ce chapitre explique comment créer des UDFs.

Note

Si vous avez défini une UDF en exécutant la commande CREATE FUNCTION vous pouvez appeler cette UDF dans Snowpark.

Pour plus de détails, voir Appel de fonctions scalaires définies par l’utilisateur (UDFs).

Types de données pris en charge pour les arguments et les valeurs de retour

Afin de créer une UDF pour une lambda Java, vous devez utiliser les types de données pris en charge énumérés ci-dessous pour les arguments et la valeur de retour de votre méthode :

Type de données SQL

Type de données Java

Notes

NUMBER

Les types suivants sont pris en charge :

  • Integer

  • Long

  • java.math.BigDecimal ou java.math.BigInteger

FLOAT

Float

DOUBLE

Double

VARCHAR

String

BOOLEAN

Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] ou Variant[]

OBJECT

Map<Chaîne, Chaîne> ou Map<Chaîne, Variante>

GEOGRAPHY

com.snowflake.snowpark_java.types.Geography

Spécifier des dépendances pour une UDF

Afin de définir une UDF par le biais de l” API de Snowpark, vous devez appeler Session.addDependency() pour tous les fichiers qui contiennent des classes et des ressources dont dépend votre UDF (par exemple, les fichiers JAR, les fichiers de ressources, etc.) (Pour plus de détails sur la lecture des ressources d’une UDF, voir Lecture de fichiers à partir d’une UDF).

La bibliothèque Snowpark télécharge ces fichiers dans une zone de préparation interne et ajoute les fichiers au chemin de classe lors de l’exécution de votre UDF.

Astuce

Si vous ne voulez pas que la bibliothèque télécharge le fichier à chaque fois que vous exécutez votre application, chargez le fichier dans une zone de préparation. Lorsque vous appelez addDependency, transmettez le chemin d’accès au fichier dans la zone de préparation.

L’exemple suivant montre comment ajouter un fichier JAR dans une zone de préparation en tant que dépendance :

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
Copy

Les exemples suivants montrent comment ajouter des dépendances pour les fichiers JAR et les fichiers de ressources :

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");

// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
Copy

Vous ne devriez pas avoir besoin de spécifier les dépendances suivantes :

  • Vos bibliothèques d’exécution Java.

    Ces bibliothèques sont déjà disponibles dans l’environnement d’exécution sur le serveur où vos UDFs sont exécutées.

  • Le fichier JAR Snowpark.

    La bibliothèque Snowpark tente automatiquement de détecter et de charger le fichier JAR de Snowpark sur le serveur.

    Pour empêcher la bibliothèque de charger de façon répétée le fichier JAR de Snowpark vers le serveur :

    1. Chargez le fichier JAR Snowpark dans une zone de préparation.

      Par exemple, la commande suivante charge le fichier JAR de Snowpark dans la zone de préparation @mystage. La commande PUT compresse le fichier JAR et nomme le fichier résultant snowpark-1.9.0.jar.gz.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. Appelez addDependency pour ajouter le fichier JAR de Snowpark dans la zone de préparation comme une dépendance.

      Par exemple, pour ajouter le fichier JAR de Snowpark chargé par la commande précédente :

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz");

      Notez que le chemin d’accès spécifié dans le fichier JAR comprend l’extension de nom de fichier .gz, qui a été ajoutée par la commande PUT.

  • Le fichier ou répertoire JAR contenant l’application en cours d’exécution.

    La bibliothèque Snowpark tente automatiquement de détecter et de charger ces dépendances.

    Si la bibliothèque Snowpark n’est pas en mesure de détecter ces dépendances automatiquement, la bibliothèque signale une erreur, et vous devez appeler addDependency pour ajouter ces dépendances manuellement.

Si le chargement des dépendances vers la zone de préparation prend trop de temps, la bibliothèque Snowpark signale une exception d’expiration. Pour configurer la durée maximale pendant laquelle la bibliothèque Snowpark doit patienter, définissez la propriété snowpark_request_timeout_in_seconds lors de la création de la session.

Création d’une UDF anonyme

Pour créer une UDF anonyme, vous pouvez soit :

  • Appelez la méthode statique Functions.udf, en transmettant l’expression lambda et les champs DataTypes (ou les objets construits par les méthodes de cette classe) représentant les types de données des entrées et des sorties.

  • Appelez la méthode registerTemporary de la classe UDFRegistration, en transmettant l’expression lambda et les champs DataTypes (ou les objets construits par les méthodes de cette classe) représentant les types de données des entrées et des sorties.

    Vous pouvez accéder à une instance de la classe UDFRegistration en appelant la méthode udf de l’objet Session.

    Lorsque vous appelez registerTemporary, utilisez une signature de méthode qui ne comporte pas de paramètre name. (Comme vous créez une UDF anonyme, vous ne spécifiez pas de nom pour l’UDF).

Note

Lorsque vous écrivez du code multithread (par exemple, lorsque vous utilisez des collections parallèles), utilisez la méthode registerTemporary pour enregistrer des UDFs, plutôt que d’utiliser la méthode udf. Cela peut éviter les erreurs dans lesquelles l’objet Snowflake Session par défaut ne peut être trouvé.

Ces méthodes renvoient un objet UserDefinedFunction que vous pouvez utiliser pour appeler les UDF. (Voir Appel de fonctions scalaires définies par l’utilisateur (UDFs).)

L’exemple suivant crée une UDF anonyme :

import com.snowflake.snowpark_java.types.*;
...

// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

L’exemple suivant crée une UDF anonyme qui utilise une classe personnalisée (LanguageDetector, qui détecte la langue utilisée dans le texte). L’exemple appelle l’UDF anonyme pour détecter la langue dans la colonne text_data d’un DataFrame et crée un nouveau DataFrame qui inclut une colonne lang supplémentaire avec la langue utilisée.

import com.snowflake.snowpark_java.types.*;

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");

// Create a detector
LanguageDetector detector = new LanguageDetector();

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
  Functions.udf(
    (String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
    DataTypes.StringType,
    DataTypes.StringType);

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
Copy

Création et enregistrement d’une UDF nommée

Si vous voulez appeler une UDF par son nom (par exemple, en utilisant la méthode statique Functions.callUDF) ou si vous devez utiliser une UDF dans des sessions ultérieures, vous pouvez créer et enregistrer une UDF nommée. Pour ce faire, utilisez l’une des méthodes suivantes dans la classe UDFRegistration :

  • registerTemporary, si vous ne prévoyez d’utiliser l’UDF que dans la session en cours

  • registerPermanent, si vous prévoyez d’utiliser l’UDF dans les sessions suivantes

Pour accéder à un objet de la classe UDFRegistration, appelez la méthode udf de l’objet Session.

Lorsque vous appelez la méthode registerTemporary ou registerPermanent, transmettez l’expression lambda et les champs DataTypes (ou les objets construits par les méthodes de cette classe) représentant les types de données des entrées et des sorties.

Par exemple :

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

registerPermanent crée une UDF que vous pouvez utiliser dans la session actuelle et les suivantes. Lorsque vous appelez registerPermanent, vous devez également spécifier un emplacement dans une zone de préparation interne où les fichiers JAR pour l’UDF et ses dépendances seront chargés.

Note

registerPermanent ne prend pas en charge les zones de préparation externes.

Par exemple :

import com.snowflake.snowpark_java.types.*;
...

// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerPermanent(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType,
      "mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

Utilisation d’objets qui ne sont pas sérialisables

Lorsque vous créez une UDF pour une expression lambda, la bibliothèque Snowpark sérialise la fermeture lambda et l’envoie au serveur pour exécution.

Si un objet capturé par la fermeture lambda n’est pas sérialisable, la bibliothèque Snowpark lève une exception java.io.NotSerializableException.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

Si cela se produit, vous devez rendre l’objet sérialisable.

Écriture du code d’initialisation d’une UDF

Si votre UDF nécessite un code d’initialisation ou un contexte, vous pouvez le fournir par le biais de valeurs capturées dans le cadre de la fermeture de l’UDF.

L’exemple suivant utilise une classe distincte pour initialiser le contexte nécessaire aux deux UDFs.

  • La première UDF crée une nouvelle instance de la classe au sein de la Lambda. Ainsi, l’initialisation est effectuée chaque fois que l’UDF est appelée.

  • La seconde UDF capture une instance de la classe générée dans votre programme client. Le contexte généré sur le client est sérialisé et est utilisé par l’UDF. Notez que la classe de contexte doit être sérialisable pour que cette approche fonctionne.

import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;

// Context needed for a UDF.
class Context {
  double randomInt = Math.random();
}

// Serializable context needed for the UDF.
class SerContext implements Serializable {
  double randomInt = Math.random();
}

class TestUdf {
  public static void main(String[] args) {
    // Create the session.
    Session session = Session.builder().configFile("/<path>/profile.properties").create();
    session.range(1, 10, 2).show();

    // Create a DataFrame with two columns ("c" and "d").
    DataFrame dummy =
      session.createDataFrame(
        new Row[]{
          Row.create(1, 1),
          Row.create(2, 2),
          Row.create(3, 3)
        },
        StructType.create(
          new StructField("c", DataTypes.IntegerType),
          new StructField("d", DataTypes.IntegerType))
        );
    dummy.show();

    // Initialize the context once per invocation.
    UserDefinedFunction udfRepeatedInit =
      Functions.udf(
        (Integer i) -> new Context().randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    SerContext sC = new SerContext();
    UserDefinedFunction udfOnceInit =
      Functions.udf(
        (Integer i) -> sC.randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
    UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
  }
}
Copy

Lecture de fichiers à partir d’une UDF

Comme mentionné précédemment, la bibliothèque Snowpark télécharge et exécute les UDFs sur le serveur. Si votre UDF doit lire des données à partir d’un fichier, vous devez vous assurer que le fichier est chargé avec l’UDF.

En outre, si le contenu du fichier reste le même entre les appels à l” UDF, vous pouvez écrire votre code pour charger le fichier une fois lors du premier appel et pas lors des appels suivants. Cela peut améliorer les performances de vos appels UDF.

Pour configurer une UDF pour lire un fichier :

  1. Ajoutez le fichier à un fichier JAR.

    Par exemple, si votre UDF doit utiliser un fichier dans un sous-répertoire data/ (data/hello.txt), exécutez la commande jar pour ajouter ce fichier à un fichier JAR :

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. Spécifiez que le fichier JAR est une dépendance, ce qui charge le fichier sur le serveur et ajoute le fichier au classpath. Voir Spécifier des dépendances pour une UDF.

    Par exemple :

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar");
    
    Copy
  3. Dans l’UDF, appelez Class.forName().getResourceAsStream() pour trouver le fichier dans le classpath et lire le fichier.

    Pour éviter d’ajouter une dépendance sur this, vous pouvez utiliser Class.forName("com.snowflake.snowpark_java.DataFrame") (plutôt que getClass()) pour obtenir l’objet Class.

    Par exemple, pour lire le fichier data/hello.txt :

    // Read data/hello.txt from myJar.jar.
    String resourceName = "/data/hello.txt";
    InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
    
    Copy

    Dans cet exemple, le nom de la ressource commence par un /, ce qui indique qu’il s’agit du chemin complet du fichier dans le fichier JAR. (Dans ce cas, l’emplacement du fichier n’est pas relatif au paquet de la classe).

Note

Si vous ne vous attendez pas à ce que le contenu du fichier change entre les appels UDF, lisez le fichier dans un champ statique de votre classe, et ne lisez le fichier que si le champ n’est pas défini.

L’exemple suivant définit un objet (UDFCode) avec une fonction qui sera utilisée comme une UDF (readFileFunc). La fonction lit le fichier data/hello.txt, qui doit contenir la chaîne hello,. La fonction ajoute cette chaîne à la chaîne transmise en argument.

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Create a function class that reads a file.
class UDFCode {
  private static String fileContent = null;
  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // The file content is cached in 'fileContent'.
  public static String readFile() {
    if (fileContent == null) {
      try {
        String resourceName = "/data/hello.txt";
        InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
          .getResourceAsStream(resourceName);
        fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
      } catch (Exception e) {
        fileContent = "Error while reading file";
      }
    }
    return fileContent;
  }
}
Copy

La partie suivante de l’exemple enregistre la fonction comme une UDF anonyme. L’exemple appelle l’UDF de la colonne NAME dans un DataFrame. L’exemple suppose que le fichier data/hello.txt est contenu dans un paquet dans le fichier JAR myJar.jar.

import com.snowflake.snowpark_java.types.*;

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");

// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
  (String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
Copy

Création de fonctions de table définies par l’utilisateur (UDTFs)

Pour créer et enregistrer une UDTF dans Snowpark, vous devez :

Les sections suivantes décrivent ces étapes plus en détail.

Pour plus d’informations sur l’appel d’une UDTF, voir Appel d’une UDTF.

Définition de la classe UDTF

Définissez une classe qui implémente l’une des interfaces JavaUDTFn (par exemple JavaUDTF0, JavaUDTF1, etc.) du paquet com.snowflake.snowpark.udtf, où n spécifie le nombre d’arguments d’entrée pour votre UDTF. Par exemple, si votre UDTF transmet deux arguments d’entrée, implémentez l’interface JavaUDTF2.

Dans votre classe, implémentez les méthodes suivantes :

  • outputSchema(), qui renvoie un objet types.StructType décrivant les noms et les types des champs dans les lignes retournées (le « schéma » de la sortie).

  • process(), qui est appelé une fois pour chaque ligne de la partition d’entrée (voir la note ci-dessous).

  • inputSchema(), qui renvoie un objet types.StructType décrivant les types des paramètres d’entrée.

    Si votre méthode process() transmet des arguments Map , vous devez implémenter la méthode inputSchema(). Sinon, l’implémentation de cette méthode est facultative.

  • endPartition(), qui est appelé une fois pour chaque partition après que toutes les lignes ont été transmises dans process().

Lorsqu’une UDTF est appelée, les lignes sont regroupées en partitions avant d’être transmises à l’UDTF :

  • Si l’instruction qui appelle l’UDTF spécifie la clause PARTITION (partitions explicites), cette clause détermine comment les lignes sont partitionnées.

  • Si l’instruction ne spécifie pas la clause PARTITION (partitions implicites), Snowflake détermine la meilleure façon de partitionner les lignes.

Pour une explication des partitions, voir Fonctions et partitions des tables.

Pour un exemple de classe UDTF, voir Exemple de classe UDTF.

Implémentation de la méthode outputSchema()

Remplacez la méthode outputSchema() pour définir les noms et les types de données des champs (le « schéma de sortie ») des lignes renvoyées par les méthodes process() et endPartition().

public StructType outputSchema()
Copy

Dans cette méthode, construisez et renvoyez un objet StructType qui contient des objets StructField représentant le type de données Snowflake de chaque champ dans une ligne renvoyée. Snowflake prend en charge les objets de type suivants pour le schéma de sortie d’une UDTF :

Type de données SQL

Type Java

Type com.snowflake.snowpark_java.types

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<Chaîne , chaîne>

MapType(StringType, StringType)

OBJECT

java.util.Map<Chaîne, variante>

MapType(StringType, VariantType)

Par exemple, si votre UDTF renvoie une ligne avec un champ d’un entier seul :

public StructType outputSchema() {
  return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
Copy

Implémentation de la méthode process()

Dans votre classe d’UDTF, implémentez la méthode process() :

Stream<Row> process(A0 arg0, ... A<n> arg<n>)
Copy

n est le nombre d’arguments transmis à votre UDTF.

Le nombre d’arguments dans la signature correspond à l’interface que vous avez implémentée. Par exemple, si votre UDTF transmet deux arguments d’entrée et que vous implémentez l’interface JavaUDTF2 , la méthode process() a cette signature :

Stream<Row> process(A0 arg0, A1 arg1)
Copy

Cette méthode est appelée une fois pour chaque ligne de la partition d’entrée.

Choix des types d’arguments

Pour le type de chaque argument de la méthode process() utilisez le type Java qui correspond au type de données Snowflake de l’argument transmis à l’UDTF.

Snowflake prend en charge les types de données suivants pour les arguments d’une UDTF :

Type de données SQL

Type de données Java

Notes

NUMBER

Les types suivants sont pris en charge :

  • java.lang.Short

  • java.lang.Integer

  • java.lang.Long

  • java.math.BigDecimal

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

VARCHAR

java.lang.String

BOOLEAN

java.lang.Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] ou Variant[]

OBJECT

Map<Chaîne, Chaîne> ou Map<Chaîne, Variante>

Note

Si vous transmettez des arguments java.util.Map vous devez implémenter la méthode inputSchema pour décrire les types de ces arguments. Voir Implémentation de la méthode inputSchema().

Renvoi de lignes

Dans la méthode process() , construisez et renvoyez un java.util.stream.Stream d’objets Row qui contiennent les données à renvoyer par l’UDTF pour les valeurs d’entrée données. Les champs de la ligne doivent utiliser les types que vous avez spécifiés dans la méthode outputSchema. (Voir Implémentation de la méthode outputSchema().)

Par exemple, si votre UDTF génère des lignes, construisez et renvoyez un Iterable d’objets Row pour les lignes générées :

import java.util.stream.Stream;
...

public Stream<Row> process(Integer start, Integer count) {
  Stream.Builder<Row> builder = Stream.builder();
  for (int i = start; i < start + count ; i++) {
    builder.add(Row.create(i));
  }
  return builder.build();
}
Copy

Implémentation de la méthode inputSchema()

Si la méthode process() transmet un argument java.util.Map vous devez implémenter la méthode inputSchema() pour décrire les types des arguments d’entrée.

Note

Si la méthode process() ne transmet pas d’arguments Map il n’est pas nécessaire d’implémenter la méthode inputSchema().

Dans cette méthode, construisez et renvoyez un objet StructType qui contient des objets StructField représentant le type de données Snowflake de chaque argument transmis à la méthode process(). Snowflake prend en charge les objets de types suivants pour le schéma d’entrée d’une UDTF :

Type de données SQL

Type Java

Type com.snowflake.snowpark_java.types

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<Chaîne , chaîne>

MapType(StringType, StringType)

OBJECT

java.util.Map<Chaîne, variante>

MapType(StringType, VariantType)

Par exemple, supposons que votre méthode process() transmette un argument Map<Chaîne, Chaîne> et un argument Map<Chaîne, Variante> :

import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...

public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
  ...
}
Copy

Vous devez implémenter la méthode inputSchema() pour renvoyer un objet StructType qui décrit les types de ces arguments d’entrée :

import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...

public StructType inputSchema() {
  return StructType.create(
      new StructField(
          "string_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
      new StructField(
          "variant_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
Copy

Implémentation de la méthode endPartition()

Implémentez la méthode endPartition et ajoutez le code qui doit être exécuté après que toutes les lignes de la partition d’entrée ont été transmises dans la méthode process. Cette méthode endPartition est appelée une fois pour chaque partition d’entrée.

public Stream<Row> endPartition()
Copy

Vous pouvez utiliser cette méthode si vous devez effectuer un travail après que toutes les lignes de la partition ont été traitées. Par exemple, vous pouvez :

  • Renvoyer des lignes basées sur les informations d’état que vous saisissez dans chaque appel de méthode process.

  • Renvoyer les lignes qui ne sont pas liées à une ligne d’entrée spécifique.

  • Renvoyer des lignes qui résument les lignes de sortie qui ont été générées par la méthode process.

Les champs des lignes que vous renvoyez doivent correspondre aux types que vous avez spécifiés dans la méthode outputSchema. (Voir Implémentation de la méthode outputSchema().)

Si vous n’avez pas besoin de renvoyer de lignes supplémentaires à la fin de chaque partition, renvoyez un Stream vide. Par exemple :

public Stream<Row> endPartition() {
  return Stream.empty();
}
Copy

Note

Bien que Snowflake prenne en charge les grandes partitions avec des délais d’expiration définis pour les traiter avec succès, les partitions particulièrement grandes peuvent entraîner des expirations (par exemple lorsque endPartition prend trop de temps à se terminer). Veuillez contacter le support Snowflake si vous avez besoin d’ajuster le seuil d’expiration pour des scénarios d’utilisation spécifiques.

Exemple de classe UDTF

Voici un exemple de classe UDTF qui génère une plage de lignes.

  • Parce que l’UDTF se transmet dans deux arguments, la classe implémente JavaUDTF2.

  • Les arguments start et count spécifient le numéro de départ de la ligne et le nombre de lignes à générer.

import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;

class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
  public StructType outputSchema() {
    return StructType.create(new StructField("C1", DataTypes.IntegerType));
  }

  // Because the process() method in this example does not pass in Map arguments,
  // implementing the inputSchema() method is optional.
  public StructType inputSchema() {
    return StructType.create(
            new StructField("start_value", DataTypes.IntegerType),
            new StructField("value_count", DataTypes.IntegerType));
  }

  public Stream<Row> endPartition() {
    return Stream.empty();
  }

  public Stream<Row> process(Integer start, Integer count) {
    Stream.Builder<Row> builder = Stream.builder();
    for (int i = start; i < start + count ; i++) {
      builder.add(Row.create(i));
    }
    return builder.build();
  }
}
Copy

Enregistrement de l’UDTF

Ensuite, créez une instance de la nouvelle classe, et enregistrez la classe en appelant l’une des méthodes UDTFRegistration. Vous pouvez enregistrer une UDTF temporaire ou permanente.

Enregistrement d’une UDTF temporaire

Pour enregistrer une UDTF temporaire, appelez UDTFRegistration.registerTemporary :

  • Si vous n’avez pas besoin d’appeler l’UDTF par son nom, vous pouvez enregistrer une UDTF anonyme en lui transmettant une instance de la classe :

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
    
    Copy
  • Si vous devez appeler l’UDTF par son nom, transmettez également le nom de l’UDTF :

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
    // Call the UDTF by name.
    session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
    
    Copy

Enregistrement d’une UDTF permanente

Si vous devez utiliser l’UDTF lors de sessions ultérieures, appelez UDTFRegistration.registerPermanent pour enregistrer une UDTF permanente.

Lors de l’enregistrement d’une UDTF permanente, vous devez spécifier une zone de préparation où la méthode d’enregistrement téléchargera les fichiers JAR pour l’UDTF et ses dépendances. Par exemple :

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage");
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

Appel d’une UDTF

Après avoir enregistré l’UDTF, vous pouvez appeler l’UDTF en transmettant l’objet TableFunction retourné à la méthode tableFunction de l’objet Session :

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Copy

Pour appeler une UDTF par son nom, il faut construire un objet TableFunction avec ce nom, et le transmettre dans la méthode tableFunction :

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

Vous pouvez également appeler une UDTF par une instruction SELECT directement :

session.sql("select * from table(myUdtf(10, 5))");
Copy