Criação de funções definidas pelo usuário (UDFs) para DataFrames no Scala

A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma lambda ou função no Scala. Este tópico explica como criar esses tipos de funções.

Neste tópico:

Introdução

Você pode chamar APIs do Snowpark para criar funções definidas pelo usuário (UDFs) para suas lambdas e funções personalizadas no Scala, e você pode chamar essas UDFs para processar os dados em seu DataFrame.

Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark faz a serialização e o upload do código para sua UDF para um estágio interno. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão localizados. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.

Em seu código personalizado, você também pode chamar o código que está empacotado em arquivos JAR (por exemplo, classes de Java para uma biblioteca de terceiros).

Você pode criar uma UDF para seu código personalizado de duas maneiras:

  • Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.

    // Create and register an anonymous UDF (doubleUdf).
    val doubleUdf = udf((x: Int) => x + x)
    // Call the anonymous UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
    
    Copy
  • Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.

    // Create and register a permanent named UDF ("doubleUdf").
    session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
    // Call the named UDF.
    val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
    
    Copy

As próximas seções fornecem informações importantes sobre a criação de UDFs no Snowpark:

O resto deste tópico explica como criar UDFs.

Nota

Se você tiver definido uma UDF executando o comando CREATE FUNCTION, você pode chamar essa UDF no Snowpark.

Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário escalares (UDFs).

Tipos de dados suportados para argumentos e valores de retorno

Para criar uma UDF para uma função do Scala ou lambda, você deve usar os tipos de dados suportados listados abaixo para os argumentos e valor de retorno de sua função ou lambda:

Tipo de dados SQL

Tipo de dados no Scala

Notas

NUMBER

Os seguintes tipos são suportados:

  • Short ou Option[Short]

  • Int ou Option[Int]

  • Long ou Option[Long]

  • java.math.BigDecimal

FLOAT

Float ou Option[Float]

DOUBLE

Double ou Option[Double]

VARCHAR

String ou java.lang.String

BOOLEAN

Boolean ou Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] ou Array[Variant]

OBJECT

Map[String, String] ou Map[String, Variant]

São suportados mapas mutáveis dos seguintes tipos:

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

GEOGRAPHY

com.snowflake.snowpark.types.Geography

Uma ressalva sobre criar UDFs em um objeto com a característica App

O Scala fornece uma característica App que você pode estender para transformar seu objeto Scala em um programa executável. A característica App fornece um método main que executa automaticamente todo o código no corpo da definição de seu objeto. (O código em sua definição de objeto torna-se efetivamente o método main).

Um efeito da extensão da característica App é que os campos em seu objeto não serão inicializados até que o método main seja chamado. Se seu objeto estende o App e você define uma UDF que usa um campo de objeto que você inicializou anteriormente, a definição da UDF carregada no servidor não incluirá o valor inicializado do campo do objeto.

Por exemplo, suponha que você defina e inicialize um campo chamado myConst no objeto e use esse campo em uma UDF:

object Main extends App {
  ...
  // Initialize a field.
  val myConst = "Prefix "
  // Use the field in a UDF.
  // Because the App trait delays the initialization of the object fields,
  // myConst in the UDF definition resolves to null.
  val myUdf = udf((s : String) =>  myConst + s )
  ...
}
Copy

Quando o Snowpark serializa e carrega a definição da UDF para Snowflake, myConst não é inicializado e é resolvido como null. Como resultado, chamar a UDF retorna null para myConst.

Para contornar isso, mude seu objeto para que ele não estenda a característica App e implemente um método separado main para seu código:

object Main {
  ...
  def main(args: Array[String]): Unit = {
    ... // Your code ...
  }
  ...
}
Copy

Especificação de dependências para uma UDF

Para definir uma UDF através da API do Snowpark, você deve chamar Session.addDependency() para quaisquer arquivos que contenham quaisquer classes e recursos dos quais sua UDF dependa (por exemplo, arquivos JAR, arquivos de recursos, etc.). (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos de uma UDF).

A biblioteca do Snowpark carrega esses arquivos para um estágio interno e adiciona os arquivos ao caminho da classe ao executar sua UDF.

Dica

Se você não quiser que a biblioteca carregue o arquivo toda vez que você executar seu aplicativo, carregue o arquivo para um estágio. Ao chamar addDependency, passe o caminho para o arquivo no estágio.

Se você estiver usando o REPL do Scala, você deve acrescentar o diretório de classes gerado pelo REPL como uma dependência. Por exemplo, se você usou o script run.sh para iniciar o REPL, chame o seguinte método, que adiciona o diretório repl_classes criado pelo script:

// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
Copy

O exemplo a seguir demonstra como adicionar um arquivo JAR em um estágio como uma dependência:

// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Copy

Os exemplos a seguir demonstram como adicionar dependências para arquivos JAR e arquivos de recursos:

// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")

// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")

// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Copy

Você não deve precisar especificar as seguintes dependências:

  • Suas bibliotecas de runtime do Scala.

    Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.

  • O arquivo JAR do Snowpark.

    A biblioteca do Snowpark tenta automaticamente detectar e carregar o arquivo JAR do Snowpark para o servidor.

    Para evitar que a biblioteca carregue repetidamente o arquivo JAR do Snowpark para o servidor:

    1. Carregue o arquivo JAR do Snowpark para um estágio.

      Por exemplo, o comando a seguir carrega o arquivo JAR do Snowpark para o estágio @mystage. O comando PUT compacta o arquivo JAR e nomeia o arquivo resultante como snowpark-1.9.0 .jar.gz.

      -- Put the Snowpark JAR file in a stage.
      PUT file:///<path>/snowpark-1.9.0.jar @mystage
    2. Chame addDependency para adicionar o arquivo JAR do Snowpark no estágio como uma dependência.

      Por exemplo, para adicionar o arquivo JAR do Snowpark carregado pelo comando anterior:

      // Add the Snowpark JAR file that you uploaded to a stage.
      session.addDependency("@mystage/snowpark-1.9.0.jar.gz")

      Observe que o caminho especificado para o arquivo JAR inclui a extensão .gz do nome do arquivo, que foi adicionada pelo comando PUT.

  • O arquivo JAR ou o diretório com o aplicativo que está sendo executado.

    A biblioteca do Snowpark tenta automaticamente detectar e carregar essas dependências.

    Se a biblioteca do Snowpark não conseguir detectar essas dependências automaticamente, a biblioteca reporta um erro, e você deve chamar addDependency para adicionar essas dependências manualmente.

Se levar tempo demais para que as dependências sejam carregadas no estágio, a biblioteca do Snowpark gerará uma exceção de tempo limite. Para configurar o tempo máximo que a biblioteca do Snowpark deve esperar, defina a propriedade snowpark_request_timeout_in_seconds ao criar a sessão.

Como criar uma UDF anônima

Para criar uma UDF anônima, você pode:

  • Chamar a função udf no objeto com.snowflake.snowpark.functions, passando a definição da função anônima.

  • Chamar o método registerTemporary na classe UDFRegistration passando a definição da função anônima. Como você está registrando uma UDF anônima, você deve usar as assinaturas do método que não tenham um parâmetro name.

Nota

Ao escrever código com multithread (por exemplo, ao usar coleções paralelas), use o método registerTemporary para registrar UDFs em vez de usar a função udf. Isso pode evitar erros nos quais o objeto Session padrão do Snowflake não pode ser encontrado.

Esses métodos retornam um objeto UserDefinedFunction que você pode usar para chamar a UDF. (Consulte Como chamar funções definidas pelo usuário escalares (UDFs).)

O exemplo a seguir cria uma UDF anônima:

// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Copy

Nota

Se você estiver criando uma UDF em um Jupyter Notebook, você deve configurar o notebook para trabalhar com o Snowpark (consulte Como configurar um Jupyter Notebook para o Snowpark Scala) e seguir as diretrizes para escrever UDFs em um notebook (ver Como criar UDFs em Jupyter Notebooks).

O exemplo a seguir cria uma UDF anônima que passa um Array de valores String e anexa a cadeia de caracteres x a cada valor:

// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
Copy

O exemplo a seguir cria uma UDF anônima que usa uma classe personalizada (LanguageDetector, que detecta o idioma usado no texto). O exemplo chama a UDF anônima para detectar o idioma na coluna text_data em um DataFrame e cria uma nova coluna DataFrame que inclui uma coluna adicional lang com o idioma utilizado.

// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._

// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector

// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")

// Create a detector
val detector = new LanguageDetector()

// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
     Option(detector.detect(s)).getOrElse("UNKNOWN"))

// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
    dfEmails.withColumn("lang", langUdf(col("text_data")))
Copy

Criação e registro de uma UDF nomeada

Se você quiser chamar uma UDF pelo nome (por exemplo, usando a função callUDF no objeto functions) ou se precisar usar uma UDF em sessões posteriores, você pode criar e registrar uma UDF nomeada. Para isso, use um dos seguintes métodos na classe UDFRegistration:

  • registerTemporary se você apenas planeja usar a UDF na sessão atual

  • registerPermanent se você planeja usar a UDF em sessões posteriores

Para acessar um objeto da classe UDFRegistration, chame o método udf da classe Session.

registerTemporary cria uma UDF temporária que você pode usar na sessão atual.

// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

registerPermanent cria uma UDF que você pode usar nas sessões atuais e posteriores. Ao chamar registerPermanent, você também deve especificar um local em um local de estágio interno onde os arquivos JAR para a UDF e suas dependências serão carregados.

Nota

registerPermanent não oferece suporte para estágios externos.

Por exemplo:

// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Copy

Nota

Se você estiver criando uma UDF em um Jupyter Notebook, você deve configurar o notebook para trabalhar com o Snowpark (consulte Como configurar um Jupyter Notebook para o Snowpark Scala) e seguir as diretrizes para escrever UDFs em um notebook (ver Como criar UDFs em Jupyter Notebooks).

Como criar UDFs em Jupyter Notebooks

Se você estiver criando UDFs em um Jupyter Notebook, você deve seguir estes passos adicionais:

Como escrever a implementação de uma UDF

Defina a implementação de sua função em uma classe que estenda Serializable. Por exemplo:

// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
  val myUserDefinedFunc = (s: String) => {
    ...
  }
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Copy

Acesso a uma variável definida em outra célula

Se você precisar usar uma variável definida em outra célula em sua UDF, você deve passar a variável como um argumento para o construtor da classe. Por exemplo, suponha que na célula 1, você tenha definido uma variável:

In [1]:
Copy
val prefix = "Hello"
Copy

e que você queira usar essa variável em uma UDF que você definiu na célula 2. No construtor de classes para a UDF, adicione um argumento para essa variável. Então, ao chamar o construtor da classe para criar a UDF, passe a variável definida na célula 1:

In [2]:
Copy
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
  val prependPrefixFunc = (s: String) => {
    s"$prefix $s"
  }
}

// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Copy

Como usar objetos que não são serializáveis

Quando você cria uma UDF para uma lambda ou função, a biblioteca do Snowpark serializa o fechamento da lambda e a envia para o servidor para execução.

Se um objeto capturado pelo fechamento da lambda não for serializável, a biblioteca do Snowpark gera uma exceção java.io.NotSerializableException.

Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Copy

Se isso ocorrer, você pode:

  • Tornar o objeto serializável ou

  • Declarar o objeto como lazy val ou usar a anotação @transient para evitar a serialização do objeto.

    Por exemplo:

    // Declare the detector object as lazy.
    lazy val detector = new LanguageDetector("en")
    // The detector object is not serialized but is instead reconstructed on the server.
    val langUdf = udf((s: String) =>
         Option(detector.detect(s)).getOrElse("UNKNOWN"))
    
    Copy

Como escrever um código de inicialização para uma UDF

Se sua UDF exigir um código ou contexto de inicialização, você pode fornecê-lo através de valores capturados como parte do fechamento da UDF.

O exemplo a seguir usa uma classe separada para inicializar o contexto necessário por três UDFs.

  • A primeira UDF cria uma nova instância da classe dentro da lambda, de forma que a inicialização é realizada toda vez que a UDF é invocada.

  • A segunda UDF captura uma instância da classe gerada em seu programa cliente. O contexto gerado no cliente é serializado e é utilizado pela UDF. Observe que a classe do contexto deve ser serializável para que essa abordagem funcione.

  • A terceira UDF captura um lazy val, portanto o contexto é instanciado lentamente na primeira invocação da UDF e é reutilizado em invocações posteriores. Essa abordagem funciona mesmo quando o contexto não é serializável. Entretanto, não há garantia de que TODAS as invocações de UDF dentro de um dataframe usarão o mesmo contexto gerado lentamente.

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random

// Context needed for a UDF.
class Context {
  val randomInt = Random.nextInt
}

// Serializable context needed for the UDF.
class SerContext extends Serializable {
  val randomInt = Random.nextInt
}

object TestUdf {
  def main(args: Array[String]): Unit = {
    // Create the session.
    val session = Session.builder.configFile("/<path>/profile.properties").create
    import session.implicits._
    session.range(1, 10, 2).show()

    // Create a DataFrame with two columns ("c" and "d").
    val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
    dummy.show()

    // Initialize the context once per invocation.
    val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
    dummy.select(udfRepeatedInit('c)).show()

    // Initialize the serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    val sC = new SerContext
    val udfOnceInit = udf((i: Int) => sC.randomInt)
    dummy.select(udfOnceInit('c)).show()

    // Initialize the non-serializable context only once,
    // regardless of the number of times that the UDF is invoked.
    lazy val unserC = new Context
    val udfOnceInitU = udf((i: Int) => unserC.randomInt)
    dummy.select(udfOnceInitU('c)).show()
  }
}
Copy

Como ler arquivos de uma UDF

Como mencionado anteriormente, a biblioteca do Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.

Além disso, se o conteúdo do arquivo permanecer o mesmo entre chamadas para a UDF, você pode escrever seu código para carregar o arquivo uma vez durante a primeira chamada e não em chamadas posteriores. Isso pode melhorar o desempenho de suas chamadas à UDF.

Para configurar uma UDF para ler um arquivo:

  1. Adicione o arquivo a um arquivo JAR.

    Por exemplo, se sua UDF precisa usar um arquivo em um subdiretório data/ (data/hello.txt), execute o comando jar para adicionar esse arquivo a um arquivo JAR:

    # Create a new JAR file containing data/hello.txt.
    $ jar cvf <path>/myJar.jar data/hello.txt
    
    Copy
  2. Especifique que o arquivo JAR é uma dependência, que carrega o arquivo para o servidor e adiciona o arquivo ao caminho da classe. Consulte Especificação de dependências para uma UDF.

    Por exemplo:

    // Specify that myJar.jar contains files that your UDF depends on.
    session.addDependency("<path>/myJar.jar")
    
    Copy
  3. No arquivo da UDF, chame Class.getResourceAsStream para encontrar o arquivo no caminho da classe e ler o arquivo.

    Para evitar adicionar uma dependência em this, você pode usar classOf[com.snowflake.snowpark.DataFrame] (em vez de getClass) para obter o objeto Class.

    Por exemplo, para ler o arquivo data/hello.txt:

    // Read data/hello.txt from myJar.jar.
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    
    Copy

    Neste exemplo, o nome do recurso começa com /, indicando que este é o caminho completo do arquivo no arquivo JAR. (Nesse caso, o local do arquivo não é relativo ao pacote da classe).

Nota

Se você não espera que o conteúdo do arquivo mude entre as chamadas da UDF, leia o arquivo em um lazy val. Isso faz com que o código de carregamento do arquivo seja executado apenas na primeira chamada para a UDF e não em chamadas posteriores.

O exemplo a seguir define um objeto (UDFCode) com uma função que será usada como UDF (readFileFunc). A função lê o arquivo data/hello.txt, que deve conter a cadeia de cadeia de caracteres hello,. A função precede essa cadeia de caracteres com a cadeia de caracteres passada como argumento.

// Create a function object that reads a file.
object UDFCode extends Serializable {

  // The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
  // the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
  // first accessed.
  lazy val prefix = {
    import java.io._
    val resourceName = "/data/hello.txt"
    val inputStream = classOf[com.snowflake.snowpark.DataFrame]
      .getResourceAsStream(resourceName)
    if (inputStream == null) {
      throw new Exception("Can't find file " + resourceName)
    }
    scala.io.Source.fromInputStream(inputStream).mkString
  }

  val readFileFunc = (s: String) => prefix + " : " + s
}
Copy

A próxima parte do exemplo registra a função como uma UDF anônima. O exemplo chama a UDF na coluna NAME em um DataFrame. O exemplo supõe que o arquivo data/hello.txt está empacotado no arquivo JAR myJar.jar.

// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")

// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")

// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)

// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Copy

Criação de funções de tabela definidas pelo usuário (UDTFs)

Para criar e registrar uma UDTF no Snowpark, você deve:

As próximas seções descrevem essas etapas com mais detalhes.

Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDTF.

Como definir a classe UDTF

Defina uma classe que herda de uma das classes UDTFn (por exemplo, UDTF0, UDTF1, etc.) no pacote com.snowflake.snowpark.udtf, onde n especifica o número de argumentos de entrada para sua UDTF. Por exemplo, se sua UDTF passa 2 argumentos de entrada, estenda a classe UDTF2.

Em sua classe, substitua os seguintes métodos:

  • outputSchema(), que retorna um objeto types.StructType que descreve os nomes e tipos dos campos nas linhas retornadas (o “esquema” da saída).

  • process(), que é chamado uma vez para cada linha na partição de entrada (consulte a nota abaixo).

  • endPartition(), que é chamado uma vez para cada partição após todas as linhas terem sido passadas para process().

Quando uma UDTF é chamada, as linhas são agrupadas em partições antes de serem passadas para a UDTF:

  • Se a instrução que chama a UDTF especifica a cláusula PARTITION (partições explícitas), essa cláusula determina como as linhas são particionadas.

  • Se a instrução não especificar a cláusula PARTITION (partições implícitas), o Snowflake determina a melhor maneira de particionar as linhas.

Para obter uma explicação das partições, consulte Funções de tabela e partições.

Para obter um exemplo de uma classe de UDTF, consulte Exemplo de uma classe de UDTF.

Substituição do método outputSchema()

Substitua o método outputSchema() para definir os nomes e tipos de dados dos campos (o “esquema de saída”) das linhas retornadas pelos métodos process() e endPartition().

def outputSchema(): StructType
Copy

Nesse método, construa e retorne um objeto StructType que usa uma Array de StructField objetos para especificar o tipo de dados do Snowflake de cada campo em uma linha retornada. O Snowflake oferece suporte para os seguintes tipos de objetos para o esquema de saída de uma UDTF:

Tipo de dados SQL

Tipo do Scala

Tipo de com.snowflake.snowpark.types

NUMBER

Short ou Option[Short]

ShortType

NUMBER

Int ou Option[Int]

IntType

NUMBER

Long ou Option[Long]

LongType

NUMBER

java.math.BigDecimal

DecimalType

FLOAT

Float ou Option[Float]

FloatType

DOUBLE

Double ou Option[Double]

DoubleType

VARCHAR

String ou java.lang.String

StringType

BOOLEAN

Boolean ou Option[Boolean]

BooleanType

DATE

java.sql.Date

DateType

TIMESTAMP

java.sql.Timestamp

TimestampType

BINARY

Array[Byte]

BinaryType

VARIANT

com.snowflake.snowpark.types.Variant

VariantType

ARRAY

Array[String]

ArrayType(StringType)

ARRAY

Array[Variant]

ArrayType(VariantType)

OBJECT

Map[String, String]

MapType(StringType, StringType)

OBJECT

Map[String, Variant]

MapType(StringType, VariantType)

Por exemplo, se sua UDTF retorna uma linha com um único campo de número inteiro:

override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Copy

Substituição do método process()

Em sua classe da UDTF, substitua o método process():

def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
Copy

onde n é o número de argumentos passados à sua UDTF.

O número de argumentos na assinatura corresponde à classe que você estendeu. Por exemplo, se sua UDTF passa 2 argumentos de entrada e você está estendendo a classe UDTF2, o método process() tem esta assinatura:

def process(arg0: A0, arg1: A1): Iterable[Row]
Copy

Esse método é invocado uma vez para cada linha na partição de entrada.

Como escolher os tipos de argumentos

Para o tipo de cada argumento no método process(), use o tipo do Scala que corresponde ao tipo de dados do Snowflake do argumento passado para a UDTF.

O Snowflake oferece suporte para os seguintes tipos de dados para os argumentos de uma UDTF:

Tipo de dados SQL

Tipo de dados no Scala

Notas

NUMBER

Os seguintes tipos são suportados:

  • Short ou Option[Short]

  • Int ou Option[Int]

  • Long ou Option[Long]

  • java.math.BigDecimal

FLOAT

Float ou Option[Float]

DOUBLE

Double ou Option[Double]

VARCHAR

String ou java.lang.String

BOOLEAN

Boolean ou Option[Boolean]

DATE

java.sql.Date

TIMESTAMP

java.sql.Timestamp

BINARY

Array[Byte]

VARIANT

com.snowflake.snowpark.types.Variant

ARRAY

Array[String] ou Array[Variant]

OBJECT

Map[String, String] ou Map[String, Variant]

São suportados mapas mutáveis dos seguintes tipos:

  • scala.collection.mutable.Map[String, String]

  • scala.collection.mutable.Map[String, Variant]

Como retornar linhas

No método process(), construa e retorne um Iterable de objetos Row que contenham os dados a serem retornados pela UDTF para os valores de entrada fornecidos. Os campos na linha devem usar os tipos que você especificou no método outputSchema. (Consulte Substituição do método outputSchema().)

Por exemplo, se sua UDTF gera linhas, construa e retorne um Iterable de objetos Row para as linhas geradas:

override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
Copy

Substituição do método endPartition()

Substitua o método endPartition e adicione o código que deve ser executado após todas as linhas na partição de entrada terem sido passadas para o método process. O método endPartition é invocado uma vez para cada partição de entrada.

def endPartition(): Iterable[Row]
Copy

Você pode usar esse método se precisar executar qualquer trabalho depois que todas as linhas na partição tiverem sido processadas. Por exemplo, você pode:

  • Linhas de retorno baseadas em informações de estado que você captura em cada chamada do método process.

  • Linhas de retorno que não estão ligadas a uma linha de entrada específica.

  • Linhas de retorno que resumem as linhas de saída que foram geradas pelo método process.

Os campos nas linhas que você retornar devem corresponder aos tipos que você especificou no método outputSchema. (Consulte Substituição do método outputSchema().)

Se você não precisar retornar linhas adicionais no final de cada partição, retorne um Iterable vazio de objetos Row. Por exemplo:

override def endPartition(): Iterable[Row] = Array.empty[Row]
Copy

Nota

Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando endPartition leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.

Exemplo de uma classe de UDTF

A seguir, veja um exemplo de uma classe de UDTF que gera uma faixa de linhas.

  • Como a UDTF passa 2 argumentos, a classe estende UDTF2.

  • Os argumentos start e count especificam o número inicial para a linha e o número de linhas a gerar.

class MyRangeUdtf extends UDTF2[Int, Int] {
  override def process(start: Int, count: Int): Iterable[Row] =
    (start until (start + count)).map(Row(_))
  override def endPartition(): Iterable[Row] = Array.empty[Row]
  override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Copy

Como registrar a UDTF

Em seguida, crie uma instância da nova classe e registre a classe chamando um dos métodos UDTFRegistration. Você pode registrar uma UDTF temporária ou permanente.

Como registrar uma UDTF temporária

Para registrar uma UDTF temporária, chame UDTFRegistration.registerTemporary:

  • Se você não precisar chamar a UDTF pelo nome, você pode registrar uma UDTF anônima passando uma instância da classe:

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
    // Use the returned TableFunction object to call the UDTF.
    session.tableFunction(tableFunction, lit(10), lit(5)).show
    
    Copy
  • Se você precisar chamar a UDTF pelo nome, passe também um nome da UDTF:

    // Register the MyRangeUdtf class that was defined in the previous example.
    val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
    // Call the UDTF by name.
    session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
    
    Copy

Como registrar uma UDTF permanente

Se você precisar usar a UDTF em sessões posteriores, chame UDTFRegistration.registerPermanent para registrar uma UDTF permanente.

Ao registrar uma UDTF permanente, você deve especificar um estágio onde o método de registro carregará os arquivos JAR para a UDTF e suas dependências. Por exemplo:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage")
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

Como chamar uma UDTF

Após registrar a UDTF, você pode chamar a UDTF passando o objeto retornado TableFunction para o método tableFunction do objeto Session:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf())
// Use the returned TableFunction object to call the UDTF.
session.tableFunction(tableFunction, lit(10), lit(5)).show()
Copy

Para chamar uma UDTF pelo nome, construa um objeto TableFunction com esse nome e passe-a para o método tableFunction:

// Register the MyRangeUdtf class that was defined in the previous example.
val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf())
// Call the UDTF by name.
session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Copy

Você também pode chamar uma UDTF diretamente através de uma instrução SELECT:

session.sql("select * from table(myUdtf(10, 5))")
Copy