Criação de funções definidas pelo usuário (UDFs) para DataFrames em Java

A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma expressão lambda em Java. Este tópico explica como criar esses tipos de funções.

Neste tópico:

Introdução

Você pode chamar APIs do Snowpark para criar funções definidas pelo usuário (UDFs) para expressões lambda em Java, e você pode chamar essas UDFs para processar os dados em seu DataFrame.

Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark faz a serialização e carrega o código da sua UDF para um estágio. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão localizados. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.

Em seu código personalizado, você também pode chamar o código que está empacotado em arquivos JAR (por exemplo, classes de Java para uma biblioteca de terceiros).

Você pode criar uma UDF para seu código personalizado de duas maneiras:

  • Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register an anonymous UDF (doubleUdf)
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
    // Call the anonymous UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy
  • Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.

    import com.snowflake.snowpark_java.types.*;
    ...
    
    // Create and register a permanent named UDF ("doubleUdf")
    // that takes in an integer argument and returns an integer value.
    UserDefinedFunction doubleUdf =
      session
        .udf()
        .registerPermanent(
          "doubleUdf",
          (Integer x) -> x + x,
          DataTypes.IntegerType,
          DataTypes.IntegerType,
          "mystage");
    // Call the named UDF.
    DataFrame df = session.table("sample_product_data");
    DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
    dfWithDoubleQuantity.show();
    
    Copy

O resto deste tópico explica como criar UDFs.

Nota

Se você tiver definido uma UDF executando o comando CREATE FUNCTION, você pode chamar essa UDF no Snowpark.

Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário escalares (UDFs).

Tipos de dados suportados para argumentos e valores de retorno

Para criar uma UDF para uma lambda de Java, você deve usar os tipos de dados suportados listados abaixo para os argumentos e o valor de retorno de seu método:

Tipo de dados SQL

Tipo de dados de Java

Notas

NUMBER

Os seguintes tipos são suportados:

  • Integer

  • Long

  • java.math.BigDecimal ou java.math.BigInteger

FLOAT

Float

DOUBLE

Double

VARCHAR

String

BOOLEAN

Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] ou Variant[]

OBJECT

Map<String, String> ou Map<String, Variant>

GEOGRAPHY

com.snowflake.snowpark_java.types.Geography

Especificação de dependências para uma UDF

Para definir uma UDF através da API do Snowpark, você deve chamar Session.addDependency() para quaisquer arquivos que contenham quaisquer classes e recursos dos quais sua UDF dependa (por exemplo, arquivos JAR, arquivos de recursos, etc.). (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos de uma UDF).

A biblioteca do Snowpark carrega esses arquivos para um estágio interno e adiciona os arquivos ao caminho da classe ao executar sua UDF.

Dica

Se você não quiser que a biblioteca carregue o arquivo toda vez que você executar seu aplicativo, carregue o arquivo para um estágio. Ao chamar addDependency, passe o caminho para o arquivo no estágio.

O exemplo a seguir demonstra como adicionar um arquivo JAR em um estágio como uma dependência:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar");
Copy

Os exemplos a seguir demonstram como adicionar dependências para arquivos JAR e arquivos de recursos:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar");

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/");

// Add a resource file.
session.addDependency("/<path>/my-resource.xml");
Copy

Você não deve precisar especificar as seguintes dependências:

  • Suas bibliotecas do Java runtime.

    Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.

  • O arquivo JAR do Snowpark.

    A biblioteca do Snowpark tenta automaticamente detectar e carregar o arquivo JAR do Snowpark para o servidor.

    Para evitar que a biblioteca carregue repetidamente o arquivo JAR do Snowpark para o servidor:

    1. Carregue o arquivo JAR do Snowpark para um estágio.

      Por exemplo, o comando a seguir carrega o arquivo JAR do Snowpark para o estágio @mystage. O comando PUT compacta o arquivo JAR e nomeia o arquivo resultante como snowpark-1.9.0 .jar.gz.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. Chame addDependency para adicionar o arquivo JAR do Snowpark no estágio como uma dependência.

      Por exemplo, para adicionar o arquivo JAR do Snowpark carregado pelo comando anterior:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz");

      Observe que o caminho especificado para o arquivo JAR inclui a extensão .gz do nome do arquivo, que foi adicionada pelo comando PUT.

  • O arquivo JAR ou o diretório com o aplicativo que está sendo executado.

    A biblioteca do Snowpark tenta automaticamente detectar e carregar essas dependências.

    Se a biblioteca do Snowpark não conseguir detectar essas dependências automaticamente, a biblioteca reporta um erro, e você deve chamar addDependency para adicionar essas dependências manualmente.

Se levar tempo demais para que as dependências sejam carregadas no estágio, a biblioteca do Snowpark gerará uma exceção de tempo limite. Para configurar o tempo máximo que a biblioteca do Snowpark deve esperar, defina a propriedade snowpark_request_timeout_in_seconds ao criar a sessão.

Como criar uma UDF anônima

Para criar uma UDF anônima, você pode:

  • Chamar o método estático Functions.udf passando a expressão lambda e os campos DataTypes (ou objetos construídos pelos métodos dessa classe) representando os tipos de dados das entradas e da saída.

  • Chamar o método registerTemporary na classe UDFRegistration passando a expressão lambda e os campos DataTypes (ou objetos construídos pelos métodos dessa classe) representando os tipos de dados das entradas e da saída.

    Você pode acessar uma instância da classe UDFRegistration chamando o método udf do objeto Session.

    Ao chamar registerTemporary, use uma assinatura de método que não tenha um parâmetro name. (Como você está criando uma UDF anônima, você não especifica um nome para a UDF).

Nota

Ao escrever código com multithread (por exemplo, ao usar coleções paralelas), use o método registerTemporary para registrar UDFs em vez de usar o método udf. Isso pode evitar erros nos quais o objeto Session padrão do Snowflake não pode ser encontrado.

Esses métodos retornam um objeto UserDefinedFunction que você pode usar para chamar a UDF. (Consulte Como chamar funções definidas pelo usuário escalares (UDFs).)

O exemplo a seguir cria uma UDF anônima:

import com.snowflake.snowpark_java.types.*;
...

// Create and register an anonymous UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  Functions.udf((Integer x) -> x + x, DataTypes.IntegerType, DataTypes.IntegerType);
// Call the anonymous UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", doubleUdf.apply(Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

O exemplo a seguir cria uma UDF anônima que usa uma classe personalizada (LanguageDetector, que detecta o idioma usado no texto). O exemplo chama a UDF anônima para detectar o idioma na coluna text_data em um DataFrame e cria uma nova coluna DataFrame que inclui uma coluna adicional lang com o idioma utilizado.

import com.snowflake.snowpark_java.types.*;

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector;

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar");

// Create a detector
LanguageDetector detector = new LanguageDetector();

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
UserDefinedFunction langUdf =
  Functions.udf(
    (String s) -> Option(detector.detect(s)).getOrElse("UNKNOWN"),
    DataTypes.StringType,
    DataTypes.StringType);

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
DataFrame dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(Functions.col("text_data")));
Copy

Criação e registro de uma UDF nomeada

Se você quiser chamar uma UDF pelo nome (por exemplo, usando o método Functions.callUDF estático) ou se você precisar usar uma UDF em sessões posteriores, você pode criar e registrar uma UDF nomeada. Para isso, use um dos seguintes métodos na classe UDFRegistration:

  • registerTemporary se você apenas planeja usar a UDF na sessão atual

  • registerPermanent se você planeja usar a UDF em sessões posteriores

Para acessar um objeto da classe UDFRegistration, chame o método udf do objeto Session.

Ao chamar o método registerTemporary ou registerPermanent, passe a expressão lambda e os campos DataTypes (ou objetos construídos pelos métodos dessa classe) representando os tipos de dados das entradas e da saída.

Por exemplo:

import com.snowflake.snowpark_java.types.*;
...
// Create and register a temporary named UDF
// that takes in an integer argument and returns an integer value.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerTemporary(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType);
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

registerPermanent cria uma UDF que você pode usar nas sessões atuais e posteriores. Ao chamar registerPermanent, você também deve especificar um local em um local de estágio interno onde os arquivos JAR para a UDF e suas dependências serão carregados.

Nota

registerPermanent não oferece suporte para estágios externos.

Por exemplo:

import com.snowflake.snowpark_java.types.*;
...

// Create and register a permanent named UDF
// that takes in an integer argument and returns an integer value.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
UserDefinedFunction doubleUdf =
  session
    .udf()
    .registerPermanent(
      "doubleUdf",
      (Integer x) -> x + x,
      DataTypes.IntegerType,
      DataTypes.IntegerType,
      "mystage");
// Call the named UDF, passing in the "quantity" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleQuantity".
DataFrame df = session.table("sample_product_data");
DataFrame dfWithDoubleQuantity = df.withColumn("doubleQuantity", Functions.callUDF("doubleUdf", Functions.col("quantity")));
dfWithDoubleQuantity.show();
Copy

Como usar objetos que não são serializáveis

Quando você cria uma UDF para uma expressão lambda, a biblioteca do Snowpark serializa o fechamento da lambda e o envia para o servidor para execução.

Se um objeto capturado pelo fechamento da lambda não for serializável, a biblioteca do Snowpark gera uma exceção java.io.NotSerializableException.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

Se isso ocorrer, você deve tornar o objeto serizável.

Como escrever um código de inicialização para uma UDF

Se sua UDF exigir um código ou contexto de inicialização, você pode fornecê-lo através de valores capturados como parte do fechamento da UDF.

O exemplo a seguir usa uma classe separada para inicializar o contexto necessário para duas UDFs.

  • A primeira UDF cria uma nova instância da classe dentro da lambda, de forma que a inicialização é realizada toda vez que a UDF é invocada.

  • A segunda UDF captura uma instância da classe gerada em seu programa cliente. O contexto gerado no cliente é serializado e é utilizado pela UDF. Observe que a classe do contexto deve ser serializável para que essa abordagem funcione.

import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
import java.io.Serializable;

// Context needed for a UDF.
class Context {
  double randomInt = Math.random();
}

// Serializable context needed for the UDF.
class SerContext implements Serializable {
  double randomInt = Math.random();
}

class TestUdf {
  public static void main(String[] args) {
    // Create the session.
    Session session = Session.builder().configFile("/<path>/profile.properties").create();
    session.range(1, 10, 2).show();

    // Create a DataFrame with two columns ("c" and "d").
    DataFrame dummy =
      session.createDataFrame(
        new Row[]{
          Row.create(1, 1),
          Row.create(2, 2),
          Row.create(3, 3)
        },
        StructType.create(
          new StructField("c", DataTypes.IntegerType),
          new StructField("d", DataTypes.IntegerType))
        );
    dummy.show();

    // Initialize the context once per invocation.
    UserDefinedFunction udfRepeatedInit =
      Functions.udf(
        (Integer i) -> new Context().randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfRepeatedInit.apply(dummy.col("c"))).show();

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    SerContext sC = new SerContext();
    UserDefinedFunction udfOnceInit =
      Functions.udf(
        (Integer i) -> sC.randomInt,
        DataTypes.IntegerType,
        DataTypes.DoubleType
      );
    dummy.select(udfOnceInit.apply(dummy.col("c"))).show();
    UserDefinedFunction udfOnceInit = udf((i: Int) => sC.randomInt);
  }
}
Copy

Como ler arquivos de uma UDF

Como mencionado anteriormente, a biblioteca do Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.

Além disso, se o conteúdo do arquivo permanecer o mesmo entre chamadas para a UDF, você pode escrever seu código para carregar o arquivo uma vez durante a primeira chamada e não em chamadas posteriores. Isso pode melhorar o desempenho de suas chamadas à UDF.

Para configurar uma UDF para ler um arquivo:

  1. Adicione o arquivo a um arquivo JAR.

    Por exemplo, se sua UDF precisa usar um arquivo em um subdiretório data/ (data/hello.txt), execute o comando jar para adicionar esse arquivo a um arquivo JAR:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. Especifique que o arquivo JAR é uma dependência, que carrega o arquivo para o servidor e adiciona o arquivo ao caminho da classe. Consulte Especificação de dependências para uma UDF.

    Por exemplo:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar");
    
    Copy
  3. No arquivo da UDF, chame Class.forName().getResourceAsStream() para encontrar o arquivo no caminho da classe e ler o arquivo.

    Para evitar adicionar uma dependência em this, você pode usar Class.forName("com.snowflake.snowpark_java.DataFrame") (em vez de getClass()) para obter o objeto Class.

    Por exemplo, para ler o arquivo data/hello.txt:

    // Read data/hello.txt from myJar.jar.
    String resourceName = "/data/hello.txt";
    InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame").getResourceAsStream(resourceName);
    
    Copy

    Neste exemplo, o nome do recurso começa com /, indicando que este é o caminho completo do arquivo no arquivo JAR. (Nesse caso, o local do arquivo não é relativo ao pacote da classe).

Nota

Se você não espera que o conteúdo do arquivo mude entre as chamadas à UDF, leia o arquivo em um campo estático de sua classe e leia o arquivo somente se o campo não estiver definido.

O exemplo a seguir define um objeto (UDFCode) com uma função que será usada como UDF (readFileFunc). A função lê o arquivo data/hello.txt, que deve conter a cadeia de cadeia de caracteres hello,. A função precede essa cadeia de caracteres com a cadeia de caracteres passada como argumento.

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Create a function class that reads a file.
class UDFCode {
  private static String fileContent = null;
  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // The file content is cached in 'fileContent'.
  public static String readFile() {
    if (fileContent == null) {
      try {
        String resourceName = "/data/hello.txt";
        InputStream inputStream = Class.forName("com.snowflake.snowpark_java.DataFrame")
          .getResourceAsStream(resourceName);
        fileContent = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
      } catch (Exception e) {
        fileContent = "Error while reading file";
      }
    }
    return fileContent;
  }
}
Copy

A próxima parte do exemplo registra a função como uma UDF anônima. O exemplo chama a UDF na coluna NAME em um DataFrame. O exemplo supõe que o arquivo data/hello.txt está empacotado no arquivo JAR myJar.jar.

import com.snowflake.snowpark_java.types.*;

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar");

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
DataFrame myDf = session.sql("select 'Raymond' NAME");

// Register the function that you defined earlier as an anonymous UDF.
UserDefinedFunction readFileUdf = session.udf().registerTemporary(
  (String s) -> UDFCode.readFile() + " : " + s, DataTypes.StringType, DataTypes.StringType);

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf.apply(Functions.col("NAME"))).show();
Copy

Criação de funções de tabela definidas pelo usuário (UDTFs)

Para criar e registrar uma UDTF no Snowpark, você deve:

As próximas seções descrevem essas etapas com mais detalhes.

Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDTF.

Como definir a classe UDTF

Defina uma classe que implemente uma das interfaces JavaUDTFn (por exemplo, JavaUDTF0, JavaUDTF1, etc.) no pacote com.snowflake.snowpark_java.udtf, onde n especifica o número de argumentos de entrada para sua UDTF. Por exemplo, se sua UDTF passar 2 argumentos de entrada, implemente a interface JavaUDTF2.

Em sua classe, implemente os seguintes métodos:

  • outputSchema(), que retorna um objeto types.StructType que descreve os nomes e tipos dos campos nas linhas retornadas (o “esquema” da saída).

  • process(), que é chamado uma vez para cada linha na partição de entrada (consulte a nota abaixo).

  • inputSchema(), que retorna um objeto types.StructType que descreve os tipos dos parâmetros de entrada.

    Se seu método process() passar argumentos Map, você deve implementar o método inputSchema(). Caso contrário, a implementação desse método é opcional.

  • endPartition(), que é chamado uma vez para cada partição após todas as linhas terem sido passadas para process().

Quando uma UDTF é chamada, as linhas são agrupadas em partições antes de serem passadas para a UDTF:

  • Se a instrução que chama a UDTF especifica a cláusula PARTITION (partições explícitas), essa cláusula determina como as linhas são particionadas.

  • Se a instrução não especificar a cláusula PARTITION (partições implícitas), o Snowflake determina a melhor maneira de particionar as linhas.

Para obter uma explicação das partições, consulte Funções de tabela e partições.

Para obter um exemplo de uma classe de UDTF, consulte Exemplo de uma classe de UDTF.

Como implementar o método outputSchema()

Implemente o método outputSchema() para definir os nomes e tipos de dados dos campos (o “esquema de saída”) das linhas retornadas pelos métodos process() e endPartition().

public StructType outputSchema()
Copy

Nesse método, construa e retorne um objeto StructType que contenha objetos StructField representando o tipo de dados do Snowflake de cada campo em uma linha retornada. O Snowflake oferece suporte para os seguintes tipos de objetos para o esquema de saída de uma UDTF:

Tipo de dados SQL

Tipo de Java

Tipo de com.snowflake.snowpark_java.types

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<String, String>

MapType(StringType, StringType)

OBJECT

java.util.Map<String, Variant>

MapType(StringType, VariantType)

Por exemplo, se sua UDTF retorna uma linha com um único campo de número inteiro:

public StructType outputSchema() {
  return StructType.create(new StructField("C1", DataTypes.IntegerType));
}
Copy

Como implementar o método process()

Em sua classe da UDTF, implemente o método process():

Stream<Row> process(A0 arg0, ... A<n> arg<n>)
Copy

onde n é o número de argumentos passados à sua UDTF.

O número de argumentos na assinatura corresponde à interface que você implementou. Por exemplo, se sua UDTF passa 2 argumentos de entrada e você está implementando a interface JavaUDTF2, o método process() tem essa assinatura:

Stream<Row> process(A0 arg0, A1 arg1)
Copy

Esse método é invocado uma vez para cada linha na partição de entrada.

Como escolher os tipos de argumentos

Para o tipo de cada argumento no método process(), use o tipo de Java que corresponda ao tipo de dados do Snowflake do argumento passado para a UDTF.

O Snowflake oferece suporte para os seguintes tipos de dados para os argumentos de uma UDTF:

Tipo de dados SQL

Tipo de dados de Java

Notas

NUMBER

Os seguintes tipos são suportados:

  • java.lang.Short

  • java.lang.Integer

  • java.lang.Long

  • java.math.BigDecimal

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

VARCHAR

java.lang.String

BOOLEAN

java.lang.Boolean

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

byte[]

VARIANT

com.snowflake.snowpark_java.types.Variant

ARRAY

String[] ou Variant[]

OBJECT

Map<String, String> ou Map<String, Variant>

Nota

Se você passar argumentos java.util.Map, você deve implementar o método inputSchema para descrever os tipos desses argumentos. Consulte Como implementar o método inputSchema().

Como retornar linhas

No método process(), construa e retorne um java.util.stream.Stream de objetos Row contendo os dados a serem retornados pela UDTF para os valores de entrada dados. Os campos na linha devem usar os tipos que você especificou no método outputSchema. (Consulte Como implementar o método outputSchema().)

Por exemplo, se sua UDTF gera linhas, construa e retorne um Iterable de objetos Row para as linhas geradas:

import java.util.stream.Stream;
...

public Stream<Row> process(Integer start, Integer count) {
  Stream.Builder<Row> builder = Stream.builder();
  for (int i = start; i < start + count ; i++) {
    builder.add(Row.create(i));
  }
  return builder.build();
}
Copy

Como implementar o método inputSchema()

Se o método process() passar um argumento java.util.Map, você deve implementar o método inputSchema() para descrever os tipos dos argumentos de entrada.

Nota

Se o método process() não passar argumentos Map, você não precisa implementar o método inputSchema().

Nesse método, construa e retorne um objeto StructType que contenha objetos StructField representando o tipo de dados do Snowflake de cada argumento passado para o método process(). O Snowflake oferece suporte para os seguintes tipos de objetos para o esquema de entrada de uma UDTF:

Tipo de dados SQL

Tipo de Java

Tipo de com.snowflake.snowpark_java.types

NUMBER

java.lang.Short

ShortType

NUMBER

java.lang.Integer

IntType

NUMBER

java.lang.Long

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

java.lang.Float

FloatType

DOUBLE

java.lang.Double

DoubleType

VARCHAR

java.lang.String

StringType

BOOLEAN

java.lang.Boolean

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

byte[]

BinaryType

VARIANT

com.snowflake.snowpark_java.types.Variant

VariantType

ARRAY

String[]

ArrayType(StringType)

ARRAY

Variant[]

ArrayType(VariantType)

OBJECT

java.util.Map<String, String>

MapType(StringType, StringType)

OBJECT

java.util.Map<String, Variant>

MapType(StringType, VariantType)

Por exemplo, suponha que seu método process() passe um argumento Map<String, String> e um argumento Map<String, Variant>:

import java.util.Map;
import com.snowflake.snowpark_java.*;
import com.snowflake.snowpark_java.types.*;
...

public Stream<Row> process(Map<String, String> stringMap, Map<String, Variant> varMap) {
  ...
}
Copy

Você deve implementar o método inputSchema() para retornar um objeto StructType que descreva os tipos desses argumentos de entrada:

import java.util.Map;
import com.snowflake.snowpark_java.types.*;
...

public StructType inputSchema() {
  return StructType.create(
      new StructField(
          "string_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)),
      new StructField(
          "variant_map",
          DataTypes.createMapType(DataTypes.StringType, DataTypes.VariantType)));
}
Copy

Como implemer o método endPartition()

Implemente o método endPartition e adicione o código que deve ser executado após todas as linhas na partição de entrada terem sido passadas para o método process. O método endPartition é invocado uma vez para cada partição de entrada.

public Stream<Row> endPartition()
Copy

Você pode usar esse método se precisar executar qualquer trabalho depois que todas as linhas na partição tiverem sido processadas. Por exemplo, você pode:

  • Linhas de retorno baseadas em informações de estado que você captura em cada chamada do método process.

  • Linhas de retorno que não estão ligadas a uma linha de entrada específica.

  • Linhas de retorno que resumem as linhas de saída que foram geradas pelo método process.

Os campos nas linhas que você retornar devem corresponder aos tipos que você especificou no método outputSchema. (Consulte Como implementar o método outputSchema().)

Se você não precisar retornar linhas adicionais no final de cada partição, retorne um Stream vazio. Por exemplo:

public Stream<Row> endPartition() {
  return Stream.empty();
}
Copy

Nota

Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando endPartition leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.

Exemplo de uma classe de UDTF

A seguir, veja um exemplo de uma classe de UDTF que gera uma faixa de linhas.

  • Como a UDTF passa 2 argumentos, a classe implementa a JavaUDTF2.

  • Os argumentos start e count especificam o número inicial para a linha e o número de linhas a gerar.

import java.util.stream.Stream;
import com.snowflake.snowpark_java.types.*;
import com.snowflake.snowpark_java.udtf.*;

class MyRangeUdtf implements JavaUDTF2<Integer, Integer> {
  public StructType outputSchema() {
    return StructType.create(new StructField("C1", DataTypes.IntegerType));
  }

  // Because the process() method in this example does not pass in Map arguments,
  // implementing the inputSchema() method is optional.
  public StructType inputSchema() {
    return StructType.create(
            new StructField("start_value", DataTypes.IntegerType),
            new StructField("value_count", DataTypes.IntegerType));
  }

  public Stream<Row> endPartition() {
    return Stream.empty();
  }

  public Stream<Row> process(Integer start, Integer count) {
    Stream.Builder<Row> builder = Stream.builder();
    for (int i = start; i < start + count ; i++) {
      builder.add(Row.create(i));
    }
    return builder.build();
  }
}
Copy

Como registrar a UDTF

Em seguida, crie uma instância da nova classe e registre a classe chamando um dos métodos UDTFRegistration. Você pode registrar uma UDTF temporária ou permanente.

Como registrar uma UDTF temporária

Para registrar uma UDTF temporária, chame UDTFRegistration.registerTemporary:

  • Se você não precisar chamar a UDTF pelo nome, você pode registrar uma UDTF anônima passando uma instância da classe:

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
    
    Copy
  • Se você precisar chamar a UDTF pelo nome, passe também um nome da UDTF:

    // Register the MyRangeUdtf class that was defined in the previous example.
    TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
    // Call the UDTF by name.
    session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
    
    Copy

Como registrar uma UDTF permanente

Se você precisar usar a UDTF em sessões posteriores, chame UDTFRegistration.registerPermanent para registrar uma UDTF permanente.

Ao registrar uma UDTF permanente, você deve especificar um estágio onde o método de registro carregará os arquivos JAR para a UDTF e suas dependências. Por exemplo:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerPermanent("myUdtf", new MyRangeUdtf(), "@myStage");
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

Como chamar uma UDTF

Após registrar a UDTF, você pode chamar a UDTF passando o objeto retornado TableFunction para o método tableFunction do objeto Session:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary(new MyRangeUdtf());
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, Functions.lit(10), Functions.lit(5)).show();
Copy

Para chamar uma UDTF pelo nome, construa um objeto TableFunction com esse nome e passe-a para o método tableFunction:

// Register the MyRangeUdtf class that was defined in the previous example.
TableFunction tableFunction = session.udtf().registerTemporary("myUdtf", new MyRangeUdtf());
// Call the UDTF by name.
session.tableFunction(new TableFunction("myUdtf"), Functions.lit(10), Functions.lit(5)).show();
Copy

Você também pode chamar uma UDTF diretamente através de uma instrução SELECT:

session.sql("select * from table(myUdtf(10, 5))");
Copy