Appel de fonctions et des procédures stockées dans Snowpark Scala¶
Pour traiter les données d’un DataFrame, vous pouvez appeler des fonctions SQL définies par le système, des fonctions définies par l’utilisateur et des procédures stockées. Cette rubrique explique comment les appeler dans Snowpark.
Dans ce chapitre :
Appel de fonctions définies par le système¶
Si vous devez appeler des fonctions SQL définies par le système, utilisez les fonctions équivalentes dans l’objet com.snowflake.snowpark.functions object.
L’exemple suivant appelle la fonction upper
de l’objet functions
(l’équivalent de la fonction UPPER définie par le système) pour renvoyer les valeurs de la colonne nom avec les lettres en majuscules :
// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Si une fonction SQL définie par le système n’est pas disponible dans l’objet fonctions, vous pouvez utiliser l’une des approches suivantes :
Utilisez la fonction
callBuiltin
pour appeler la fonction définie par le système.Utilisez la fonction
builtin
pour créer un objet fonction que vous pouvez utiliser pour appeler la fonction définie par le système.
callBuiltin
et builtin
sont définis dans l’objet com.snowflake.snowpark.functions
.
Pour callBuiltin
, transmettez le nom de la fonction définie par le système comme premier argument. Si vous devez transmettre les valeurs des colonnes à la fonction définie par le système, définissez et transmettez les objets colonne comme arguments supplémentaires dans la fonction callBuiltin
.
L’exemple suivant appelle la fonction RADIANS définie par le système, en lui transmettant la valeur de la colonne col1
:
// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
La fonction callBuiltin
renvoie un Column
, que vous pouvez transmettre aux méthodes de transformation DataFrame (par exemple, filtre, sélection, etc.).
Pour builtin
, transmettez le nom de la fonction définie par le système, et utilisez l’objet fonction retourné pour appeler la fonction définie par le système. Par exemple :
// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Appel de fonctions scalaires définies par l’utilisateur (UDFs)¶
La méthode pour appeler une UDF dépend de la façon dont l’UDF a été créée :
Pour appeler une UDF anonyme, appelez la méthode
apply
de l’objet UserDefinedFunction qui a été retourné lorsque vous avez créé l’UDF.Les arguments que vous transmettez à une UDF doivent être des objets de colonne. Si vous devez transmettre un littéral, utilisez
lit()
, comme expliqué dans Utilisation de littéraux comme objets de colonne.Pour appeler des UDFs que vous avez enregistrées par le nom et des UDFs que vous avez créées en exécutant CREATE FUNCTION, utilisez la fonction
callUDF
dans l’objetcom.snowflake.snowpark.functions
.Transmettez le nom de l’UDF comme premier argument et tous les paramètres de l’UDF comme arguments supplémentaires.
L’appel d’une UDF renvoie un objet Column
contenant la valeur de retour de l’UDF.
L’exemple suivant appelle la fonction UDF myFunction
, en lui transmettant les valeurs des colonnes col1
et col2
. L’exemple transmet la valeur de retour de myFunction
à la méthode select
de DataFrame.
// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
df.select(
callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
).collect()
Appel de fonctions de table (fonctions système et UDTFs)¶
Pour appeler une fonction de table ou une fonction de table définie par l’utilisateur (UDTF) :
Construisez un objet TableFunction en transmettant le nom de la fonction de table.
Si vous créez une UDTF dans Snowpark, vous pouvez simplement utiliser l’objet
TableFunction
renvoyé par la méthodeUDTFRegistration.registerTemporary
ouUDTFRegistration.registerPermanent
. Voir Création de fonctions de table définies par l’utilisateur (UDTFs).Appelez session.tableFunction, en transmettant l’objet
TableFunction
et unMap
de noms et valeurs d’arguments d’entrée.
table?Function
renvoie un DataFrame qui contient la sortie de la fonction de table.
Par exemple, supposons que vous ayez exécuté la commande suivante pour créer une UDTF SQL :
CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
RETURNS TABLE(id INT, name VARCHAR)
AS
$$
SELECT id, name
FROM sample_product_data
WHERE category_id = cat_id
$$
;
Le code suivant appelle cette UDTF et crée un DataFrame pour la sortie de l’UDTF. L’exemple affiche les 10 premières lignes de la sortie dans la console.
val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Si vous devez joindre la sortie d’une fonction de table à un DataFrame, appelez la méthode DataFrame.join qui transmet un TableFunction.
Appel de procédures stockées¶
Vous pouvez exécuter une procédure soit du côté du serveur (dans l’environnement Snowflake), soit localement. Gardez à l’esprit que les deux environnements étant différents, les conditions et les résultats de l’exécution de la procédure peuvent différer entre eux.
Vous pouvez appeler une procédure avec l’API Snowpark de l’une ou l’autre des manières suivantes :
Exécutez une fonction localement pour la tester et la déboguer à l’aide de la méthode
SProcRegistration.runLocally
.Exécutez une procédure dans l’environnement Snowflake côté serveur à l’aide de la méthode
Session.storedProcedure
. Il peut s’agir d’une procédure limitée à la session en cours ou d’une procédure permanente stockée sur Snowflake.
Vous pouvez également appeler une procédure stockée permanente que vous créez avec l’API Snowpark à partir d’une feuille de calcul Snowflake. Pour plus d’informations, reportez-vous à Appel d’une procédure stockée.
Pour en savoir plus sur la création de procédures avec Snowpark API, consultez Création de procédures stockées pour DataFrames dans Scala.
Exécution locale de la logique d’une procédure¶
Vous pouvez exécuter la fonction lambda de votre procédure dans votre environnement local à l’aide de la méthode SProcRegistration.runLocally
. La méthode exécute la fonction et renvoie son résultat sous la forme du type renvoyé par la fonction.
Par exemple, vous pouvez appeler localement (côté client) une fonction lambda que vous avez l’intention d’utiliser dans une procédure avant d’enregistrer une procédure à partir de celle-ci sur Snowflake. Vous commencez par assigner le code lambda comme valeur à une variable. Vous transmettez cette variable à la méthode SProcRegistration.runLocally
pour l’exécuter côté client. Vous pouvez également utiliser la variable pour représenter la fonction lors de l’enregistrement de la procédure.
Le code de l’exemple suivant affecte la fonction à la variable func
. Il teste ensuite la fonction localement en transmettant la variable à la méthode SProcRegistration.runLocally
avec la valeur de l’argument de la fonction. La variable est également utilisée pour enregistrer la procédure.
val session = Session.builder.configFile("my_config.properties").create
// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1
// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Exécution d’une procédure sur le serveur¶
Pour exécuter une procédure dans l’environnement Snowflake sur le serveur, utilisez la méthode Session.storedProcedure
. Cette méthode renvoie un objet DataFrame
.
Par exemple, vous pouvez exécuter :
Une procédure temporaire ou permanente que vous créez à l’aide de Snowpark API.
Une procédure créée à l’aide d’une instruction CREATE PROCEDURE.
Le code de l’exemple suivant crée une procédure temporaire conçue pour s’exécuter sur le serveur, mais qui ne dure que le temps de la session Snowpark en cours. Il exécute ensuite la procédure en utilisant à la fois le nom de la procédure et la variable StoredProcedure
qui la représente.
val session = Session.builder.configFile("my_config.properties").create
val name: String = "add_two"
val tempSP: StoredProcedure =
session.sproc.registerTemporary(
name,
(session: Session, num: Int) => num + 2
)
session.storedProcedure(name, 1).show()
// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();
// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();