Gestion des fonctions et des procédures stockées Snowflake avec Python

Vous pouvez utiliser Python pour gérer les fonctions définies par l’utilisateur (UDFs) et les procédures stockées dans Snowflake. Lorsque vous créez une UDF ou une procédure, vous écrivez sa logique dans l’un des langages de gestion pris en charge, puis vous la créez à l’aide de Snowflake Python APIs. Pour plus d’informations sur les UDFs et les procédures stockées, voir Extension de Snowflake avec des fonctions et des procédures.

Conditions préalables

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.

Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

En utilisant l’objet Session obtenu, le code crée un objet Root pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.

Gestion des fonctions définies par l’utilisateur (UDFs)

Vous pouvez gérer des fonctions définies par l’utilisateur (UDFs) que vous pouvez écrire pour étendre le système pour effectuer des opérations qui ne sont pas disponibles grâce aux fonctions intégrées et définies par le système fournies par Snowflake. Une fois que vous avez créé une UDF, vous pouvez la réutiliser plusieurs fois. Pour plus d’informations, voir Vue d’ensemble des fonctions définies par l’utilisateur.

Note

L’appel d’UDFs en utilisant l’API n’est actuellement pas pris en charge.

Les Snowflake Python APIs représentent des UDFs avec deux types distincts :

  • UserDefinedFunction : expose les propriétés d’une UDF telles que son nom, la liste des arguments, le type de retour et la définition de la fonction.

  • UserDefinedFunctionResource : expose des méthodes que vous pouvez utiliser pour récupérer un objet UserDefinedFunction correspondant, renommer l’UDF, et supprimer l’UDF.

Création d’une UDF

Pour créer une UDF, il faut d’abord créer un objet UserDefinedFunction, puis créer un objet UserDefinedFunctionCollection à partir de l’objet Root de l’API. En utilisant UserDefinedFunctionCollection.create, ajoutez la nouvelle UDF à Snowflake.

Lorsque vous créez une UDF, vous spécifiez un gestionnaire dont le code est écrit dans l’un des langages pris en charge suivants.

Python

Le code de l’exemple suivant crée un objet UserDefinedFunction qui représente une UDF nommée my_python_function dans la base de données my_db et le schéma my_schema, avec les arguments spécifiés, le type de retour, le langage et la définition Python de l’UDF :

from snowflake.core.user_defined_function import (
    PythonFunction,
    ReturnDataType,
    UserDefinedFunction
)

function_of_python = UserDefinedFunction(
    "my_python_function",
    arguments=[],
    return_type=ReturnDataType(datatype="VARIANT"),
    language_config=PythonFunction(runtime_version="3.9", packages=[], handler="udf"),
    body="""
def udf():
    return {"key": "value"}
    """,
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_python)
Copy

Java

Le code de l’exemple suivant crée un objet UserDefinedFunction qui représente une UDF nommée my_java_function dans la base de données my_db et le schéma my_schema, avec les arguments spécifiés, le type de retour, le langage et la définition Java de l’UDF :

from snowflake.core.user_defined_function import (
    Argument,
    JavaFunction,
    ReturnDataType,
    UserDefinedFunction
)

function_body = """
    class TestFunc {
        public static String echoVarchar(String x) {
            return x;
        }
    }
"""

function_of_java = UserDefinedFunction(
    name="my_java_function",
    arguments=[Argument(name="x", datatype="STRING")],
    return_type=ReturnDataType(datatype="VARCHAR", nullable=True),
    language_config=JavaFunction(
        handler="TestFunc.echoVarchar",
        runtime_version="11",
        target_path=target_path,
        packages=[],
        called_on_null_input=True,
        is_volatile=True,
    ),
    body=function_body,
    comment="test_comment",
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_java)
Copy

JavaScript

Le code de l’exemple suivant crée un objet UserDefinedFunction qui représente une UDF nommée my_javascript_function dans la base de données my_db et le schéma my_schema, avec les arguments spécifiés, le type de retour, le langage et la définition de l’UDFJavaScript :

from snowflake.core.user_defined_function import (
    Argument,
    ReturnDataType,
    JavaScriptFunction,
    UserDefinedFunction
)

function_body = """
    if (D <= 0) {
        return 1;
    } else {
        var result = 1;
        for (var i = 2; i <= D; i++) {
            result = result * i;
        }
        return result;
    }
"""

function_of_javascript = UserDefinedFunction(
    name="my_javascript_function",
    arguments=[Argument(name="d", datatype="DOUBLE")],
    return_type=ReturnDataType(datatype="DOUBLE"),
    language_config=JavaScriptFunction(),
    body=function_body,
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_javascript)
Copy

Scala

Le code de l’exemple suivant crée un objet UserDefinedFunction qui représente une UDF nommée my_scala_function dans la base de données my_db et le schéma my_schema, avec les arguments spécifiés, le type de retour, le langage et la définition de l’UDF Scala :

from snowflake.core.user_defined_function import (
    Argument,
    ReturnDataType,
    ScalaFunction,
    UserDefinedFunction
)

function_body = """
    class Echo {
        def echoVarchar(x : String): String = {
            return x
        }
    }
"""

function_of_scala = UserDefinedFunction(
    name="my_scala_function",
    arguments=[Argument(name="x", datatype="VARCHAR")],
    return_type=ReturnDataType(datatype="VARCHAR"),
    language_config=ScalaFunction(
        runtime_version="2.12", handler="Echo.echoVarchar", target_path=target_path, packages=[]
    ),
    body=function_body,
    comment="test_comment",
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_scala)
Copy

SQL

Le code de l’exemple suivant crée un objet UserDefinedFunction qui représente une UDF nommée my_sql_function dans la base de données my_db et le schéma my_schema, avec les arguments spécifiés, le type de retour, le langage et la définition de l’UDFSQL :

from snowflake.core.user_defined_function import (
    ReturnDataType,
    SQLFunction,
    UserDefinedFunction
)

function_body = """3.141592654::FLOAT"""

function_of_sql = UserDefinedFunction(
    name="my_sql_function",
    arguments=[],
    return_type=ReturnDataType(datatype="FLOAT"),
    language_config=SQLFunction(),
    body=function_body,
)

root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_sql)
Copy

Obtention d’informations sur les UDF

Vous pouvez obtenir des informations sur une UDF en appelant la méthode UserDefinedFunctionResource.fetch, qui renvoie un objet UserDefinedFunction.

Le code de l’exemple suivant récupère des informations sur l’UDF my_javascript_function(DOUBLE) dans la base de données my_db et le schéma my_schema :

Note

Lorsque vous obtenez un objet ressource d’UDF, vous devez spécifier la signature complète (le nom de l’UDF et ses types de données de paramètres) car les UDFs peuvent être surchargées.

my_udf = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].fetch()
print(my_udf.to_dict())
Copy

Affichage des UDFs

Vous pouvez répertorier des UDFs à l’aide de la méthode UserDefinedFunctionCollection.iter qui renvoie un itérateur PagedIter d’objets UserDefinedFunction.

Le code de l’exemple suivant répertorie les UDFs dont le nom commence par my_java dans la base de données my_db et le schéma my_schema, puis imprime le nom de chacun :

udf_iter = root.databases["my_db"].schemas["my_schema"].user_defined_functions.iter(like="my_java%")
for udf_obj in udf_iter:
    print(udf_obj.name)
Copy

Changement de nom d’une UDF

Vous pouvez renommer une UDF avec un objet UserDefinedFunctionResource.

Le code dans l’exemple suivant obtient l’objet ressource my_javascript_function(DOUBLE) de l’UDF dans la base de données my_db et le schéma my_schema, puis renomme l’UDF en my_other_js_function tout en la déplaçant vers la base de données my_other_db et le schéma my_other_schema :

root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].rename(
    "my_other_js_function",
    target_database = "my_other_database",
    target_schema = "my_other_schema"
)
Copy

Suppression d’une UDF

Vous pouvez supprimer une UDF avec un objet UserDefinedFunctionResource.

Le code dans l’exemple suivant récupère my_javascript_function(DOUBLE) l’objet de ressource UDF puis supprime l’UDF :

my_udf_res = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"]
my_udf_res.drop()
Copy

Gestion de procédures stockées

Vous pouvez gérer des procédures stockées que vous pouvez écrire pour étendre le système avec du code procédural qui exécute SQL. Dans une procédure stockée, vous pouvez utiliser des constructions programmatiques pour effectuer des branchements et des boucles. Une fois que vous avez créé une procédure stockée, vous pouvez la réutiliser plusieurs fois. Pour plus d’informations, voir Vue d’ensemble des procédures stockées.

Les Snowflake Python APIs représentent des procédures avec deux types distincts :

  • Procedure : expose les propriétés d’une procédure telles que son nom, la liste des arguments, le type de retour et la définition de la procédure.

  • ProcedureResource : expose les méthodes que vous pouvez utiliser pour récupérer un objet Procedure correspondant, pour appeler la procédure et annuler la procédure.

Création d’une procédure

Pour créer une procédure, il faut d’abord créer un objet Procedure, puis créer un objet ProcedureCollection à partir de l’objet Root de l’API. En utilisant ProcedureCollection.create, ajoutez la nouvelle procédure à Snowflake.

Le code de l’exemple suivant crée un objet Procedure qui représente une procédure nommée my_procedure dans la base de données my_db et le schéma my_schema, avec les arguments spécifiés, le type de retour et la définition de la procédure SQL :

from snowflake.core.procedure import Argument, ColumnType, Procedure, ReturnTable, SQLFunction

procedure = Procedure(
    name="my_procedure",
    arguments=[Argument(name="id", datatype="VARCHAR")],
    return_type=ReturnTable(
        column_list=[
            ColumnType(name="id", datatype="NUMBER),
            ColumnType(name="price", datatype="NUMBER"),
        ]
    ),
    language_config=SQLFunction(),
    body="
        DECLARE
            res RESULTSET DEFAULT (SELECT * FROM invoices WHERE id = :id);
        BEGIN
            RETURN TABLE(res);
        END;
    ",
)

procedures = root.databases["my_db"].schemas["my_schema"].procedures
procedures.create(procedure)
Copy

Appel d’une procédure

Vous pouvez appeler une procédure avec un objet ProcedureResource.

Le code dans l’exemple suivant obtient l’objet de ressource de procédure my_procedure(NUMBER, NUMBER), crée un objet CallArgumentList, puis appelle la procédure en utilisant cette liste d’arguments.

Note

Lors de l’obtention d’un objet de ressource de procédure, vous devez spécifier la signature complète (le nom de la procédure et ses types de données de paramètres) car les procédures peuvent être surchargées.

from snowflake.core.procedure import CallArgument, CallArgumentList

procedure_reference = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
call_argument_list = CallArgumentList(call_arguments=[
    CallArgument(name="id", datatype="NUMBER", value=1),
])
procedure_reference.call(call_argument_list)
Copy

Obtenir les détails de la procédure

Vous pouvez obtenir des informations sur une procédure en appelant la méthode ProcedureResource.fetch, qui renvoie un objet Procedure.

Le code de l’exemple suivant récupère des informations sur la procédure my_procedure(NUMBER, NUMBER) dans la base de données my_db et le schéma my_schema :

my_procedure = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"].fetch()
print(my_procedure.to_dict())
Copy

Affichage des procédures

Vous pouvez répertorier des procédures à l’aide de la méthode ProcedureCollection.iter qui renvoie un itérateur PagedIter d’objets Procedure.

Le code de l’exemple suivant répertorie les procédures dont le nom commence par my dans la base de données my_db et le schéma my_schema, puis imprime le nom de chacun :

procedure_iter = root.databases["my_db"].schemas["my_schema"].procedures.iter(like="my%")
for procedure_obj in procedure_iter:
    print(procedure_obj.name)
Copy

Suppression d’une procédure

Vous pouvez supprimer une procédure avec un objet ProcedureResource.

Le code dans l’exemple suivant obtient l’objet de ressource de procédure my_procedure(NUMBER, NUMBER) dans la base de données my_db et le schéma my_schema, puis supprime la procédure.

my_procedure_res = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(NUMBER, NUMBER)"]
my_procedure_res.drop()
Copy