Appel de fonctions et de procédures stockées dans Snowpark Java

Pour traiter les données d’un DataFrame, vous pouvez appeler des fonctions SQL définies par le système, des fonctions définies par l’utilisateur et des procédures stockées. Cette rubrique explique comment les appeler dans Snowpark.

Dans ce chapitre :

Appel de fonctions définies par le système

Si vous devez appeler des fonctions SQL définies par le système, utilisez les méthodes statiques équivalentes de la classe Functions.

L’exemple suivant appelle la upper méthode statique de la classe Functions (l’équivalent de la fonction UPPER définie par le système) pour renvoyer les valeurs de la colonne nom avec les lettres en majuscules :

DataFrame df = session.table("sample_product_data");
df.select(Functions.upper(Functions.col("name"))).show();
Copy

Si une fonction SQL définie par le système n’est pas disponible dans la classe Functions, vous pouvez utiliser la méthode statique Functions.callUDF pour appeler la fonction définie par le système.

Pour callUDF, transmettez le nom de la fonction définie par le système comme premier argument. Si vous devez transmettre les valeurs des colonnes à la fonction définie par le système, définissez et transmettez les objets colonne comme arguments supplémentaires dans la méthode callUDF.

L’exemple suivant appelle la fonction RADIANS définie par le système, en lui transmettant la valeur de la colonne degrees :

// Call the system-defined function RADIANS() on degrees.
DataFrame dfDegrees = session.range(0, 360, 45).rename("degrees", Functions.col("id"));
dfDegrees.select(Functions.col("degrees"), Functions.callUDF("radians", Functions.col("degrees"))).show();
Copy

La méthode callUDF renvoie un Column, que vous pouvez transmettre aux méthodes de transformation DataFrame (par exemple, filtre, sélection, etc.).

Appel de fonctions scalaires définies par l’utilisateur (UDFs)

La méthode pour appeler une UDF dépend de la façon dont l’UDF a été créée :

  • Pour appeler une UDF anonyme, appelez la méthode apply de l’objet UserDefinedFunction qui a été retourné lorsque vous avez créé l’UDF.

    Les arguments que vous transmettez à une UDF doivent être des objets de colonne. Si vous devez transmettre un littéral, utilisez Functions.lit(), comme expliqué dans Utilisation de littéraux comme objets de colonne.

  • Pour appeler des UDFs que vous avez enregistrées par le nom et des UDFs que vous avez créées en exécutant CREATE FUNCTION, utilisez la méthode statique Functions.callUDF.

    Transmettez le nom de l’UDF comme premier argument et tous les paramètres de l’UDF comme arguments supplémentaires.

L’appel d’une UDF renvoie un objet Column contenant la valeur de retour de l’UDF.

L’exemple suivant appelle la fonction UDF doubleUdf, en lui transmettant la valeur des colonnes quantity. L’exemple transmet la valeur de retour de doubleUdf à la méthode select de DataFrame.

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

Appel de fonctions de table (fonctions système et UDTFs)

Pour appeler une fonction de table ou une fonction de table définie par l’utilisateur (UDTF) :

  1. Construisez un objet TableFunction en transmettant le nom de la fonction de table.

  2. Appelez la méthode tableFunction de l’objet Session, en lui transmettant l’objet TableFunction et un Map de noms et valeurs d’arguments d’entrée.

table?Function renvoie un DataFrame qui contient la sortie de la fonction de table.

Par exemple, supposons que vous ayez exécuté la commande suivante pour créer une UDTF SQL :

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

Le code suivant appelle cette UDTF et crée un DataFrame pour la sortie de l’UDTF. L’exemple affiche les 10 premières lignes de la sortie dans la console.

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> arguments = new HashMap<>();
arguments.put("cat_id", Functions.lit(10));
DataFrame dfTableFunctionOutput = session.tableFunction(new TableFunction("product_by_category_id"), arguments);
dfTableFunctionOutput.show();
Copy

Si vous devez joindre la sortie d’une fonction de table à un DataFrame, appelez la méthode join qui se transmet dans une TableFunction.

Appel de procédures stockées

Vous pouvez exécuter une procédure soit du côté du serveur (dans l’environnement Snowflake), soit localement. Gardez à l’esprit que les deux environnements étant différents, les conditions et les résultats de l’exécution de la procédure peuvent différer entre eux.

Vous pouvez appeler une procédure avec l’API Snowpark de l’une ou l’autre des manières suivantes :

  • Exécutez une fonction localement pour la tester et la déboguer à l’aide de la méthode SProcRegistration.runLocally.

  • Exécutez une procédure dans l’environnement Snowflake côté serveur à l’aide de l’une des méthodes Session.storedProcedure. Il peut s’agir d’une procédure limitée à la session en cours ou d’une procédure permanente stockée sur Snowflake.

Vous pouvez également appeler une procédure stockée permanente que vous créez avec l’API Snowpark à partir du code SQL. Pour plus d’informations, reportez-vous à Appel d’une procédure stockée.

Pour en savoir plus sur la création de procédures avec Snowpark API, consultez Création de procédures stockées pour DataFrames dans Java.

Exécution locale de la logique d’une procédure

Vous pouvez exécuter la fonction lambda de votre procédure dans votre environnement local à l’aide de la méthode SProcRegistration.runLocally. La méthode exécute la fonction et renvoie son résultat sous la forme du type renvoyé par la fonction.

Par exemple, vous pouvez appeler une fonction lambda que vous avez l’intention d’utiliser dans une procédure avant d’enregistrer une procédure à partir de celle-ci sur Snowflake. Vous commencez par assigner le code lambda comme valeur à une variable dont le type est l’une des interfaces com.snowflake.snowpark_java.sproc.JavaSProc. En utilisant cette variable, vous pouvez tester l’appel de la fonction avec la méthode SProcRegistration.runLocally. Vous pouvez également utiliser la variable pour représenter la fonction lors de l’enregistrement de la procédure.

Le code de l’exemple suivant initialise une variable JavaSProc à partir de la fonction lambda qui sera la logique de la procédure. Il teste ensuite la fonction en passant la variable à la méthode SProcRegistration.runLocally avec l’argument de la fonction. La variable est également utilisée pour enregistrer la fonction.

Session session = Session.builder().configFile("my_config.properties").create();

// Assign the lambda function to a variable.
JavaSProc1<Integer, Integer> func =
  (Session session, Integer num) -> num + 1;

// Execute the function locally.
int result = (Integer)session.sproc().runLocally(func, 1);
System.out.println("\nResult: " + result);

// Register the procedure.
StoredProcedure sp =
  session.sproc().registerTemporary(
    func,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server.
session.storedProcedure(sp, 1).show();
Copy

Exécution d’une procédure sur le serveur

Pour exécuter une procédure dans l’environnement Snowflake sur le serveur, utilisez la méthode Session.storedProcedure. Cette méthode renvoie un objet DataFrame.

Par exemple, vous pouvez exécuter :

Le code de l’exemple suivant crée une procédure temporaire conçue pour s’exécuter sur le serveur, mais qui ne dure que le temps de la session Snowpark en cours. Il exécute ensuite la procédure en utilisant à la fois le nom de la procédure et la variable com.snowflake.snowpark_java.StoredProcedure qui la représente.

Session session = Session.builder().configFile("my_config.properties").create();

String incrementProc = "increment";

// Register the procedure.
StoredProcedure tempSP =
  session.sproc().registerTemporary(
    incrementProc,
    (Session session, Integer num) -> num + 1,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy