Como chamar funções e procedimentos armazenados no Snowpark Java

Para processar dados em um DataFrame, você pode chamar funções SQL definidas pelo sistema, funções definidas pelo usuário e procedimentos armazenados. Este tópico explica como chamá-los no Snowpark.

Neste tópico:

Como chamar funções definidas pelo sistema

Se você precisar chamar funções de SQL definidas pelo sistema, use os métodos estáticos equivalentes na classe Functions.

O exemplo a seguir chama o método estático upper na classe Functions (o equivalente da função UPPER definida pelo sistema) para retornar os valores na coluna do nome com as letras em maiúsculas:

DataFrame df = session.table("sample_product_data");
df.select(Functions.upper(Functions.col("name"))).show();
Copy

Se uma função de SQL definida pelo sistema não estiver disponível na classe Functions, você pode usar o método estático Functions.callUDF para chamar a função definida pelo sistema.

Para callUDF, passe o nome da função definida pelo sistema como o primeiro argumento. Se você precisar passar os valores das colunas para a função definida pelo sistema, defina e passe objetos Column como argumentos adicionais para o método callUDF.

O exemplo a seguir chama a função definida pelo sistema RADIANS passando o valor da coluna degrees:

// Call the system-defined function RADIANS() on degrees.
DataFrame dfDegrees = session.range(0, 360, 45).rename("degrees", Functions.col("id"));
dfDegrees.select(Functions.col("degrees"), Functions.callUDF("radians", Functions.col("degrees"))).show();
Copy

O método callUDF retorna um Column, que você pode passar para os métodos de transformação de DataFrame (por exemplo, filtro, seleção, etc.).

Como chamar funções definidas pelo usuário escalares (UDFs)

O método para chamar uma UDF depende de como a UDF foi criada:

Chamar uma UDF retorna um objeto Column contendo o valor de retorno da UDF.

O exemplo a seguir chama a função UDF doubleUdf passando o valor das colunas quantity. O exemplo passa o valor de retorno de doubleUdf para o método select do DataFrame.

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

Como chamar funções de tabela (funções do sistema e UDTFs)

Para chamar uma função de tabela ou uma função de tabela definida pelo usuário (UDTF):

  1. Construa um objeto TableFunction passando o nome da função de tabela.

  2. Chame o método tableFunction do objeto Session passando o objeto TableFunction e um Map de nomes de argumentos e valores de entrada.

table?Function retorna um DataFrame que contém a saída da função de tabela.

Por exemplo, suponha que você tenha executado o seguinte comando para criar uma UDTF de SQL:

CREATE OR REPLACE FUNCTION product_by_category_id(cat_id INT)
  RETURNS TABLE(id INT, name VARCHAR)
  AS
  $$
    SELECT id, name
      FROM sample_product_data
      WHERE category_id = cat_id
  $$
  ;
Copy

O código a seguir chama essa UDTF e cria um DataFrame para a saída da UDTF. O exemplo imprime as primeiras 10 linhas de saída para o console.

import java.util.HashMap;
import java.util.Map;
...

Map<String, Column> arguments = new HashMap<>();
arguments.put("cat_id", Functions.lit(10));
DataFrame dfTableFunctionOutput = session.tableFunction(new TableFunction("product_by_category_id"), arguments);
dfTableFunctionOutput.show();
Copy

Se você precisar unir a saída de uma função de tabela com um DataFrame, chame o método join, que passa uma TableFunction.

Como chamar procedimentos armazenados

Você pode executar um procedimento no lado do servidor (no ambiente Snowflake) ou localmente. Tenha em mente que como os dois ambientes são diferentes, as condições e os resultados da execução dos procedimentos podem diferir entre eles.

Você pode chamar um procedimento com o API Snowpark de uma das seguintes maneiras:

  • Execute uma função localmente para teste e depuração usando o método SProcRegistration.runLocally.

  • Execute um procedimento no ambiente Snowflake do lado do servidor usando um dos métodos Session.storedProcedure. Isso inclui um procedimento com escopo definido para a sessão atual ou um procedimento permanente armazenado no Snowflake.

Você também pode chamar um procedimento armazenado permanente criado com o API Snowpark a partir do código SQL. Para obter mais informações, consulte Chamada de um procedimento armazenado.

Para obter mais informações sobre como criar procedimentos com o API Snowpark, consulte Criação de procedimentos armazenados para DataFrames em Java.

Execução da lógica de um procedimento localmente

Você pode executar a função lambda para seu procedimento em seu ambiente local usando o método SProcRegistration.runLocally. O método executa a função e retorna seu resultado como o tipo retornado pela função.

Por exemplo, você pode chamar uma função lambda que pretende usar em um procedimento antes de registrar um procedimento dela no Snowflake. Você começa atribuindo o código lambda como um valor a uma variável cujo tipo é uma das interfaces com.snowflake.snowpark_java.sproc.JavaSProc. Usando essa variável, você pode testar a chamada da função com o método SProcRegistration.runLocally. Você também pode usar a variável para representar a função ao registrar o procedimento.

O código no exemplo a seguir inicializa uma variável JavaSProc da função lambda que será a lógica do procedimento. Em seguida, ele testa a função passando a variável para o método SProcRegistration.runLocally com o argumento da função. A variável também é usada para registrar a função.

Session session = Session.builder().configFile("my_config.properties").create();

// Assign the lambda function to a variable.
JavaSProc1<Integer, Integer> func =
  (Session session, Integer num) -> num + 1;

// Execute the function locally.
int result = (Integer)session.sproc().runLocally(func, 1);
System.out.println("\nResult: " + result);

// Register the procedure.
StoredProcedure sp =
  session.sproc().registerTemporary(
    func,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server.
session.storedProcedure(sp, 1).show();
Copy

Execução de um procedimento no servidor

Para executar um procedimento no ambiente Snowflake no servidor, utilize o método Session.storedProcedure. O método retornará um objeto DataFrame.

Por exemplo, você pode executar:

O código no exemplo a seguir cria um procedimento temporário projetado para ser executado no servidor, mas dura apenas enquanto durar a sessão atual do Snowpark. Em seguida, ele executa o procedimento usando o nome do procedimento e a variável com.snowflake.snowpark_java.StoredProcedure que o representa.

Session session = Session.builder().configFile("my_config.properties").create();

String incrementProc = "increment";

// Register the procedure.
StoredProcedure tempSP =
  session.sproc().registerTemporary(
    incrementProc,
    (Session session, Integer num) -> num + 1,
    DataTypes.IntegerType,
    DataTypes.IntegerType
  );

// Execute the procedure on the server by passing the procedure's name.
session.storedProcedure(incrementProc, 1).show();

// Execute the procedure on the server by passing a variable
// representing the procedure.
session.storedProcedure(tempSP, 1).show();
Copy