Como chamar funções e procedimentos armazenados no Snowpark Scala¶
Para processar dados em um DataFrame, você pode chamar funções SQL definidas pelo sistema, funções definidas pelo usuário e procedimentos armazenados. Este tópico explica como chamá-los no Snowpark.
Neste tópico:
Como chamar funções definidas pelo sistema¶
Se você precisar chamar funções SQL definidas pelo sistema, use as funções equivalentes no objeto com.snowflake.snowpark.functions.
O exemplo a seguir chama a função upper
no objeto functions
(o equivalente da função UPPER definida pelo sistema) para retornar os valores na coluna do nome com as letras em maiúsculas:
// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Se uma função de SQL definida pelo sistema não estiver disponível no objeto de funções, você pode usar uma das seguintes abordagens:
Use a função
callBuiltin
para chamar a função definida pelo sistema.Use a função
builtin
para criar um objeto de função que você pode usar para chamar a função definida pelo sistema.
callBuiltin
e builtin
são definidos no objeto com.snowflake.snowpark.functions
.
Para callBuiltin
, passe o nome da função definida pelo sistema como o primeiro argumento. Se você precisar passar os valores das colunas para a função definida pelo sistema, defina e passe objetos Column como argumentos adicionais para a função callBuiltin
.
O exemplo a seguir chama a função definida pelo sistema RADIANS passando o valor da coluna col1
:
// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
A função callBuiltin
retorna um Column
, que você pode passar para os métodos de transformação de DataFrame (por exemplo, filtro, seleção, etc.).
Para builtin
, passe o nome da função definida pelo sistema e use o objeto da função retornada para chamar a função definida pelo sistema. Por exemplo:
// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Como chamar funções definidas pelo usuário escalares (UDFs)¶
O método para chamar uma UDF depende de como a UDF foi criada:
Para chamar uma UDF anônima, chame o método
apply
do objeto UserDefinedFunction que foi retornado quando você criou a UDF.Os argumentos que você passa para uma UDF devem ser objetos Column objetos. Se você precisar passar um literal, use
lit()
como explicado em Como usar literais como objetos de coluna.Para chamar UDFs que você registrou pelo nome e UDFs que você criou ao executar CREATE FUNCTION, use a função
callUDF
no objetocom.snowflake.snowpark.functions
.Passe o nome da UDF como o primeiro argumento e quaisquer parâmetros da UDF como argumentos adicionais.
Chamar uma UDF retorna um objeto Column
contendo o valor de retorno da UDF.
O exemplo a seguir chama a função UDF myFunction
passando os valores das colunas col1
e col2
. O exemplo passa o valor de retorno de myFunction
para o método select
do DataFrame.
// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
df.select(
callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
).collect()
Como chamar funções de tabela (funções do sistema e UDTFs)¶
Para chamar uma função de tabela ou uma função de tabela definida pelo usuário (UDTF):
Construa um objeto TableFunction passando o nome da função de tabela.
Se você estiver criando uma UDTF no Snowpark, você pode simplesmente usar o objeto
TableFunction
retornado pelo métodoUDTFRegistration.registerTemporary
ouUDTFRegistration.registerPermanent
. Consulte Criação de funções de tabela definidas pelo usuário (UDTFs).Chame session.tableFunction, passando o objeto
TableFunction
e umMap
de nomes de argumentos e valores de entrada.
table?Function
retorna um DataFrame que contém a saída da função de tabela.
Por exemplo, suponha que você tenha executado o seguinte comando para criar uma UDTF de SQL:
CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
RETURNS TABLE(id INT, name VARCHAR)
AS
$$
SELECT id, name
FROM sample_product_data
WHERE category_id = cat_id
$$
;
O código a seguir chama essa UDTF e cria um DataFrame para a saída da UDTF. O exemplo imprime as primeiras 10 linhas de saída para o console.
val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Se você precisar unir a saída de uma função de tabela com um DataFrame, chame o método DataFrame.join, que passa uma TableFunction.
Como chamar procedimentos armazenados¶
Você pode executar um procedimento no lado do servidor (no ambiente Snowflake) ou localmente. Tenha em mente que como os dois ambientes são diferentes, as condições e os resultados da execução dos procedimentos podem diferir entre eles.
Você pode chamar um procedimento com o API Snowpark de uma das seguintes maneiras:
Execute uma função localmente para teste e depuração usando o método
SProcRegistration.runLocally
.Execute um procedimento no ambiente Snowflake do lado do servidor usando o método
Session.storedProcedure
. Isso inclui um procedimento com escopo definido para a sessão atual ou um procedimento permanente armazenado no Snowflake.
Você também pode chamar um procedimento armazenado permanente criado com o API Snowpark a partir de uma planilha do Snowflake. Para obter mais informações, consulte Chamada de um procedimento armazenado.
Para obter mais informações sobre como criar procedimentos com o API Snowpark, consulte Criação de procedimentos armazenados para DataFrames em Scala.
Execução da lógica de um procedimento localmente¶
Você pode executar a função lambda para seu procedimento em seu ambiente local usando o método SProcRegistration.runLocally
. O método executa a função e retorna seu resultado como o tipo retornado pela função.
Por exemplo, você pode chamar localmente (no lado do cliente) uma função lambda que pretende usar em um procedimento antes de registrar um procedimento dela no Snowflake. Você começa atribuindo o código lambda como um valor a uma variável. Você passa essa variável para o método SProcRegistration.runLocally
para executá-la no lado do cliente. Você também pode usar a variável para representar a função ao registrar o procedimento.
O código no exemplo a seguir atribui a função à variável func
. Em seguida, ele testa a função localmente, passando a variável para o método SProcRegistration.runLocally
com o valor do argumento da função. A variável também é usada para registrar o procedimento.
val session = Session.builder.configFile("my_config.properties").create
// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1
// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Execução de um procedimento no servidor¶
Para executar um procedimento no ambiente Snowflake no servidor, utilize o método Session.storedProcedure
. O método retornará um objeto DataFrame
.
Por exemplo, você pode executar:
Um procedimento temporário ou permanente que você cria usando o API Snowpark.
Um procedimento criado usando uma instrução CREATE PROCEDURE.
O código no exemplo a seguir cria um procedimento temporário projetado para ser executado no servidor, mas dura apenas enquanto durar a sessão atual do Snowpark. Em seguida, ele executa o procedimento usando o nome do procedimento e a variável StoredProcedure
que o representa.
val session = Session.builder.configFile("my_config.properties").create
val name: String = "add_two"
val tempSP: StoredProcedure =
session.sproc.registerTemporary(
name,
(session: Session, num: Int) => num + 2
)
session.storedProcedure(name, 1).show()
// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();
// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();