Appel de fonctions et des procédures stockées dans Snowpark Scala

Pour traiter les données d’un DataFrame, vous pouvez appeler des fonctions SQL définies par le système, des fonctions définies par l’utilisateur et des procédures stockées. Cette rubrique explique comment les appeler dans Snowpark.

Dans ce chapitre :

Appel de fonctions définies par le système

Si vous devez appeler des fonctions SQL définies par le système, utilisez les fonctions équivalentes dans l’objet com.snowflake.snowpark.functions object.

L’exemple suivant appelle la fonction upper de l’objet functions (l’équivalent de la fonction UPPER définie par le système) pour renvoyer les valeurs de la colonne nom avec les lettres en majuscules :

// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Copy

Si une fonction SQL définie par le système n’est pas disponible dans l’objet fonctions, vous pouvez utiliser l’une des approches suivantes :

  • Utilisez la fonction callBuiltin pour appeler la fonction définie par le système.

  • Utilisez la fonction builtin pour créer un objet fonction que vous pouvez utiliser pour appeler la fonction définie par le système.

callBuiltin et builtin sont définis dans l’objet com.snowflake.snowpark.functions.

Pour callBuiltin, transmettez le nom de la fonction définie par le système comme premier argument. Si vous devez transmettre les valeurs des colonnes à la fonction définie par le système, définissez et transmettez les objets colonne comme arguments supplémentaires dans la fonction callBuiltin.

L’exemple suivant appelle la fonction RADIANS définie par le système, en lui transmettant la valeur de la colonne col1 :

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
Copy

La fonction callBuiltin renvoie un Column, que vous pouvez transmettre aux méthodes de transformation DataFrame (par exemple, filtre, sélection, etc.).

Pour builtin, transmettez le nom de la fonction définie par le système, et utilisez l’objet fonction retourné pour appeler la fonction définie par le système. Par exemple :

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Copy

Appel de fonctions scalaires définies par l’utilisateur (UDFs)

La méthode pour appeler une UDF dépend de la façon dont l’UDF a été créée :

  • Pour appeler une UDF anonyme, appelez la méthode apply de l’objet UserDefinedFunction qui a été retourné lorsque vous avez créé l’UDF.

    Les arguments que vous transmettez à une UDF doivent être des objets de colonne. Si vous devez transmettre un littéral, utilisez lit(), comme expliqué dans Utilisation de littéraux comme objets de colonne.

  • Pour appeler des UDFs que vous avez enregistrées par le nom et des UDFs que vous avez créées en exécutant CREATE FUNCTION, utilisez la fonction callUDF dans l’objet com.snowflake.snowpark.functions.

    Transmettez le nom de l’UDF comme premier argument et tous les paramètres de l’UDF comme arguments supplémentaires.

L’appel d’une UDF renvoie un objet Column contenant la valeur de retour de l’UDF.

L’exemple suivant appelle la fonction UDF myFunction, en lui transmettant les valeurs des colonnes col1 et col2. L’exemple transmet la valeur de retour de myFunction à la méthode select de DataFrame.

// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
    df.select(
        callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
    ).collect()
Copy

Appel de fonctions de table (fonctions système et UDTFs)

Pour appeler une fonction de table ou une fonction de table définie par l’utilisateur (UDTF) :

  1. Construisez un objet TableFunction en transmettant le nom de la fonction de table.

    Si vous créez une UDTF dans Snowpark, vous pouvez simplement utiliser l’objet TableFunction renvoyé par la méthode UDTFRegistration.registerTemporary ou UDTFRegistration.registerPermanent. Voir Création de fonctions de table définies par l’utilisateur (UDTFs).

  2. Appelez session.tableFunction, en transmettant l’objet TableFunction et un Map de noms et valeurs d’arguments d’entrée.

table?Function renvoie un DataFrame qui contient la sortie de la fonction de table.

Par exemple, supposons que vous ayez exécuté la commande suivante pour créer une UDTF SQL :

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

Le code suivant appelle cette UDTF et crée un DataFrame pour la sortie de l’UDTF. L’exemple affiche les 10 premières lignes de la sortie dans la console.

val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Copy

Si vous devez joindre la sortie d’une fonction de table à un DataFrame, appelez la méthode DataFrame.join qui transmet un TableFunction.

Appel de procédures stockées

Vous pouvez exécuter une procédure soit du côté du serveur (dans l’environnement Snowflake), soit localement. Gardez à l’esprit que les deux environnements étant différents, les conditions et les résultats de l’exécution de la procédure peuvent différer entre eux.

Vous pouvez appeler une procédure avec l’API Snowpark de l’une ou l’autre des manières suivantes :

  • Exécutez une fonction localement pour la tester et la déboguer à l’aide de la méthode SProcRegistration.runLocally.

  • Exécutez une procédure dans l’environnement Snowflake côté serveur à l’aide de la méthode Session.storedProcedure. Il peut s’agir d’une procédure limitée à la session en cours ou d’une procédure permanente stockée sur Snowflake.

Vous pouvez également appeler une procédure stockée permanente que vous créez avec l’API Snowpark à partir d’une feuille de calcul Snowflake. Pour plus d’informations, reportez-vous à Appel d’une procédure stockée.

Pour en savoir plus sur la création de procédures avec Snowpark API, consultez Création de procédures stockées pour DataFrames dans Scala.

Exécution locale de la logique d’une procédure

Vous pouvez exécuter la fonction lambda de votre procédure dans votre environnement local à l’aide de la méthode SProcRegistration.runLocally. La méthode exécute la fonction et renvoie son résultat sous la forme du type renvoyé par la fonction.

Par exemple, vous pouvez appeler localement (côté client) une fonction lambda que vous avez l’intention d’utiliser dans une procédure avant d’enregistrer une procédure à partir de celle-ci sur Snowflake. Vous commencez par assigner le code lambda comme valeur à une variable. Vous transmettez cette variable à la méthode SProcRegistration.runLocally pour l’exécuter côté client. Vous pouvez également utiliser la variable pour représenter la fonction lors de l’enregistrement de la procédure.

Le code de l’exemple suivant affecte la fonction à la variable func. Il teste ensuite la fonction localement en transmettant la variable à la méthode SProcRegistration.runLocally avec la valeur de l’argument de la fonction. La variable est également utilisée pour enregistrer la procédure.

val session = Session.builder.configFile("my_config.properties").create

// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1

// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Copy

Exécution d’une procédure sur le serveur

Pour exécuter une procédure dans l’environnement Snowflake sur le serveur, utilisez la méthode Session.storedProcedure. Cette méthode renvoie un objet DataFrame.

Par exemple, vous pouvez exécuter :

Le code de l’exemple suivant crée une procédure temporaire conçue pour s’exécuter sur le serveur, mais qui ne dure que le temps de la session Snowpark en cours. Il exécute ensuite la procédure en utilisant à la fois le nom de la procédure et la variable StoredProcedure qui la représente.

val session = Session.builder.configFile("my_config.properties").create

val name: String = "add_two"

val tempSP: StoredProcedure =
  session.sproc.registerTemporary(
    name,
    (session: Session, num: Int) => num + 2
  )

session.storedProcedure(name, 1).show()

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy