Criação de funções definidas pelo usuário (UDFs) para DataFrames no Scala¶
A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma lambda ou função no Scala. Este tópico explica como criar esses tipos de funções.
Neste tópico:
Introdução¶
Você pode chamar APIs do Snowpark para criar funções definidas pelo usuário (UDFs) para suas lambdas e funções personalizadas no Scala, e você pode chamar essas UDFs para processar os dados em seu DataFrame.
Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark faz a serialização e o upload do código para sua UDF para um estágio interno. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão localizados. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.
Em seu código personalizado, você também pode chamar o código que está empacotado em arquivos JAR (por exemplo, classes de Java para uma biblioteca de terceiros).
Você pode criar uma UDF para seu código personalizado de duas maneiras:
Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.
// Create and register an anonymous UDF (doubleUdf). val doubleUdf = udf((x: Int) => x + x) // Call the anonymous UDF. val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.
// Create and register a permanent named UDF ("doubleUdf"). session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage") // Call the named UDF. val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
As próximas seções fornecem informações importantes sobre a criação de UDFs no Snowpark:
Tipos de dados suportados para argumentos e valores de retorno
Uma ressalva sobre criar UDFs em um objeto com a característica App
O resto deste tópico explica como criar UDFs.
Nota
Se você tiver definido uma UDF executando o comando CREATE FUNCTION
, você pode chamar essa UDF no Snowpark.
Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário escalares (UDFs).
Tipos de dados suportados para argumentos e valores de retorno¶
Para criar uma UDF para uma função do Scala ou lambda, você deve usar os tipos de dados suportados listados abaixo para os argumentos e valor de retorno de sua função ou lambda:
Tipo de dados SQL |
Tipo de dados no Scala |
Notas |
---|---|---|
Os seguintes tipos são suportados:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
São suportados mapas mutáveis dos seguintes tipos:
|
|
Uma ressalva sobre criar UDFs em um objeto com a característica App¶
O Scala fornece uma característica App que você pode estender para transformar seu objeto Scala em um programa executável. A característica App
fornece um método main
que executa automaticamente todo o código no corpo da definição de seu objeto. (O código em sua definição de objeto torna-se efetivamente o método main
).
Um efeito da extensão da característica App
é que os campos em seu objeto não serão inicializados até que o método main
seja chamado. Se seu objeto estende o App
e você define uma UDF que usa um campo de objeto que você inicializou anteriormente, a definição da UDF carregada no servidor não incluirá o valor inicializado do campo do objeto.
Por exemplo, suponha que você defina e inicialize um campo chamado myConst
no objeto e use esse campo em uma UDF:
object Main extends App {
...
// Initialize a field.
val myConst = "Prefix "
// Use the field in a UDF.
// Because the App trait delays the initialization of the object fields,
// myConst in the UDF definition resolves to null.
val myUdf = udf((s : String) => myConst + s )
...
}
Quando o Snowpark serializa e carrega a definição da UDF para Snowflake, myConst
não é inicializado e é resolvido como null
. Como resultado, chamar a UDF retorna null
para myConst
.
Para contornar isso, mude seu objeto para que ele não estenda a característica App
e implemente um método separado main
para seu código:
object Main {
...
def main(args: Array[String]): Unit = {
... // Your code ...
}
...
}
Especificação de dependências para uma UDF¶
Para definir uma UDF através da API do Snowpark, você deve chamar Session.addDependency()
para quaisquer arquivos que contenham quaisquer classes e recursos dos quais sua UDF dependa (por exemplo, arquivos JAR, arquivos de recursos, etc.). (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos de uma UDF).
A biblioteca do Snowpark carrega esses arquivos para um estágio interno e adiciona os arquivos ao caminho da classe ao executar sua UDF.
Dica
Se você não quiser que a biblioteca carregue o arquivo toda vez que você executar seu aplicativo, carregue o arquivo para um estágio. Ao chamar addDependency
, passe o caminho para o arquivo no estágio.
Se você estiver usando o REPL do Scala, você deve acrescentar o diretório de classes gerado pelo REPL como uma dependência. Por exemplo, se você usou o script run.sh
para iniciar o REPL, chame o seguinte método, que adiciona o diretório repl_classes
criado pelo script:
// If you used the run.sh script to start the Scala REPL, call this to add the REPL classes directory as a dependency.
session.addDependency("<path_to_directory_where_you_ran_run.sh>/repl_classes/")
O exemplo a seguir demonstra como adicionar um arquivo JAR em um estágio como uma dependência:
// Add a JAR file that you uploaded to a stage.
session.addDependency("@my_stage/<path>/my-library.jar")
Os exemplos a seguir demonstram como adicionar dependências para arquivos JAR e arquivos de recursos:
// Add a JAR file on your local machine.
session.addDependency("/<path>/my-library.jar")
// Add a directory of resource files.
session.addDependency("/<path>/my-resource-dir/")
// Add a resource file.
session.addDependency("/<path>/my-resource.xml")
Você não deve precisar especificar as seguintes dependências:
Suas bibliotecas de runtime do Scala.
Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.
O arquivo JAR do Snowpark.
A biblioteca do Snowpark tenta automaticamente detectar e carregar o arquivo JAR do Snowpark para o servidor.
Para evitar que a biblioteca carregue repetidamente o arquivo JAR do Snowpark para o servidor:
Carregue o arquivo JAR do Snowpark para um estágio.
Por exemplo, o comando a seguir carrega o arquivo JAR do Snowpark para o estágio
@mystage
. O comando PUT compacta o arquivo JAR e nomeia o arquivo resultante como snowpark-1.10.0 .jar.gz.-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark-1.10.0.jar @mystageChame
addDependency
para adicionar o arquivo JAR do Snowpark no estágio como uma dependência.Por exemplo, para adicionar o arquivo JAR do Snowpark carregado pelo comando anterior:
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark-1.10.0.jar.gz")Observe que o caminho especificado para o arquivo JAR inclui a extensão
.gz
do nome do arquivo, que foi adicionada pelo comando PUT.O arquivo JAR ou o diretório com o aplicativo que está sendo executado.
A biblioteca do Snowpark tenta automaticamente detectar e carregar essas dependências.
Se a biblioteca do Snowpark não conseguir detectar essas dependências automaticamente, a biblioteca reporta um erro, e você deve chamar
addDependency
para adicionar essas dependências manualmente.
Se levar tempo demais para que as dependências sejam carregadas no estágio, a biblioteca do Snowpark gerará uma exceção de tempo limite. Para configurar o tempo máximo que a biblioteca do Snowpark deve esperar, defina a propriedade snowpark_request_timeout_in_seconds ao criar a sessão.
Como criar uma UDF anônima¶
Para criar uma UDF anônima, você pode:
Chamar a função
udf
no objetocom.snowflake.snowpark.functions
, passando a definição da função anônima.Chamar o método
registerTemporary
na classeUDFRegistration
passando a definição da função anônima. Como você está registrando uma UDF anônima, você deve usar as assinaturas do método que não tenham um parâmetroname
.
Nota
Ao escrever código com multithread (por exemplo, ao usar coleções paralelas), use o método registerTemporary
para registrar UDFs em vez de usar a função udf
. Isso pode evitar erros nos quais o objeto Session
padrão do Snowflake não pode ser encontrado.
Esses métodos retornam um objeto UserDefinedFunction
que você pode usar para chamar a UDF. (Consulte Como chamar funções definidas pelo usuário escalares (UDFs).)
O exemplo a seguir cria uma UDF anônima:
// Create and register an anonymous UDF.
val doubleUdf = udf((x: Int) => x + x)
// Call the anonymous UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", doubleUdf(col("num")))
Nota
Se você estiver criando uma UDF em um Jupyter Notebook, você deve configurar o notebook para trabalhar com o Snowpark (consulte Como configurar um Jupyter Notebook para o Snowpark Scala) e seguir as diretrizes para escrever UDFs em um notebook (ver Como criar UDFs em Jupyter Notebooks).
O exemplo a seguir cria uma UDF anônima que passa um Array
de valores String
e anexa a cadeia de caracteres x
a cada valor:
// Create and register an anonymous UDF.
val appendUdf = udf((x: Array[String]) => x.map(a => a + " x"))
// Call the anonymous UDF, passing in the "a" column, which holds an ARRAY.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "appended".
val dfWithXAppended = df.withColumn("appended", appendUdf(col("a")))
O exemplo a seguir cria uma UDF anônima que usa uma classe personalizada (LanguageDetector
, que detecta o idioma usado no texto). O exemplo chama a UDF anônima para detectar o idioma na coluna text_data
em um DataFrame e cria uma nova coluna DataFrame que inclui uma coluna adicional lang
com o idioma utilizado.
// Import the udf function from the functions object.
import com.snowflake.snowpark.functions._
// Import the package for your custom code.
// The custom code in this example detects the language of textual data.
import com.mycompany.LanguageDetector
// If the custom code is packaged in a JAR file, add that JAR file as
// a dependency.
session.addDependency("$HOME/language-detector.jar")
// Create a detector
val detector = new LanguageDetector()
// Create an anonymous UDF that takes a string of text and returns the language used in that string.
// Note that this captures the detector object created above.
// Assign the UDF to the langUdf variable, which will be used to call the UDF.
val langUdf = udf((s: String) =>
Option(detector.detect(s)).getOrElse("UNKNOWN"))
// Create a new DataFrame that contains an additional "lang" column that contains the language
// detected by the UDF.
val dfEmailsWithLangCol =
dfEmails.withColumn("lang", langUdf(col("text_data")))
Criação e registro de uma UDF nomeada¶
Se você quiser chamar uma UDF pelo nome (por exemplo, usando a função callUDF
no objeto functions
) ou se precisar usar uma UDF em sessões posteriores, você pode criar e registrar uma UDF nomeada. Para isso, use um dos seguintes métodos na classe UDFRegistration
:
registerTemporary
se você apenas planeja usar a UDF na sessão atualregisterPermanent
se você planeja usar a UDF em sessões posteriores
Para acessar um objeto da classe UDFRegistration
, chame o método udf
da classe Session
.
registerTemporary
cria uma UDF temporária que você pode usar na sessão atual.
// Create and register a temporary named UDF.
session.udf.registerTemporary("doubleUdf", (x: Int) => x + x)
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
registerPermanent
cria uma UDF que você pode usar nas sessões atuais e posteriores. Ao chamar registerPermanent
, você também deve especificar um local em um local de estágio interno onde os arquivos JAR para a UDF e suas dependências serão carregados.
Nota
registerPermanent
não oferece suporte para estágios externos.
Por exemplo:
// Create and register a permanent named UDF.
// Specify that the UDF and dependent JAR files should be uploaded to
// the internal stage named mystage.
session.udf.registerPermanent("doubleUdf", (x: Int) => x + x, "mystage")
// Call the named UDF, passing in the "num" column.
// The example uses withColumn to return a DataFrame containing
// the UDF result in a new column named "doubleNum".
val dfWithDoubleNum = df.withColumn("doubleNum", callUDF("doubleUdf", col("num")))
Nota
Se você estiver criando uma UDF em um Jupyter Notebook, você deve configurar o notebook para trabalhar com o Snowpark (consulte Como configurar um Jupyter Notebook para o Snowpark Scala) e seguir as diretrizes para escrever UDFs em um notebook (ver Como criar UDFs em Jupyter Notebooks).
Como criar UDFs em Jupyter Notebooks¶
Se você estiver criando UDFs em um Jupyter Notebook, você deve seguir estes passos adicionais:
Como configurar um Jupyter Notebook para o Snowpark Scala (se você ainda não configurou o notebook para funcionar com o Snowpark)
Como escrever a implementação de uma UDF¶
Defina a implementação de sua função em uma classe que estenda Serializable
. Por exemplo:
// Class containing a function that implements your UDF.
class MyUDFCode( ... ) extends Serializable {
val myUserDefinedFunc = (s: String) => {
...
}
}
val myUdf = udf((new MyUDFCode(resourceName)).myUserDefinedFunc)
Acesso a uma variável definida em outra célula¶
Se você precisar usar uma variável definida em outra célula em sua UDF, você deve passar a variável como um argumento para o construtor da classe. Por exemplo, suponha que na célula 1, você tenha definido uma variável:
In [1]:
val prefix = "Hello"
e que você queira usar essa variável em uma UDF que você definiu na célula 2. No construtor de classes para a UDF, adicione um argumento para essa variável. Então, ao chamar o construtor da classe para criar a UDF, passe a variável definida na célula 1:
In [2]:
// resourceName is the argument for the variable defined in another cell.
class UDFCode(var prefix: String) extends Serializable {
val prependPrefixFunc = (s: String) => {
s"$prefix $s"
}
}
// When constructing UDFCode, pass in the variable (resourceName) that is defined in another cell.
val prependPrefixUdf = udf((new UDFCode(prefix)).prependPrefixFunc)
val myDf = session.sql("select 'Raymond' NAME")
myDf.withColumn("CONCAT", prependPrefixUdf(col("NAME"))).show()
Como usar objetos que não são serializáveis¶
Quando você cria uma UDF para uma lambda ou função, a biblioteca do Snowpark serializa o fechamento da lambda e a envia para o servidor para execução.
Se um objeto capturado pelo fechamento da lambda não for serializável, a biblioteca do Snowpark gera uma exceção java.io.NotSerializableException
.
Exception in thread "main" java.io.NotSerializableException: <YourObjectName>
Se isso ocorrer, você pode:
Tornar o objeto serializável ou
Declarar o objeto como
lazy val
ou usar a anotação@transient
para evitar a serialização do objeto.Por exemplo:
// Declare the detector object as lazy. lazy val detector = new LanguageDetector("en") // The detector object is not serialized but is instead reconstructed on the server. val langUdf = udf((s: String) => Option(detector.detect(s)).getOrElse("UNKNOWN"))
Como escrever um código de inicialização para uma UDF¶
Se sua UDF exigir um código ou contexto de inicialização, você pode fornecê-lo através de valores capturados como parte do fechamento da UDF.
O exemplo a seguir usa uma classe separada para inicializar o contexto necessário por três UDFs.
A primeira UDF cria uma nova instância da classe dentro da lambda, de forma que a inicialização é realizada toda vez que a UDF é invocada.
A segunda UDF captura uma instância da classe gerada em seu programa cliente. O contexto gerado no cliente é serializado e é utilizado pela UDF. Observe que a classe do contexto deve ser serializável para que essa abordagem funcione.
A terceira UDF captura um
lazy val
, portanto o contexto é instanciado lentamente na primeira invocação da UDF e é reutilizado em invocações posteriores. Essa abordagem funciona mesmo quando o contexto não é serializável. Entretanto, não há garantia de que TODAS as invocações de UDF dentro de um dataframe usarão o mesmo contexto gerado lentamente.
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import scala.util.Random
// Context needed for a UDF.
class Context {
val randomInt = Random.nextInt
}
// Serializable context needed for the UDF.
class SerContext extends Serializable {
val randomInt = Random.nextInt
}
object TestUdf {
def main(args: Array[String]): Unit = {
// Create the session.
val session = Session.builder.configFile("/<path>/profile.properties").create
import session.implicits._
session.range(1, 10, 2).show()
// Create a DataFrame with two columns ("c" and "d").
val dummy = session.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("c", "d")
dummy.show()
// Initialize the context once per invocation.
val udfRepeatedInit = udf((i: Int) => (new Context).randomInt)
dummy.select(udfRepeatedInit('c)).show()
// Initialize the serializable context only once,
// regardless of the number of times that the UDF is invoked.
val sC = new SerContext
val udfOnceInit = udf((i: Int) => sC.randomInt)
dummy.select(udfOnceInit('c)).show()
// Initialize the non-serializable context only once,
// regardless of the number of times that the UDF is invoked.
lazy val unserC = new Context
val udfOnceInitU = udf((i: Int) => unserC.randomInt)
dummy.select(udfOnceInitU('c)).show()
}
}
Como ler arquivos de uma UDF¶
Como mencionado anteriormente, a biblioteca do Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.
Além disso, se o conteúdo do arquivo permanecer o mesmo entre chamadas para a UDF, você pode escrever seu código para carregar o arquivo uma vez durante a primeira chamada e não em chamadas posteriores. Isso pode melhorar o desempenho de suas chamadas à UDF.
Para configurar uma UDF para ler um arquivo:
Adicione o arquivo a um arquivo JAR.
Por exemplo, se sua UDF precisa usar um arquivo em um subdiretório
data/
(data/hello.txt
), execute o comandojar
para adicionar esse arquivo a um arquivo JAR:# Create a new JAR file containing data/hello.txt. $ jar cvf <path>/myJar.jar data/hello.txt
Especifique que o arquivo JAR é uma dependência, que carrega o arquivo para o servidor e adiciona o arquivo ao caminho da classe. Consulte Especificação de dependências para uma UDF.
Por exemplo:
// Specify that myJar.jar contains files that your UDF depends on. session.addDependency("<path>/myJar.jar")
No arquivo da UDF, chame
Class.getResourceAsStream
para encontrar o arquivo no caminho da classe e ler o arquivo.Para evitar adicionar uma dependência em
this
, você pode usarclassOf[com.snowflake.snowpark.DataFrame]
(em vez degetClass
) para obter o objetoClass
.Por exemplo, para ler o arquivo
data/hello.txt
:// Read data/hello.txt from myJar.jar. val resourceName = "/data/hello.txt" val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
Neste exemplo, o nome do recurso começa com
/
, indicando que este é o caminho completo do arquivo no arquivo JAR. (Nesse caso, o local do arquivo não é relativo ao pacote da classe).
Nota
Se você não espera que o conteúdo do arquivo mude entre as chamadas da UDF, leia o arquivo em um
lazy val
. Isso faz com que o código de carregamento do arquivo seja executado apenas na primeira chamada para a UDF e não em chamadas posteriores.
O exemplo a seguir define um objeto (UDFCode
) com uma função que será usada como UDF (readFileFunc
). A função lê o arquivo data/hello.txt
, que deve conter a cadeia de cadeia de caracteres hello,
. A função precede essa cadeia de caracteres com a cadeia de caracteres passada como argumento.
// Create a function object that reads a file.
object UDFCode extends Serializable {
// The code in this block reads the file. To prevent this code from executing each time that the UDF is called,
// the code is used in the definition of a lazy val. The code for a lazy val is executed only once when the variable is
// first accessed.
lazy val prefix = {
import java.io._
val resourceName = "/data/hello.txt"
val inputStream = classOf[com.snowflake.snowpark.DataFrame]
.getResourceAsStream(resourceName)
if (inputStream == null) {
throw new Exception("Can't find file " + resourceName)
}
scala.io.Source.fromInputStream(inputStream).mkString
}
val readFileFunc = (s: String) => prefix + " : " + s
}
A próxima parte do exemplo registra a função como uma UDF anônima. O exemplo chama a UDF na coluna NAME
em um DataFrame. O exemplo supõe que o arquivo data/hello.txt
está empacotado no arquivo JAR myJar.jar
.
// Add the JAR file as a dependency.
session.addDependency("<path>/myJar.jar")
// Create a new DataFrame with one column (NAME)
// that contains the name "Raymond".
val myDf = session.sql("select 'Raymond' NAME")
// Register the function that you defined earlier as an anonymous UDF.
val readFileUdf = udf(UDFCode.readFileFunc)
// Call UDF for the values in the NAME column of the DataFrame.
myDf.withColumn("CONCAT", readFileUdf(col("NAME"))).show()
Criação de funções de tabela definidas pelo usuário (UDTFs)¶
Para criar e registrar uma UDTF no Snowpark, você deve:
As próximas seções descrevem essas etapas com mais detalhes.
Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDTF.
Como definir a classe UDTF¶
Defina uma classe que herda de uma das classes UDTFn
(por exemplo, UDTF0
, UDTF1
, etc.) no pacote com.snowflake.snowpark.udtf, onde n
especifica o número de argumentos de entrada para sua UDTF. Por exemplo, se sua UDTF passa 2 argumentos de entrada, estenda a classe UDTF2
.
Em sua classe, substitua os seguintes métodos:
outputSchema(), que retorna um objeto
types.StructType
que descreve os nomes e tipos dos campos nas linhas retornadas (o “esquema” da saída).process(), que é chamado uma vez para cada linha na partição de entrada (consulte a nota abaixo).
endPartition(), que é chamado uma vez para cada partição após todas as linhas terem sido passadas para
process()
.
Quando uma UDTF é chamada, as linhas são agrupadas em partições antes de serem passadas para a UDTF:
Se a instrução que chama a UDTF especifica a cláusula PARTITION (partições explícitas), essa cláusula determina como as linhas são particionadas.
Se a instrução não especificar a cláusula PARTITION (partições implícitas), o Snowflake determina a melhor maneira de particionar as linhas.
Para obter uma explicação das partições, consulte Funções de tabela e partições.
Para obter um exemplo de uma classe de UDTF, consulte Exemplo de uma classe de UDTF.
Substituição do método outputSchema()¶
Substitua o método outputSchema()
para definir os nomes e tipos de dados dos campos (o “esquema de saída”) das linhas retornadas pelos métodos process()
e endPartition()
.
def outputSchema(): StructType
Nesse método, construa e retorne um objeto StructType que usa uma Array
de StructField objetos para especificar o tipo de dados do Snowflake de cada campo em uma linha retornada. O Snowflake oferece suporte para os seguintes tipos de objetos para o esquema de saída de uma UDTF:
Tipo de dados SQL |
Tipo do Scala |
Tipo de |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
Por exemplo, se sua UDTF retorna uma linha com um único campo de número inteiro:
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
Substituição do método process()¶
Em sua classe da UDTF, substitua o método process()
:
def process(arg0: A0, ... arg<n> A<n>): Iterable[Row]
onde n
é o número de argumentos passados à sua UDTF.
O número de argumentos na assinatura corresponde à classe que você estendeu. Por exemplo, se sua UDTF passa 2 argumentos de entrada e você está estendendo a classe UDTF2
, o método process()
tem esta assinatura:
def process(arg0: A0, arg1: A1): Iterable[Row]
Esse método é invocado uma vez para cada linha na partição de entrada.
Como escolher os tipos de argumentos¶
Para o tipo de cada argumento no método process()
, use o tipo do Scala que corresponde ao tipo de dados do Snowflake do argumento passado para a UDTF.
O Snowflake oferece suporte para os seguintes tipos de dados para os argumentos de uma UDTF:
Tipo de dados SQL |
Tipo de dados no Scala |
Notas |
---|---|---|
Os seguintes tipos são suportados:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
São suportados mapas mutáveis dos seguintes tipos:
|
Como retornar linhas¶
No método process()
, construa e retorne um Iterable
de objetos Row
que contenham os dados a serem retornados pela UDTF para os valores de entrada fornecidos. Os campos na linha devem usar os tipos que você especificou no método outputSchema
. (Consulte Substituição do método outputSchema().)
Por exemplo, se sua UDTF gera linhas, construa e retorne um Iterable
de objetos Row
para as linhas geradas:
override def process(start: Int, count: Int): Iterable[Row] = (start until (start + count)).map(Row(_))
Substituição do método endPartition()¶
Substitua o método endPartition
e adicione o código que deve ser executado após todas as linhas na partição de entrada terem sido passadas para o método process
. O método endPartition
é invocado uma vez para cada partição de entrada.
def endPartition(): Iterable[Row]
Você pode usar esse método se precisar executar qualquer trabalho depois que todas as linhas na partição tiverem sido processadas. Por exemplo, você pode:
Linhas de retorno baseadas em informações de estado que você captura em cada chamada do método
process
.Linhas de retorno que não estão ligadas a uma linha de entrada específica.
Linhas de retorno que resumem as linhas de saída que foram geradas pelo método
process
.
Os campos nas linhas que você retornar devem corresponder aos tipos que você especificou no método outputSchema
. (Consulte Substituição do método outputSchema().)
Se você não precisar retornar linhas adicionais no final de cada partição, retorne um Iterable
vazio de objetos Row
. Por exemplo:
override def endPartition(): Iterable[Row] = Array.empty[Row]
Nota
Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando endPartition
leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.
Exemplo de uma classe de UDTF¶
A seguir, veja um exemplo de uma classe de UDTF que gera uma faixa de linhas.
Como a UDTF passa 2 argumentos, a classe estende
UDTF2
.Os argumentos
start
ecount
especificam o número inicial para a linha e o número de linhas a gerar.
class MyRangeUdtf extends UDTF2[Int, Int] {
override def process(start: Int, count: Int): Iterable[Row] =
(start until (start + count)).map(Row(_))
override def endPartition(): Iterable[Row] = Array.empty[Row]
override def outputSchema(): StructType = StructType(StructField("C1", IntegerType))
}
Como registrar a UDTF¶
Em seguida, crie uma instância da nova classe e registre a classe chamando um dos métodos UDTFRegistration. Você pode registrar uma UDTF temporária ou permanente.
Como registrar uma UDTF temporária¶
Para registrar uma UDTF temporária, chame UDTFRegistration.registerTemporary
:
Se você não precisar chamar a UDTF pelo nome, você pode registrar uma UDTF anônima passando uma instância da classe:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show
Se você precisar chamar a UDTF pelo nome, passe também um nome da UDTF:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Como registrar uma UDTF permanente¶
Se você precisar usar a UDTF em sessões posteriores, chame UDTFRegistration.registerPermanent
para registrar uma UDTF permanente.
Ao registrar uma UDTF permanente, você deve especificar um estágio onde o método de registro carregará os arquivos JAR para a UDTF e suas dependências. Por exemplo:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerPermanent("myUdtf", new MyRangeUdtf(), "@mystage") // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Como chamar uma UDTF¶
Após registrar a UDTF, você pode chamar a UDTF passando o objeto retornado TableFunction
para o método tableFunction
do objeto Session
:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary(new MyRangeUdtf()) // Use the returned TableFunction object to call the UDTF. session.tableFunction(tableFunction, lit(10), lit(5)).show()
Para chamar uma UDTF pelo nome, construa um objeto TableFunction
com esse nome e passe-a para o método tableFunction
:
// Register the MyRangeUdtf class that was defined in the previous example. val tableFunction = session.udtf.registerTemporary("myUdtf", new MyRangeUdtf()) // Call the UDTF by name. session.tableFunction(TableFunction("myUdtf"), lit(10), lit(5)).show()
Você também pode chamar uma UDTF diretamente através de uma instrução SELECT:
session.sql("select * from table(myUdtf(10, 5))")