Criação de funções definidas pelo usuário (UDFs) para DataFrames no Scala¶
A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma lambda ou função no Scala. Este tópico explica como criar esses tipos de funções.
Introdução¶
Você pode chamar APIs do Snowpark para criar funções definidas pelo usuário (UDFs) para suas lambdas e funções personalizadas no Scala, e você pode chamar essas UDFs para processar os dados em seu DataFrame.
Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark faz a serialização e o upload do código para sua UDF para um estágio interno. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão localizados. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.
Em seu código personalizado, você também pode chamar o código que está empacotado em arquivos JAR (por exemplo, classes de Java para uma biblioteca de terceiros).
Você pode criar uma UDF para seu código personalizado de duas maneiras:
Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.
Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.
As próximas seções fornecem informações importantes sobre a criação de UDFs no Snowpark:
Tipos de dados suportados para argumentos e valores de retorno
Uma ressalva sobre criar UDFs em um objeto com a característica App
O resto deste tópico explica como criar UDFs.
Nota
Se você tiver definido uma UDF executando o comando CREATE FUNCTION, você pode chamar essa UDF no Snowpark.
Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário escalares (UDFs).
Tipos de dados suportados para argumentos e valores de retorno¶
Para criar uma UDF para uma função do Scala ou lambda, você deve usar os tipos de dados suportados listados abaixo para os argumentos e valor de retorno de sua função ou lambda:
Tipo de dados SQL |
Tipo de dados no Scala |
Notas |
|---|---|---|
Os seguintes tipos são suportados:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
São suportados mapas mutáveis dos seguintes tipos:
|
|
Uma ressalva sobre criar UDFs em um objeto com a característica App¶
O Scala fornece uma característica App que você pode estender para transformar seu objeto Scala em um programa executável. A característica App fornece um método main que executa automaticamente todo o código no corpo da definição de seu objeto. (O código em sua definição de objeto torna-se efetivamente o método main).
Um efeito da extensão da característica App é que os campos em seu objeto não serão inicializados até que o método main seja chamado. Se seu objeto estende o App e você define uma UDF que usa um campo de objeto que você inicializou anteriormente, a definição da UDF carregada no servidor não incluirá o valor inicializado do campo do objeto.
Por exemplo, suponha que você defina e inicialize um campo chamado myConst no objeto e use esse campo em uma UDF:
Quando o Snowpark serializa e carrega a definição da UDF para Snowflake, myConst não é inicializado e é resolvido como null. Como resultado, chamar a UDF retorna null para myConst.
Para contornar isso, mude seu objeto para que ele não estenda a característica App e implemente um método separado main para seu código:
Especificação de dependências para uma UDF¶
Para definir uma UDF através da API do Snowpark, você deve chamar Session.addDependency() para quaisquer arquivos que contenham quaisquer classes e recursos dos quais sua UDF dependa (por exemplo, arquivos JAR, arquivos de recursos, etc.). (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos de uma UDF).
A biblioteca do Snowpark carrega esses arquivos para um estágio interno e adiciona os arquivos ao caminho da classe ao executar sua UDF.
Dica
Se você não quiser que a biblioteca carregue o arquivo toda vez que você executar seu aplicativo, carregue o arquivo para um estágio. Ao chamar addDependency, passe o caminho para o arquivo no estágio.
Se você estiver usando o REPL do Scala, você deve acrescentar o diretório de classes gerado pelo REPL como uma dependência. Por exemplo, se você usou o script run.sh para iniciar o REPL, chame o seguinte método, que adiciona o diretório repl_classes criado pelo script:
O exemplo a seguir demonstra como adicionar um arquivo JAR em um estágio como uma dependência:
Os exemplos a seguir demonstram como adicionar dependências para arquivos JAR e arquivos de recursos:
Você não deve precisar especificar as seguintes dependências:
Suas bibliotecas de runtime do Scala.
Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.
O arquivo JAR do Snowpark.
A biblioteca do Snowpark tenta automaticamente detectar e carregar o arquivo JAR do Snowpark para o servidor.
Para evitar que a biblioteca carregue repetidamente o arquivo JAR do Snowpark para o servidor:
Carregue o arquivo JAR do Snowpark para um estágio.
For example, the following command uploads the Snowpark JAR file to the stage
@mystage. The PUT command compresses the JAR file and names the resulting file snowpark_2.12-1.18.0.jar.gz.-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark_2.12-1.18.0.jar @mystageChame
addDependencypara adicionar o arquivo JAR do Snowpark no estágio como uma dependência.Por exemplo, para adicionar o arquivo JAR do Snowpark carregado pelo comando anterior:
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark_2.12-1.18.0.jar.gz")Observe que o caminho especificado para o arquivo JAR inclui a extensão
.gzdo nome do arquivo, que foi adicionada pelo comando PUT.O arquivo JAR ou o diretório com o aplicativo que está sendo executado.
A biblioteca do Snowpark tenta automaticamente detectar e carregar essas dependências.
Se a biblioteca do Snowpark não conseguir detectar essas dependências automaticamente, a biblioteca reporta um erro, e você deve chamar
addDependencypara adicionar essas dependências manualmente.
Se levar tempo demais para que as dependências sejam carregadas no estágio, a biblioteca do Snowpark gerará uma exceção de tempo limite. Para configurar o tempo máximo que a biblioteca do Snowpark deve esperar, defina a propriedade snowpark_request_timeout_in_seconds ao criar a sessão.
Como criar uma UDF anônima¶
Para criar uma UDF anônima, você pode:
Chamar a função
udfno objetocom.snowflake.snowpark.functions, passando a definição da função anônima.Chamar o método
registerTemporaryna classeUDFRegistrationpassando a definição da função anônima. Como você está registrando uma UDF anônima, você deve usar as assinaturas do método que não tenham um parâmetroname.
Nota
Ao escrever código com multithread (por exemplo, ao usar coleções paralelas), use o método registerTemporary para registrar UDFs em vez de usar a função udf. Isso pode evitar erros nos quais o objeto Session padrão do Snowflake não pode ser encontrado.
Esses métodos retornam um objeto UserDefinedFunction que você pode usar para chamar a UDF. (Consulte Como chamar funções definidas pelo usuário escalares (UDFs).)
O exemplo a seguir cria uma UDF anônima:
Nota
Se você estiver criando uma UDF em um Jupyter Notebook, você deve configurar o notebook para trabalhar com o Snowpark (consulte Como configurar um Jupyter Notebook para o Snowpark Scala) e seguir as diretrizes para escrever UDFs em um notebook (ver Como criar UDFs em Jupyter Notebooks).
O exemplo a seguir cria uma UDF anônima que passa um Array de valores String e anexa a cadeia de caracteres x a cada valor:
O exemplo a seguir cria uma UDF anônima que usa uma classe personalizada (LanguageDetector, que detecta o idioma usado no texto). O exemplo chama a UDF anônima para detectar o idioma na coluna text_data em um DataFrame e cria uma nova coluna DataFrame que inclui uma coluna adicional lang com o idioma utilizado.
Criação e registro de uma UDF nomeada¶
Se você quiser chamar uma UDF pelo nome (por exemplo, usando a função callUDF no objeto functions) ou se precisar usar uma UDF em sessões posteriores, você pode criar e registrar uma UDF nomeada. Para isso, use um dos seguintes métodos na classe UDFRegistration:
registerTemporaryse você apenas planeja usar a UDF na sessão atualregisterPermanentse você planeja usar a UDF em sessões posteriores
Para acessar um objeto da classe UDFRegistration, chame o método udf da classe Session.
registerTemporary cria uma UDF temporária que você pode usar na sessão atual.
registerPermanent cria uma UDF que você pode usar nas sessões atuais e posteriores. Ao chamar registerPermanent, você também deve especificar um local em um local de estágio interno onde os arquivos JAR para a UDF e suas dependências serão carregados.
Nota
registerPermanent não oferece suporte para estágios externos.
Por exemplo:
Nota
Se você estiver criando uma UDF em um Jupyter Notebook, você deve configurar o notebook para trabalhar com o Snowpark (consulte Como configurar um Jupyter Notebook para o Snowpark Scala) e seguir as diretrizes para escrever UDFs em um notebook (ver Como criar UDFs em Jupyter Notebooks).
Como criar UDFs em Jupyter Notebooks¶
Se você estiver criando UDFs em um Jupyter Notebook, você deve seguir estes passos adicionais:
Como configurar um Jupyter Notebook para o Snowpark Scala (se você ainda não configurou o notebook para funcionar com o Snowpark)
Como escrever a implementação de uma UDF¶
Defina a implementação de sua função em uma classe que estenda Serializable. Por exemplo:
Acesso a uma variável definida em outra célula¶
Se você precisar usar uma variável definida em outra célula em sua UDF, você deve passar a variável como um argumento para o construtor da classe. Por exemplo, suponha que na célula 1, você tenha definido uma variável:
e que você queira usar essa variável em uma UDF que você definiu na célula 2. No construtor de classes para a UDF, adicione um argumento para essa variável. Então, ao chamar o construtor da classe para criar a UDF, passe a variável definida na célula 1:
Como usar objetos que não são serializáveis¶
Quando você cria uma UDF para uma lambda ou função, a biblioteca do Snowpark serializa o fechamento da lambda e a envia para o servidor para execução.
Se um objeto capturado pelo fechamento da lambda não for serializável, a biblioteca do Snowpark gera uma exceção java.io.NotSerializableException.
Se isso ocorrer, você pode:
Tornar o objeto serializável ou
Declarar o objeto como
lazy valou usar a anotação@transientpara evitar a serialização do objeto.Por exemplo:
Como escrever um código de inicialização para uma UDF¶
Se sua UDF exigir um código ou contexto de inicialização, você pode fornecê-lo através de valores capturados como parte do fechamento da UDF.
O exemplo a seguir usa uma classe separada para inicializar o contexto necessário por três UDFs.
A primeira UDF cria uma nova instância da classe dentro da lambda, de forma que a inicialização é realizada toda vez que a UDF é invocada.
A segunda UDF captura uma instância da classe gerada em seu programa cliente. O contexto gerado no cliente é serializado e é utilizado pela UDF. Observe que a classe do contexto deve ser serializável para que essa abordagem funcione.
A terceira UDF captura um
lazy val, portanto o contexto é instanciado lentamente na primeira invocação da UDF e é reutilizado em invocações posteriores. Essa abordagem funciona mesmo quando o contexto não é serializável. Entretanto, não há garantia de que TODAS as invocações de UDF dentro de um dataframe usarão o mesmo contexto gerado lentamente.
Como ler arquivos de uma UDF¶
Como mencionado anteriormente, a biblioteca do Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.
Além disso, se o conteúdo do arquivo permanecer o mesmo entre chamadas para a UDF, você pode escrever seu código para carregar o arquivo uma vez durante a primeira chamada e não em chamadas posteriores. Isso pode melhorar o desempenho de suas chamadas à UDF.
Para configurar uma UDF para ler um arquivo:
Adicione o arquivo a um arquivo JAR.
Por exemplo, se sua UDF precisa usar um arquivo em um subdiretório
data/(data/hello.txt), execute o comandojarpara adicionar esse arquivo a um arquivo JAR:Especifique que o arquivo JAR é uma dependência, que carrega o arquivo para o servidor e adiciona o arquivo ao caminho da classe. Consulte Especificação de dependências para uma UDF.
Por exemplo:
No arquivo da UDF, chame
Class.getResourceAsStreampara encontrar o arquivo no caminho da classe e ler o arquivo.Para evitar adicionar uma dependência em
this, você pode usarclassOf[com.snowflake.snowpark.DataFrame](em vez degetClass) para obter o objetoClass.Por exemplo, para ler o arquivo
data/hello.txt:Neste exemplo, o nome do recurso começa com
/, indicando que este é o caminho completo do arquivo no arquivo JAR. (Nesse caso, o local do arquivo não é relativo ao pacote da classe).
Nota
Se você não espera que o conteúdo do arquivo mude entre as chamadas da UDF, leia o arquivo em um
lazy val. Isso faz com que o código de carregamento do arquivo seja executado apenas na primeira chamada para a UDF e não em chamadas posteriores.
O exemplo a seguir define um objeto (UDFCode) com uma função que será usada como UDF (readFileFunc). A função lê o arquivo data/hello.txt, que deve conter a cadeia de cadeia de caracteres hello,. A função precede essa cadeia de caracteres com a cadeia de caracteres passada como argumento.
A próxima parte do exemplo registra a função como uma UDF anônima. O exemplo chama a UDF na coluna NAME em um DataFrame. O exemplo supõe que o arquivo data/hello.txt está empacotado no arquivo JAR myJar.jar.
Criação de funções de tabela definidas pelo usuário (UDTFs)¶
Para criar e registrar uma UDTF no Snowpark, você deve:
As próximas seções descrevem essas etapas com mais detalhes.
Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDTF.
Como definir a classe UDTF¶
Defina uma classe que herda de uma das classes UDTFn (por exemplo, UDTF0, UDTF1, etc.) no pacote com.snowflake.snowpark.udtf, onde n especifica o número de argumentos de entrada para sua UDTF. Por exemplo, se sua UDTF passa 2 argumentos de entrada, estenda a classe UDTF2.
Em sua classe, substitua os seguintes métodos:
outputSchema(), que retorna um objeto
types.StructTypeque descreve os nomes e tipos dos campos nas linhas retornadas (o “esquema” da saída).process(), que é chamado uma vez para cada linha na partição de entrada (consulte a nota abaixo).
endPartition(), que é chamado uma vez para cada partição após todas as linhas terem sido passadas para
process().
Quando uma UDTF é chamada, as linhas são agrupadas em partições antes de serem passadas para a UDTF:
Se a instrução que chama a UDTF especifica a cláusula PARTITION (partições explícitas), essa cláusula determina como as linhas são particionadas.
Se a instrução não especificar a cláusula PARTITION (partições implícitas), o Snowflake determina a melhor maneira de particionar as linhas.
Para obter uma explicação das partições, consulte Funções de tabela e partições.
Para obter um exemplo de uma classe de UDTF, consulte Exemplo de uma classe de UDTF.
Substituição do método outputSchema()¶
Substitua o método outputSchema() para definir os nomes e tipos de dados dos campos (o “esquema de saída”) das linhas retornadas pelos métodos process() e endPartition().
Nesse método, construa e retorne um objeto StructType que usa uma Array de StructField objetos para especificar o tipo de dados do Snowflake de cada campo em uma linha retornada. O Snowflake oferece suporte para os seguintes tipos de objetos para o esquema de saída de uma UDTF:
Tipo de dados SQL |
Tipo do Scala |
Tipo de |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
Por exemplo, se sua UDTF retorna uma linha com um único campo de número inteiro:
Substituição do método process()¶
Em sua classe da UDTF, substitua o método process():
onde n é o número de argumentos passados à sua UDTF.
O número de argumentos na assinatura corresponde à classe que você estendeu. Por exemplo, se sua UDTF passa 2 argumentos de entrada e você está estendendo a classe UDTF2, o método process() tem esta assinatura:
Esse método é invocado uma vez para cada linha na partição de entrada.
Como escolher os tipos de argumentos¶
Para o tipo de cada argumento no método process(), use o tipo do Scala que corresponde ao tipo de dados do Snowflake do argumento passado para a UDTF.
O Snowflake oferece suporte para os seguintes tipos de dados para os argumentos de uma UDTF:
Tipo de dados SQL |
Tipo de dados no Scala |
Notas |
|---|---|---|
Os seguintes tipos são suportados:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
São suportados mapas mutáveis dos seguintes tipos:
|
Como retornar linhas¶
No método process(), construa e retorne um Iterable de objetos Row que contenham os dados a serem retornados pela UDTF para os valores de entrada fornecidos. Os campos na linha devem usar os tipos que você especificou no método outputSchema. (Consulte Substituição do método outputSchema().)
Por exemplo, se sua UDTF gera linhas, construa e retorne um Iterable de objetos Row para as linhas geradas:
Substituição do método endPartition()¶
Substitua o método endPartition e adicione o código que deve ser executado após todas as linhas na partição de entrada terem sido passadas para o método process. O método endPartition é invocado uma vez para cada partição de entrada.
Você pode usar esse método se precisar executar qualquer trabalho depois que todas as linhas na partição tiverem sido processadas. Por exemplo, você pode:
Linhas de retorno baseadas em informações de estado que você captura em cada chamada do método
process.Linhas de retorno que não estão ligadas a uma linha de entrada específica.
Linhas de retorno que resumem as linhas de saída que foram geradas pelo método
process.
Os campos nas linhas que você retornar devem corresponder aos tipos que você especificou no método outputSchema. (Consulte Substituição do método outputSchema().)
Se você não precisar retornar linhas adicionais no final de cada partição, retorne um Iterable vazio de objetos Row. Por exemplo:
Nota
Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando endPartition leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.
Exemplo de uma classe de UDTF¶
A seguir, veja um exemplo de uma classe de UDTF que gera uma faixa de linhas.
Como a UDTF passa 2 argumentos, a classe estende
UDTF2.Os argumentos
startecountespecificam o número inicial para a linha e o número de linhas a gerar.
Como registrar a UDTF¶
Em seguida, crie uma instância da nova classe e registre a classe chamando um dos métodos UDTFRegistration. Você pode registrar uma UDTF temporária ou permanente.
Como registrar uma UDTF temporária¶
Para registrar uma UDTF temporária, chame UDTFRegistration.registerTemporary:
Se você não precisar chamar a UDTF pelo nome, você pode registrar uma UDTF anônima passando uma instância da classe:
Se você precisar chamar a UDTF pelo nome, passe também um nome da UDTF:
Como registrar uma UDTF permanente¶
Se você precisar usar a UDTF em sessões posteriores, chame UDTFRegistration.registerPermanent para registrar uma UDTF permanente.
Ao registrar uma UDTF permanente, você deve especificar um estágio onde o método de registro carregará os arquivos JAR para a UDTF e suas dependências. Por exemplo:
Como chamar uma UDTF¶
Após registrar a UDTF, você pode chamar a UDTF passando o objeto retornado TableFunction para o método tableFunction do objeto Session:
Para chamar uma UDTF pelo nome, construa um objeto TableFunction com esse nome e passe-a para o método tableFunction:
Você também pode chamar uma UDTF diretamente através de uma instrução SELECT: