Como escrever uma UDTF em Python¶
Neste tópico:
Você pode implementar um manipulador de função de tabela definida pelo usuário (UDTF) em Python. Esse código do manipulador é executado quando a UDTF é chamada. Este tópico descreve como implementar um manipulador em Python e criar a UDTF.
Uma UDTF é uma função definida pelo usuário (UDF) que retorna resultados tabulares. Para saber mais sobre manipuladores de UDF implementados em Python, consulte Como criar UDFs de Python. Para obter informações gerais sobre UDFs, consulte Visão geral das funções definidas pelo usuário.
No manipulador de um UDTF, você pode processar linhas de entrada (consulte Processamento de linhas neste tópico). Você também pode ter uma lógica que é executada para cada partição de entrada (consulte Processamento de partições neste tópico).
Ao criar uma UDTF em Python, faça o seguinte:
Implemente uma classe com métodos que o Snowflake invocará quando a UDTF for chamada.
Para obter mais detalhes, consulte Como implementar um manipulador neste tópico.
Crie a UDTF em SQL com o comando CREATE FUNCTION, especificando sua classe como o manipulador. Quando você cria a UDTF, você especifica:
Tipos de dados de parâmetros de entrada da UDTF.
Tipos de dados das colunas retornadas pela UDTF.
Código a ser executado como manipulador quando a UDTF é chamada.
A linguagem na qual o manipulador é implementado.
Para saber mais sobre a sintaxe, veja Como criar a UDTF com CREATE FUNCTION neste tópico.
Você pode chamar uma UDF ou UDTF como descrito em Como chamar uma UDF.
Nota
Funções de tabela (UDTFs) têm um limite de 500 argumentos de entrada e 500 colunas de saída.
O Snowflake atualmente oferece suporte para a escrita de UDTFs nas seguintes versões Python:
3.8
3.9
3,10
3,11
Em sua instrução CREATE FUNCTION, defina runtime_version
com a versão desejada.
Como implementar um manipulador¶
Você implementa uma classe de manipulador para processar valores de argumentos da UDTF em resultados tabulares e manipular uma entrada particionada. Para obter um exemplo de uma classe de manipulador, consulte Exemplo de classe do manipulador neste tópico.
Quando você cria a UDTF com CREATE FUNCTION, você especifica essa classe como o manipulador da UDTF. Para saber mais sobre o SQL para criar a função, consulte Como criar a UDTF com CREATE FUNCTION neste tópico.
Uma classe de manipulador implementa métodos que o Snowflake invocará quando a UDTF for chamada. Essa classe contém a lógica da UDTF.
Método |
Requisito |
Descrição |
---|---|---|
Método |
Opcional |
Inicializa o estado para o processamento com estado de partições de entrada. Para obter mais informações, consulte Como inicializar o manipulador neste tópico. |
Método |
Obrigatório |
Processa cada linha de entrada, retornando um valor tabular como tuplas. O Snowflake invoca este método passando a entrada dos argumentos da UDTF. Para obter mais informações, consulte Como definir um método process neste tópico. |
Método |
Opcional |
Finaliza o processamento de partições de entrada, retornando um valor tabular como tuplas. Para obter mais informações, consulte Como finalizar o processamento da partição neste tópico. |
Note que a geração de uma exceção de qualquer método na classe do manipulador faz com que o processamento pare. A consulta que chamou a UDTF falha com uma mensagem de erro.
Nota
Se seu código não atender aos requisitos descritos aqui, a criação ou execução da UDTF pode falhar. O Snowflake detectará violações quando a instrução CREATE FUNCTION for executada.
Exemplo de classe do manipulador¶
O código no exemplo a seguir cria uma UDTF cuja classe do manipulador processa linhas em uma partição. O método process
processa cada linha de entrada, devolvendo uma linha com o custo total para uma venda de estoque. Após processar linhas na partição, ele retorna (a partir de seu método end_partition
) o total para todas as vendas incluídas na partição.
create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
$$;
O código no exemplo a seguir chama a UDF anterior, passando valores das colunas symbol
, quantity
e price
da tabela stocks_table
. Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDF.
select stock_sale_sum.symbol, total
from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Como inicializar o manipulador¶
Opcionalmente, você pode implementar um método __init__
em sua classe do manipulador que o Snowflake invocará antes que o manipulador comece a processar as linhas. Por exemplo, você pode usar esse método para estabelecer algum estado no escopo da partição para o manipulador. Seu método __init__
pode não produzir linhas de saída.
A assinatura do método deve ser da seguinte forma:
def __init__(self):
Por exemplo, talvez você queira:
Inicializar o estado para uma partição e depois usar esse estado nos métodos
process
eend_partition
.Executar uma inicialização de longo prazo que precisa ser feita apenas uma vez por partição e não uma vez por linha.
Nota
Você também pode executar a lógica uma vez antes de começar a manipulação da partição incluindo esse código fora da classe do manipulador, por exemplo antes da declaração da classe.
Para saber mais sobre o processamento de partições, consulte Processamento de partições neste tópico.
Se você usa um método __init__
, tenha em mente que __init__
:
Pode tomar apenas
self
como um argumento.Não pode produzir linhas de saída. Use sua implementação do método
process
para isso.É invocado uma vez para cada partição, e antes que o método
process
seja invocado.
Processamento de linhas¶
Implemente um método process
que o Snowflake invocará para cada linha de entrada.
Como definir um método process
¶
Defina um método process
que receba como valores os argumentos da UDTF convertidos de tipos do SQL, retornando dados que o Snowflake usará para criar o valor de retorno tabular da UDTF.
A assinatura do método deve ser da seguinte forma:
def process(self, *args):
Seu método process
deve:
Ter um parâmetro
self
.Declarar parâmetros do método correspondentes aos parâmetros da UDTF.
Os nomes de parâmetros do método não precisam corresponder aos nomes de parâmetros da UDTF, mas os parâmetros do método devem ser declarados na mesma ordem que os parâmetros da UDTF.
Ao passar valores de argumento da UDTF para seu método, o Snowflake converterá os valores dos tipos de SQL para os tipos de Python que você usa no método. Para obter mais informações sobre como o Snowflake mapeia entre tipos de dados de SQL e de Python, consulte Mapeamentos de tipos de dados SQL-Python.
Produza uma ou mais tuplas (ou retorne um iterável contendo tuplas) na qual a sequência de tuplas corresponde à sequência de colunas de valores de retorno da UDTF.
Os elementos de tupla devem aparecer na mesma ordem em que as colunas de valor de retorno da UDTF são declaradas. Para obter mais informações, consulte Como retornar um valor neste tópico.
O Snowflake converterá valores dos tipos de Python para tipos de SQL exigidos pela declaração da UDTF. Para obter mais informações sobre como o Snowflake mapeia entre tipos de dados de SQL e de Python, consulte Mapeamentos de tipos de dados SQL-Python.
Se um método na classe do manipulador gerar uma exceção, o processamento será interrompido. A consulta que chamou a UDTF falhará com uma mensagem de erro. Se o método process
retornar None
, o processamento será interrompido. (O método end_partition
ainda é invocado mesmo que o método process
retorne None
).
Exemplo do método process
O código no exemplo a seguir mostra uma StockSale
classe de manipulador com um método process
que processa três argumentos de UDTF (symbol
, quantity
e price
), retornando uma única linha com duas colunas (symbol
e total
). Observe que os parâmetros do método process
são declarados na mesma ordem que os parâmetros stock_sale
. Os argumentos na instrução process
do método yield
estão na mesma ordem que as colunas declaradas na cláusula stock_sale
RETURNS TABLE.
create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
def process(self, symbol, quantity, price):
cost = quantity * price
yield (symbol, cost)
$$;
O código no exemplo a seguir chama a UDF anterior, passando valores das colunas symbol
, quantity
e price
da tabela stocks_table
.
select stock_sale.symbol, total
from stocks_table, table(stock_sale(symbol, quantity, price) over (partition by symbol));
Como retornar um valor¶
Ao retornar linhas de saída, você pode usar yield
ou return
(mas não ambos) para retornar tuplas com o valor tabular. Se o método retornar ou produzir None
, o processamento para a linha atual será interrompido.
Ao usar
yield
, execute uma instruçãoyield
separada para cada linha de saída. Essa é a prática recomendada, pois a avaliação lenta que vem com oyield
permite um processamento mais eficiente e pode ajudar a evitar o esgotamento do tempo limite.Cada elemento na tupla torna-se um valor de coluna no resultado retornado pela UDTF. A ordem dos argumentos do
yield
deve corresponder à ordem das colunas declaradas para o valor de retorno na cláusula RETURNS TABLE de CREATE FUNCTION.O código no exemplo a seguir retorna valores que representam duas linhas.
def process(self, symbol, quantity, price): cost = quantity * price yield (symbol, cost) yield (symbol, cost)
Observe que como o argumento do yield é uma tupla, você deve incluir uma vírgula final ao passar um único valor na tupla, como no exemplo a seguir.
yield (cost,)
Ao usar
return
, retorne um iterável com tuplas.Cada valor em uma tupla torna-se um valor de coluna no resultado retornado pela UDTF. A ordem dos valores das colunas em uma tupla deve corresponder à ordem das colunas declaradas para o valor de retorno na cláusula RETURNS TABLE de CREATE FUNCTION.
O código no exemplo a seguir retorna duas linhas, cada uma com duas colunas: símbolo e total.
def process(self, symbol, quantity, price): cost = quantity * price return [(symbol, cost), (symbol, cost)]
Como pular linhas¶
Para pular uma linha de entrada e processar a próxima linha (como quando você estiver validando as linhas de entrada), faça com que o método process
retorne um dos seguintes:
Ao usar
return
, retorneNone
, uma lista contendoNone
ou uma lista vazia para pular a linha.Ao usar
yield
, retorneNone
para pular uma linha.Note que se você tiver várias chamadas para
yield
, quaisquer chamadas após uma chamada que retornaNone
serão ignoradas pelo Snowflake.
O código no exemplo a seguir retorna apenas as linhas para as quais number
é um número inteiro positivo. Se number
não for positivo, o método retornará None
para pular a linha atual e continuar processando a próxima linha.
def process(self, number):
if number < 1:
yield None
else:
yield (number)
Processamento com e sem estado¶
Você pode implementar o manipulador para processar linhas levando em conta as partições ou para processá-las simplesmente linha por linha.
No processamento levando em conta as partições, o manipulador inclui o código para gerenciar o estado no escopo da partição. Isso inclui um método
__init__
que é executado no início do processamento da partição e um métodoend_partition
que o Snowflake invoca após processar a última linha da partição. Para obter mais informações, consulte Processamento de partições neste tópico.Em processamento sem levar em conta as partições, o manipulador é executado sem estado, ignorando os limites das partições.
Para que o manipulador seja executado dessa forma, não inclua um método
__init__
ouend_partition
.
Processamento de partições¶
Você pode processar as partições na entrada com um código que é executado por partição (por exemplo, para gerenciar o estado), bem como com um código que é executado para cada linha na partição.
Nota
Para obter mais informações sobre como especificar as partições ao chamar uma UDTF, consulte Funções de tabela e partições.
Quando uma consulta inclui partições, ela agrega linhas usando um valor especificado, tal como o valor de uma coluna. As linhas agregadas que seu manipulador recebe são particionadas por esse valor. Seu código pode processar essas partições e suas linhas de forma que o processamento para cada partição inclua o estado de escopo da partição.
O código no seguinte exemplo de SQL consulta informações de venda de estoque. Ele executa uma stock_sale_sum
UDTF cuja entrada é particionada pelo valor da coluna symbol
.
select stock_sale_sum.symbol, total
from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Tenha em mente que, mesmo quando as linhas de entrada são particionadas, seu código pode ignorar a separação da partição e apenas processar as linhas. Por exemplo, você pode omitir o código projetado para lidar com um estado no escopo da partição, como um método __init__
e end_partition
, e apenas implementar o método process
. Para obter mais informações, consulte Processamento com e sem estado neste tópico.
Para processar cada participação como uma unidade, você faria o seguinte:
Implementar um método de classe de manipulador
__init__
para inicializar o processamento para a partição.Para obter mais informações, consulte Como inicializar o manipulador neste tópico.
Incluir o código que leva em conta a partição ao processar cada linha com o método
process
.Para obter mais informações sobre linhas de processamento, consulte Processamento de linhas neste tópico.
Implementar um método
end_partition
para finalizar o processamento da partição.Para obter mais informações, consulte Como finalizar o processamento da partição neste tópico.
A seguir, veja uma descrição da sequência de invocações ao seu manipulador quando você tiver incluído código projetado para ser executado por partição.
Ao iniciar o processamento para uma partição, e antes que a primeira linha tenha sido processada, o Snowflake usa o método
__init__
de sua classe do manipulador para criar uma instância da classe.Aqui você pode estabelecer o estado no escopo da partição. Por exemplo, você pode inicializar uma variável de instância para manter um valor calculado a partir de linhas na partição.
Para cada linha da partição, o Snowflake invoca o método
process
.Cada vez que o método é executado, ele pode fazer mudanças nos valores de estado. Por exemplo, você pode fazer o método
process
atualizar o valor da variável da instância.Após seu código ter processado a última linha na partição, o Snowflake invoca seu método
end_partition
.A partir desse método, você pode retornar linhas de saída contendo um valor no nível de partição que você deseja retornar. Por exemplo, você pode retornar o valor da variável da instância que você vinha atualizando à medida que processava as linhas na partição.
Seu método
end_partition
não receberá nenhum argumento do Snowflake, que simplesmente o invoca depois de processar a última linha na partição.
Como finalizar o processamento da partição¶
Opcionalmente, você pode implementar um método end_partition
em sua classe do manipulador que o Snowflake invocará depois de ter processado todas as linhas em uma partição. Nesse método, você pode executar o código para uma partição depois que todas as linhas da partição tiverem sido processadas. Seu método end_partition
pode produzir linhas de saída, de modo a retornar os resultados de um cálculo no escopo da partição. Para obter mais informações, consulte Processamento de partições neste tópico.
A assinatura do método deve ser da seguinte forma:
def end_partition(self):
O Snowflake espera o seguinte de sua implementação do método end_partition
:
Ele não deve ser estático.
Ele não pode ter nenhum outro parâmetro além de
self
.Como alternativa ao retorno de um valor tabular, ele pode produzir uma lista vazia ou
None
.
Nota
Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando end_partition
leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.
Exemplo de tratamento de partições¶
O código no exemplo a seguir calcula o custo total pago em compras para um estoque calculando o custo por compra e somando as compras em conjunto (no método process
). O código retorna o total no método end_partition
.
Para obter um exemplo de uma UDTF que inclui este manipulador, juntamente com a chamada da UDTF, consulte Exemplo de classe do manipulador.
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
Ao processar as partições, tenha em mente o seguinte:
Seu código pode tratar de partições que não estejam explicitamente especificadas em uma chamada para a UDTF. Mesmo quando uma chamada para a UDTF não inclui uma cláusula PARTITION BY, o Snowflake particiona os dados implicitamente.
Seu método
process
receberá dados de linha na ordem especificada pela cláusula ORDER BY da partição, se houver.
Exemplos¶
Como usar um pacote importado¶
Você pode usar pacotes de Python que estão incluídos em uma lista curada de pacotes de terceiros do Anaconda disponíveis no Snowflake. Para especificar esses pacotes como dependências na UDTF, use a cláusula PACKAGES em CREATE FUNCTION.
Você pode descobrir a lista de pacotes incluídos executando o seguinte SQL no Snowflake:
select * from information_schema.packages where language = 'python';
Para obter mais informações, consulte Como usar pacotes de terceiros e Como criar UDFs de Python.
O código no exemplo a seguir usa uma função no pacote NumPy (Numerical Python) para calcular o preço médio por ação de uma array de compras de ações, cada uma com um preço diferente por ação.
create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np
class StockSaleAverage:
def __init__(self):
self._price_array = []
self._quantity_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
self._price_array.append(float(price))
cost = quantity * price
yield (symbol, cost)
def end_partition(self):
np_array = np.array(self._price_array)
avg = np.average(np_array)
yield (self._symbol, avg)
$$;
O código no exemplo a seguir chama a UDF anterior, passando valores das colunas symbol
, quantity
e price
da tabela stocks_table
. Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDF.
select stock_sale_average.symbol, total
from stocks_table,
table(stock_sale_average(symbol, quantity, price)
over (partition by symbol));
Como executar tarefas simultâneas com processos de trabalho¶
Você pode executar tarefas simultâneas usando processos de trabalho do Python. Você pode achar isso útil quando precisar executar tarefas paralelas que aproveitam vários núcleos de CPU em nós do warehouse.
Nota
A Snowflake recomenda que você não use o módulo de multiprocessamento integrado do Python.
Para contornar os casos em que o Bloqueio de intérprete global do Python impede que uma abordagem multitarefa se espalhe por todos os núcleos da CPU, você pode executar tarefas simultâneas usando processos de trabalho separados, em vez de threads.
Você pode fazer isso nos warehouses Snowflake usando a classe joblib
da biblioteca Parallel
, como no exemplo a seguir.
CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
RETURNS TABLE (result INT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'JoblibMultiprocessing'
PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt
class JoblibMultiprocessing:
def process(self, i):
pass
def end_partition(self):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
for r in result:
yield (r, )
$$;
Nota
O back-end padrão usado para joblib.Parallel
difere entre os warehouses padrão Snowflake e otimizados para Snowpark.
Padrão do warehouse:
threading
Padrão do warehouse otimizado para Snowpark:
loky
(multiprocessamento)
Você pode substituir a configuração de back-end padrão chamando a função joblib.parallel_backend
, como no exemplo a seguir.
import joblib
joblib.parallel_backend('loky')
Como criar a UDTF com CREATE FUNCTION
¶
Você cria uma UDTF em SQL usando o comando CREATE FUNCTION, especificando o código que você escreveu como manipulador. Para obter a referência do comando, consulte CREATE FUNCTION.
Use a sintaxe a seguir ao criar uma UDTF.
CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
LANGUAGE PYTHON
[ IMPORTS = ( '<imports>' ) ]
RUNTIME_VERSION = 3.8
[ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
[ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
HANDLER = '<handler_class>'
[ AS '<python_code>' ]
Para associar o código do manipulador que você escreveu com a UDTF, faça o seguinte ao executar CREATE FUNCTION:
Em RETURNS TABLE, especifique as colunas de saída em pares de nomes de colunas e tipos.
Defina LANGUAGE como PYTHON.
Defina o valor da cláusula IMPORTS como o caminho e nome da classe do manipulador se a classe estiver em um local externo, como um estágio.
Para obter mais informações, consulte Como criar UDFs de Python.
Defina RUNTIME_VERSION como a versão do Python runtime que seu código requer. As versões suportadas do Python são:
3.8
3.9
3,10
3,11
Defina o valor da cláusula PACKAGES como o nome de um ou mais pacotes, se houver, exigidos pela classe do manipulador.
Para obter mais informações, consulte Como usar pacotes de terceiros e Como criar UDFs de Python.
Defina o valor da cláusula HANDLER como o nome da classe do manipulador.
Ao associar o código do manipulador em Python com uma UDTF, você pode incluir o código inline ou referir-se a ele em um local em um estágio do Snowflake. O valor HANDLER diferencia entre maiúsculas e minúsculas e deve corresponder ao nome da classe de Python.
Para obter mais informações, consulte UDFs com código inline vs. UDFs com código carregado de um estágio.
Importante
Para uma UDF escalar de Python, o valor da cláusula HANDLER contém o nome do método.
Para uma UDTF de Python, o valor da cláusula HANDLER contém o nome da classe mas não um nome do método.
A razão da diferença é que para uma UDFescalar de Python, o nome do método do manipulador é escolhido pelo usuário e, portanto, não é conhecido antecipadamente pelo Snowflake, mas para uma UDTF de Python, os nomes dos métodos (como o método
end_partition
) são conhecidos porque devem corresponder aos nomes especificados pelo Snowflake.A cláusula
AS '<python_code>'
é necessária se o código do manipulador for especificado inline com CREATE FUNCTION.