Criação de funções definidas pelo usuário (UDFs) para DataFrames em Java¶
A API do Snowpark fornece métodos que você pode usar para criar uma função definida pelo usuário a partir de uma expressão lambda em Java. Este tópico explica como criar esses tipos de funções.
Introdução¶
Você pode chamar APIs do Snowpark para criar funções definidas pelo usuário (UDFs) para expressões lambda em Java, e você pode chamar essas UDFs para processar os dados em seu DataFrame.
Quando você usa a API do Snowpark para criar uma UDF, a biblioteca do Snowpark faz a serialização e carrega o código da sua UDF para um estágio. Quando você chama a UDF, a biblioteca do Snowpark executa sua função no servidor, onde os dados estão localizados. Como resultado, os dados não precisam ser transferidos para o cliente para que a função possa processar os dados.
Em seu código personalizado, você também pode chamar o código que está empacotado em arquivos JAR (por exemplo, classes de Java para uma biblioteca de terceiros).
Você pode criar uma UDF para seu código personalizado de duas maneiras:
Você pode criar uma UDF anônima e atribuir a função a uma variável. Desde que essa variável esteja no escopo, você pode usar a variável para chamar a UDF.
Você pode criar uma UDF nomeada e chamar a UDF pelo nome. Você pode usar isso se, por exemplo, precisar chamar uma UDF pelo nome ou usar a UDF em uma sessão posterior.
O resto deste tópico explica como criar UDFs.
Nota
Se você tiver definido uma UDF executando o comando CREATE FUNCTION, você pode chamar essa UDF no Snowpark.
Para obter mais detalhes, consulte Como chamar funções definidas pelo usuário escalares (UDFs).
Tipos de dados suportados para argumentos e valores de retorno¶
Para criar uma UDF para uma lambda de Java, você deve usar os tipos de dados suportados listados abaixo para os argumentos e o valor de retorno de seu método:
Tipo de dados SQL |
Tipo de dados de Java |
Notas |
|---|---|---|
Os seguintes tipos são suportados:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
Especificação de dependências para uma UDF¶
Para definir uma UDF através da API do Snowpark, você deve chamar Session.addDependency() para quaisquer arquivos que contenham quaisquer classes e recursos dos quais sua UDF dependa (por exemplo, arquivos JAR, arquivos de recursos, etc.). (Para obter mais detalhes sobre recursos de leitura de uma UDF, consulte Como ler arquivos de uma UDF).
A biblioteca do Snowpark carrega esses arquivos para um estágio interno e adiciona os arquivos ao caminho da classe ao executar sua UDF.
Dica
Se você não quiser que a biblioteca carregue o arquivo toda vez que você executar seu aplicativo, carregue o arquivo para um estágio. Ao chamar addDependency, passe o caminho para o arquivo no estágio.
O exemplo a seguir demonstra como adicionar um arquivo JAR em um estágio como uma dependência:
Os exemplos a seguir demonstram como adicionar dependências para arquivos JAR e arquivos de recursos:
Você não deve precisar especificar as seguintes dependências:
Suas bibliotecas do Java runtime.
Essas bibliotecas já estão disponíveis no ambiente de runtime no servidor onde suas UDFs são executadas.
O arquivo JAR do Snowpark.
A biblioteca do Snowpark tenta automaticamente detectar e carregar o arquivo JAR do Snowpark para o servidor.
Para evitar que a biblioteca carregue repetidamente o arquivo JAR do Snowpark para o servidor:
Carregue o arquivo JAR do Snowpark para um estágio.
For example, the following command uploads the Snowpark JAR file to the stage
@mystage. The PUT command compresses the JAR file and names the resulting file snowpark_2.12-1.18.0.jar.gz.-- Put the Snowpark JAR file in a stage. PUT file:///<path>/snowpark_2.12-1.18.0.jar @mystageChame
addDependencypara adicionar o arquivo JAR do Snowpark no estágio como uma dependência.Por exemplo, para adicionar o arquivo JAR do Snowpark carregado pelo comando anterior:
// Add the Snowpark JAR file that you uploaded to a stage. session.addDependency("@mystage/snowpark_2.12-1.18.0.jar.gz");Observe que o caminho especificado para o arquivo JAR inclui a extensão
.gzdo nome do arquivo, que foi adicionada pelo comando PUT.O arquivo JAR ou o diretório com o aplicativo que está sendo executado.
A biblioteca do Snowpark tenta automaticamente detectar e carregar essas dependências.
Se a biblioteca do Snowpark não conseguir detectar essas dependências automaticamente, a biblioteca reporta um erro, e você deve chamar
addDependencypara adicionar essas dependências manualmente.
Se levar tempo demais para que as dependências sejam carregadas no estágio, a biblioteca do Snowpark gerará uma exceção de tempo limite. Para configurar o tempo máximo que a biblioteca do Snowpark deve esperar, defina a propriedade snowpark_request_timeout_in_seconds ao criar a sessão.
Como criar uma UDF anônima¶
Para criar uma UDF anônima, você pode:
Chamar o método estático
Functions.udfpassando a expressão lambda e os campos DataTypes (ou objetos construídos pelos métodos dessa classe) representando os tipos de dados das entradas e da saída.Chamar o método
registerTemporaryna classeUDFRegistrationpassando a expressão lambda e os campos DataTypes (ou objetos construídos pelos métodos dessa classe) representando os tipos de dados das entradas e da saída.Você pode acessar uma instância da classe
UDFRegistrationchamando o métodoudfdo objetoSession.Ao chamar
registerTemporary, use uma assinatura de método que não tenha um parâmetroname. (Como você está criando uma UDF anônima, você não especifica um nome para a UDF).
Nota
Ao escrever código com multithread (por exemplo, ao usar coleções paralelas), use o método registerTemporary para registrar UDFs em vez de usar o método udf. Isso pode evitar erros nos quais o objeto Session padrão do Snowflake não pode ser encontrado.
Esses métodos retornam um objeto UserDefinedFunction que você pode usar para chamar a UDF. (Consulte Como chamar funções definidas pelo usuário escalares (UDFs).)
O exemplo a seguir cria uma UDF anônima:
O exemplo a seguir cria uma UDF anônima que usa uma classe personalizada (LanguageDetector, que detecta o idioma usado no texto). O exemplo chama a UDF anônima para detectar o idioma na coluna text_data em um DataFrame e cria uma nova coluna DataFrame que inclui uma coluna adicional lang com o idioma utilizado.
Criação e registro de uma UDF nomeada¶
Se você quiser chamar uma UDF pelo nome (por exemplo, usando o método Functions.callUDF estático) ou se você precisar usar uma UDF em sessões posteriores, você pode criar e registrar uma UDF nomeada. Para isso, use um dos seguintes métodos na classe UDFRegistration:
registerTemporaryse você apenas planeja usar a UDF na sessão atualregisterPermanentse você planeja usar a UDF em sessões posteriores
Para acessar um objeto da classe UDFRegistration, chame o método udf do objeto Session.
Ao chamar o método registerTemporary ou registerPermanent, passe a expressão lambda e os campos DataTypes (ou objetos construídos pelos métodos dessa classe) representando os tipos de dados das entradas e da saída.
Por exemplo:
registerPermanent cria uma UDF que você pode usar nas sessões atuais e posteriores. Ao chamar registerPermanent, você também deve especificar um local em um local de estágio interno onde os arquivos JAR para a UDF e suas dependências serão carregados.
Nota
registerPermanent não oferece suporte para estágios externos.
Por exemplo:
Como usar objetos que não são serializáveis¶
Quando você cria uma UDF para uma expressão lambda, a biblioteca do Snowpark serializa o fechamento da lambda e o envia para o servidor para execução.
Se um objeto capturado pelo fechamento da lambda não for serializável, a biblioteca do Snowpark gera uma exceção java.io.NotSerializableException.
Se isso ocorrer, você deve tornar o objeto serizável.
Como escrever um código de inicialização para uma UDF¶
Se sua UDF exigir um código ou contexto de inicialização, você pode fornecê-lo através de valores capturados como parte do fechamento da UDF.
O exemplo a seguir usa uma classe separada para inicializar o contexto necessário para duas UDFs.
A primeira UDF cria uma nova instância da classe dentro da lambda, de forma que a inicialização é realizada toda vez que a UDF é invocada.
A segunda UDF captura uma instância da classe gerada em seu programa cliente. O contexto gerado no cliente é serializado e é utilizado pela UDF. Observe que a classe do contexto deve ser serializável para que essa abordagem funcione.
Como ler arquivos de uma UDF¶
Como mencionado anteriormente, a biblioteca do Snowpark carrega e executa UDFs no servidor. Se sua UDF precisa ler dados de um arquivo, você deve assegurar-se de que o arquivo seja carregado com a UDF.
Além disso, se o conteúdo do arquivo permanecer o mesmo entre chamadas para a UDF, você pode escrever seu código para carregar o arquivo uma vez durante a primeira chamada e não em chamadas posteriores. Isso pode melhorar o desempenho de suas chamadas à UDF.
Para configurar uma UDF para ler um arquivo:
Adicione o arquivo a um arquivo JAR.
Por exemplo, se sua UDF precisa usar um arquivo em um subdiretório
data/(data/hello.txt), execute o comandojarpara adicionar esse arquivo a um arquivo JAR:Especifique que o arquivo JAR é uma dependência, que carrega o arquivo para o servidor e adiciona o arquivo ao caminho da classe. Consulte Especificação de dependências para uma UDF.
Por exemplo:
No arquivo da UDF, chame
Class.forName().getResourceAsStream()para encontrar o arquivo no caminho da classe e ler o arquivo.Para evitar adicionar uma dependência em
this, você pode usarClass.forName("com.snowflake.snowpark_java.DataFrame")(em vez degetClass()) para obter o objetoClass.Por exemplo, para ler o arquivo
data/hello.txt:Neste exemplo, o nome do recurso começa com
/, indicando que este é o caminho completo do arquivo no arquivo JAR. (Nesse caso, o local do arquivo não é relativo ao pacote da classe).
Nota
Se você não espera que o conteúdo do arquivo mude entre as chamadas à UDF, leia o arquivo em um campo estático de sua classe e leia o arquivo somente se o campo não estiver definido.
O exemplo a seguir define um objeto (UDFCode) com uma função que será usada como UDF (readFileFunc). A função lê o arquivo data/hello.txt, que deve conter a cadeia de cadeia de caracteres hello,. A função precede essa cadeia de caracteres com a cadeia de caracteres passada como argumento.
A próxima parte do exemplo registra a função como uma UDF anônima. O exemplo chama a UDF na coluna NAME em um DataFrame. O exemplo supõe que o arquivo data/hello.txt está empacotado no arquivo JAR myJar.jar.
Criação de funções de tabela definidas pelo usuário (UDTFs)¶
Para criar e registrar uma UDTF no Snowpark, você deve:
As próximas seções descrevem essas etapas com mais detalhes.
Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDTF.
Como definir a classe UDTF¶
Defina uma classe que implemente uma das interfaces JavaUDTFn (por exemplo, JavaUDTF0, JavaUDTF1, etc.) no pacote com.snowflake.snowpark_java.udtf, onde n especifica o número de argumentos de entrada para sua UDTF. Por exemplo, se sua UDTF passar 2 argumentos de entrada, implemente a interface JavaUDTF2.
Em sua classe, implemente os seguintes métodos:
outputSchema(), que retorna um objeto
types.StructTypeque descreve os nomes e tipos dos campos nas linhas retornadas (o “esquema” da saída).process(), que é chamado uma vez para cada linha na partição de entrada (consulte a nota abaixo).
inputSchema(), que retorna um objeto
types.StructTypeque descreve os tipos dos parâmetros de entrada.Se seu método
process()passar argumentosMap, você deve implementar o métodoinputSchema(). Caso contrário, a implementação desse método é opcional.endPartition(), que é chamado uma vez para cada partição após todas as linhas terem sido passadas para
process().
Quando uma UDTF é chamada, as linhas são agrupadas em partições antes de serem passadas para a UDTF:
Se a instrução que chama a UDTF especifica a cláusula PARTITION (partições explícitas), essa cláusula determina como as linhas são particionadas.
Se a instrução não especificar a cláusula PARTITION (partições implícitas), o Snowflake determina a melhor maneira de particionar as linhas.
Para obter uma explicação das partições, consulte Funções de tabela e partições.
Para obter um exemplo de uma classe de UDTF, consulte Exemplo de uma classe de UDTF.
Como implementar o método outputSchema()¶
Implemente o método outputSchema() para definir os nomes e tipos de dados dos campos (o “esquema de saída”) das linhas retornadas pelos métodos process() e endPartition().
Nesse método, construa e retorne um objeto StructType que contenha objetos StructField representando o tipo de dados do Snowflake de cada campo em uma linha retornada. O Snowflake oferece suporte para os seguintes tipos de objetos para o esquema de saída de uma UDTF:
Tipo de dados SQL |
Tipo de Java |
Tipo de |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
Por exemplo, se sua UDTF retorna uma linha com um único campo de número inteiro:
Como implementar o método process()¶
Em sua classe da UDTF, implemente o método process():
onde n é o número de argumentos passados à sua UDTF.
O número de argumentos na assinatura corresponde à interface que você implementou. Por exemplo, se sua UDTF passa 2 argumentos de entrada e você está implementando a interface JavaUDTF2, o método process() tem essa assinatura:
Esse método é invocado uma vez para cada linha na partição de entrada.
Como escolher os tipos de argumentos¶
Para o tipo de cada argumento no método process(), use o tipo de Java que corresponda ao tipo de dados do Snowflake do argumento passado para a UDTF.
O Snowflake oferece suporte para os seguintes tipos de dados para os argumentos de uma UDTF:
Tipo de dados SQL |
Tipo de dados de Java |
Notas |
|---|---|---|
Os seguintes tipos são suportados:
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
Nota
Se você passar argumentos java.util.Map, você deve implementar o método inputSchema para descrever os tipos desses argumentos. Consulte Como implementar o método inputSchema().
Como retornar linhas¶
No método process(), construa e retorne um java.util.stream.Stream de objetos Row contendo os dados a serem retornados pela UDTF para os valores de entrada dados. Os campos na linha devem usar os tipos que você especificou no método outputSchema. (Consulte Como implementar o método outputSchema().)
Por exemplo, se sua UDTF gera linhas, construa e retorne um Iterable de objetos Row para as linhas geradas:
Como implementar o método inputSchema()¶
Se o método process() passar um argumento java.util.Map, você deve implementar o método inputSchema() para descrever os tipos dos argumentos de entrada.
Nota
Se o método process() não passar argumentos Map, você não precisa implementar o método inputSchema().
Nesse método, construa e retorne um objeto StructType que contenha objetos StructField representando o tipo de dados do Snowflake de cada argumento passado para o método process(). O Snowflake oferece suporte para os seguintes tipos de objetos para o esquema de entrada de uma UDTF:
Tipo de dados SQL |
Tipo de Java |
Tipo de |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
||
|
|
|
|
|
|
|
|
|
|
|
Por exemplo, suponha que seu método process() passe um argumento Map<String, String> e um argumento Map<String, Variant>:
Você deve implementar o método inputSchema() para retornar um objeto StructType que descreva os tipos desses argumentos de entrada:
Como implemer o método endPartition()¶
Implemente o método endPartition e adicione o código que deve ser executado após todas as linhas na partição de entrada terem sido passadas para o método process. O método endPartition é invocado uma vez para cada partição de entrada.
Você pode usar esse método se precisar executar qualquer trabalho depois que todas as linhas na partição tiverem sido processadas. Por exemplo, você pode:
Linhas de retorno baseadas em informações de estado que você captura em cada chamada do método
process.Linhas de retorno que não estão ligadas a uma linha de entrada específica.
Linhas de retorno que resumem as linhas de saída que foram geradas pelo método
process.
Os campos nas linhas que você retornar devem corresponder aos tipos que você especificou no método outputSchema. (Consulte Como implementar o método outputSchema().)
Se você não precisar retornar linhas adicionais no final de cada partição, retorne um Stream vazio. Por exemplo:
Nota
Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando endPartition leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.
Exemplo de uma classe de UDTF¶
A seguir, veja um exemplo de uma classe de UDTF que gera uma faixa de linhas.
Como a UDTF passa 2 argumentos, a classe implementa a
JavaUDTF2.Os argumentos
startecountespecificam o número inicial para a linha e o número de linhas a gerar.
Como registrar a UDTF¶
Em seguida, crie uma instância da nova classe e registre a classe chamando um dos métodos UDTFRegistration. Você pode registrar uma UDTF temporária ou permanente.
Como registrar uma UDTF temporária¶
Para registrar uma UDTF temporária, chame UDTFRegistration.registerTemporary:
Se você não precisar chamar a UDTF pelo nome, você pode registrar uma UDTF anônima passando uma instância da classe:
Se você precisar chamar a UDTF pelo nome, passe também um nome da UDTF:
Como registrar uma UDTF permanente¶
Se você precisar usar a UDTF em sessões posteriores, chame UDTFRegistration.registerPermanent para registrar uma UDTF permanente.
Ao registrar uma UDTF permanente, você deve especificar um estágio onde o método de registro carregará os arquivos JAR para a UDTF e suas dependências. Por exemplo:
Como chamar uma UDTF¶
Após registrar a UDTF, você pode chamar a UDTF passando o objeto retornado TableFunction para o método tableFunction do objeto Session:
Para chamar uma UDTF pelo nome, construa um objeto TableFunction com esse nome e passe-a para o método tableFunction:
Você também pode chamar uma UDTF diretamente através de uma instrução SELECT: