Como chamar funções e procedimentos armazenados no Snowpark Scala

Para processar dados em um DataFrame, você pode chamar funções SQL definidas pelo sistema, funções definidas pelo usuário e procedimentos armazenados. Este tópico explica como chamá-los no Snowpark.

Neste tópico:

Como chamar funções definidas pelo sistema

Se você precisar chamar funções SQL definidas pelo sistema, use as funções equivalentes no objeto com.snowflake.snowpark.functions.

O exemplo a seguir chama a função upper no objeto functions (o equivalente da função UPPER definida pelo sistema) para retornar os valores na coluna do nome com as letras em maiúsculas:

// Import the upper function from the functions object.
import com.snowflake.snowpark.functions._
...
session.table("products").select(upper(col("name"))).show()
Copy

Se uma função de SQL definida pelo sistema não estiver disponível no objeto de funções, você pode usar uma das seguintes abordagens:

  • Use a função callBuiltin para chamar a função definida pelo sistema.

  • Use a função builtin para criar um objeto de função que você pode usar para chamar a função definida pelo sistema.

callBuiltin e builtin são definidos no objeto com.snowflake.snowpark.functions.

Para callBuiltin, passe o nome da função definida pelo sistema como o primeiro argumento. Se você precisar passar os valores das colunas para a função definida pelo sistema, defina e passe objetos Column como argumentos adicionais para a função callBuiltin.

O exemplo a seguir chama a função definida pelo sistema RADIANS passando o valor da coluna col1:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Call the system-defined function RADIANS() on col1.
val result = df.select(callBuiltin("radians", col("col1"))).collect()
Copy

A função callBuiltin retorna um Column, que você pode passar para os métodos de transformação de DataFrame (por exemplo, filtro, seleção, etc.).

Para builtin, passe o nome da função definida pelo sistema e use o objeto da função retornada para chamar a função definida pelo sistema. Por exemplo:

// Import the callBuiltin function from the functions object.
import com.snowflake.snowpark.functions._
...
// Create a function object for the system-defined function RADIANS().
val radians = builtin("radians")
// Call the system-defined function RADIANS() on col1.
val result = df.select(radians(col("col1"))).collect()
Copy

Como chamar funções definidas pelo usuário escalares (UDFs)

O método para chamar uma UDF depende de como a UDF foi criada:

  • Para chamar uma UDF anônima, chame o método apply do objeto UserDefinedFunction que foi retornado quando você criou a UDF.

    Os argumentos que você passa para uma UDF devem ser objetos Column objetos. Se você precisar passar um literal, use lit() como explicado em Como usar literais como objetos de coluna.

  • Para chamar UDFs que você registrou pelo nome e UDFs que você criou ao executar CREATE FUNCTION, use a função callUDF no objeto com.snowflake.snowpark.functions.

    Passe o nome da UDF como o primeiro argumento e quaisquer parâmetros da UDF como argumentos adicionais.

Chamar uma UDF retorna um objeto Column contendo o valor de retorno da UDF.

O exemplo a seguir chama a função UDF myFunction passando os valores das colunas col1 e col2. O exemplo passa o valor de retorno de myFunction para o método select do DataFrame.

// Import the callUDF function from the functions object.
import com.snowflake.snowpark.functions._
...
// Runs the scalar function 'myFunction' on col1 and col2 of df.
val result =
    df.select(
        callUDF("myDB.schema.myFunction", col("col1"), col("col2"))
    ).collect()
Copy

Como chamar funções de tabela (funções do sistema e UDTFs)

Para chamar uma função de tabela ou uma função de tabela definida pelo usuário (UDTF):

  1. Construa um objeto TableFunction passando o nome da função de tabela.

    Se você estiver criando uma UDTF no Snowpark, você pode simplesmente usar o objeto TableFunction retornado pelo método UDTFRegistration.registerTemporary ou UDTFRegistration.registerPermanent. Consulte Criação de funções de tabela definidas pelo usuário (UDTFs).

  2. Chame session.tableFunction, passando o objeto TableFunction e um Map de nomes de argumentos e valores de entrada.

table?Function retorna um DataFrame que contém a saída da função de tabela.

Por exemplo, suponha que você tenha executado o seguinte comando para criar uma UDTF de SQL:

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

O código a seguir chama essa UDTF e cria um DataFrame para a saída da UDTF. O exemplo imprime as primeiras 10 linhas de saída para o console.

val dfTableFunctionOutput = session.tableFunction(TableFunction("product_by_category_id"), Map("cat_id" -> lit(10)))
dfTableFunctionOutput.show()
Copy

Se você precisar unir a saída de uma função de tabela com um DataFrame, chame o método DataFrame.join, que passa uma TableFunction.

Como chamar procedimentos armazenados

Você pode executar um procedimento no lado do servidor (no ambiente Snowflake) ou localmente. Tenha em mente que como os dois ambientes são diferentes, as condições e os resultados da execução dos procedimentos podem diferir entre eles.

Você pode chamar um procedimento com o API Snowpark de uma das seguintes maneiras:

  • Execute uma função localmente para teste e depuração usando o método SProcRegistration.runLocally.

  • Execute um procedimento no ambiente Snowflake do lado do servidor usando o método Session.storedProcedure. Isso inclui um procedimento com escopo definido para a sessão atual ou um procedimento permanente armazenado no Snowflake.

Você também pode chamar um procedimento armazenado permanente criado com o API Snowpark a partir de uma planilha do Snowflake. Para obter mais informações, consulte Chamada de um procedimento armazenado.

Para obter mais informações sobre como criar procedimentos com o API Snowpark, consulte Criação de procedimentos armazenados para DataFrames em Scala.

Execução da lógica de um procedimento localmente

Você pode executar a função lambda para seu procedimento em seu ambiente local usando o método SProcRegistration.runLocally. O método executa a função e retorna seu resultado como o tipo retornado pela função.

Por exemplo, você pode chamar localmente (no lado do cliente) uma função lambda que pretende usar em um procedimento antes de registrar um procedimento dela no Snowflake. Você começa atribuindo o código lambda como um valor a uma variável. Você passa essa variável para o método SProcRegistration.runLocally para executá-la no lado do cliente. Você também pode usar a variável para representar a função ao registrar o procedimento.

O código no exemplo a seguir atribui a função à variável func. Em seguida, ele testa a função localmente, passando a variável para o método SProcRegistration.runLocally com o valor do argumento da função. A variável também é usada para registrar o procedimento.

val session = Session.builder.configFile("my_config.properties").create

// Assign the lambda function.
val func = (session: Session, num: Int) => num + 1

// Execute the function locally.
val result = session.sproc.runLocally(func, 1)
print("\nResult: " + result)
Copy

Execução de um procedimento no servidor

Para executar um procedimento no ambiente Snowflake no servidor, utilize o método Session.storedProcedure. O método retornará um objeto DataFrame.

Por exemplo, você pode executar:

O código no exemplo a seguir cria um procedimento temporário projetado para ser executado no servidor, mas dura apenas enquanto durar a sessão atual do Snowpark. Em seguida, ele executa o procedimento usando o nome do procedimento e a variável StoredProcedure que o representa.

val session = Session.builder.configFile("my_config.properties").create

val name: String = "add_two"

val tempSP: StoredProcedure =
  session.sproc.registerTemporary(
    name,
    (session: Session, num: Int) => num + 2
  )

session.storedProcedure(name, 1).show()

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy