Como escrever uma UDTF em Python

Neste tópico:

Você pode implementar um manipulador de função de tabela definida pelo usuário (UDTF) em Python. Esse código do manipulador é executado quando a UDTF é chamada. Este tópico descreve como implementar um manipulador em Python e criar a UDTF.

Uma UDTF é uma função definida pelo usuário (UDF) que retorna resultados tabulares. Para saber mais sobre manipuladores de UDF implementados em Python, consulte Como criar UDFs de Python. Para obter informações gerais sobre UDFs, consulte Visão geral das funções definidas pelo usuário.

No manipulador de um UDTF, você pode processar linhas de entrada (consulte Processamento de linhas neste tópico). Você também pode ter uma lógica que é executada para cada partição de entrada (consulte Processamento de partições neste tópico).

Ao criar uma UDTF em Python, faça o seguinte:

  1. Implemente uma classe com métodos que o Snowflake invocará quando a UDTF for chamada.

    Para obter mais detalhes, consulte Como implementar um manipulador neste tópico.

  2. Crie a UDTF em SQL com o comando CREATE FUNCTION, especificando sua classe como o manipulador. Quando você cria a UDTF, você especifica:

    • Tipos de dados de parâmetros de entrada da UDTF.

    • Tipos de dados das colunas retornadas pela UDTF.

    • Código a ser executado como manipulador quando a UDTF é chamada.

    • A linguagem na qual o manipulador é implementado.

    Para saber mais sobre a sintaxe, veja Como criar a UDTF com CREATE FUNCTION neste tópico.

Você pode chamar uma UDF ou UDTF como descrito em Como chamar uma UDF.

Nota

Funções de tabela (UDTFs) têm um limite de 500 argumentos de entrada e 500 colunas de saída.

O Snowflake atualmente oferece suporte para a escrita de UDTFs nas seguintes versões Python:

  • 3.8

  • 3.9

  • 3,10

  • 3,11

Em sua instrução CREATE FUNCTION, defina runtime_version com a versão desejada.

Como implementar um manipulador

Você implementa uma classe de manipulador para processar valores de argumentos da UDTF em resultados tabulares e manipular uma entrada particionada. Para obter um exemplo de uma classe de manipulador, consulte Exemplo de classe do manipulador neste tópico.

Quando você cria a UDTF com CREATE FUNCTION, você especifica essa classe como o manipulador da UDTF. Para saber mais sobre o SQL para criar a função, consulte Como criar a UDTF com CREATE FUNCTION neste tópico.

Uma classe de manipulador implementa métodos que o Snowflake invocará quando a UDTF for chamada. Essa classe contém a lógica da UDTF.

Método

Requisito

Descrição

Método __init__

Opcional

Inicializa o estado para o processamento com estado de partições de entrada. Para obter mais informações, consulte Como inicializar o manipulador neste tópico.

Método process

Obrigatório

Processa cada linha de entrada, retornando um valor tabular como tuplas. O Snowflake invoca este método passando a entrada dos argumentos da UDTF. Para obter mais informações, consulte Como definir um método process neste tópico.

Método end_partition

Opcional

Finaliza o processamento de partições de entrada, retornando um valor tabular como tuplas. Para obter mais informações, consulte Como finalizar o processamento da partição neste tópico.

Note que a geração de uma exceção de qualquer método na classe do manipulador faz com que o processamento pare. A consulta que chamou a UDTF falha com uma mensagem de erro.

Nota

Se seu código não atender aos requisitos descritos aqui, a criação ou execução da UDTF pode falhar. O Snowflake detectará violações quando a instrução CREATE FUNCTION for executada.

Exemplo de classe do manipulador

O código no exemplo a seguir cria uma UDTF cuja classe do manipulador processa linhas em uma partição. O método process processa cada linha de entrada, devolvendo uma linha com o custo total para uma venda de estoque. Após processar linhas na partição, ele retorna (a partir de seu método end_partition) o total para todas as vendas incluídas na partição.

create or replace function stock_sale_sum(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSaleSum'
as $$
class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      cost = quantity * price
      self._cost_total += cost
      yield (symbol, cost)

    def end_partition(self):
      yield (self._symbol, self._cost_total)
$$;
Copy

O código no exemplo a seguir chama a UDF anterior, passando valores das colunas symbol, quantity e price da tabela stocks_table. Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDF.

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

Como inicializar o manipulador

Opcionalmente, você pode implementar um método __init__ em sua classe do manipulador que o Snowflake invocará antes que o manipulador comece a processar as linhas. Por exemplo, você pode usar esse método para estabelecer algum estado no escopo da partição para o manipulador. Seu método __init__ pode não produzir linhas de saída.

A assinatura do método deve ser da seguinte forma:

def __init__(self):
Copy

Por exemplo, talvez você queira:

  • Inicializar o estado para uma partição e depois usar esse estado nos métodos process e end_partition.

  • Executar uma inicialização de longo prazo que precisa ser feita apenas uma vez por partição e não uma vez por linha.

Nota

Você também pode executar a lógica uma vez antes de começar a manipulação da partição incluindo esse código fora da classe do manipulador, por exemplo antes da declaração da classe.

Para saber mais sobre o processamento de partições, consulte Processamento de partições neste tópico.

Se você usa um método __init__, tenha em mente que __init__:

  • Pode tomar apenas self como um argumento.

  • Não pode produzir linhas de saída. Use sua implementação do método process para isso.

  • É invocado uma vez para cada partição, e antes que o método process seja invocado.

Processamento de linhas

Implemente um método process que o Snowflake invocará para cada linha de entrada.

Como definir um método process

Defina um método process que receba como valores os argumentos da UDTF convertidos de tipos do SQL, retornando dados que o Snowflake usará para criar o valor de retorno tabular da UDTF.

A assinatura do método deve ser da seguinte forma:

def process(self, *args):
Copy

Seu método process deve:

  • Ter um parâmetro self.

  • Declarar parâmetros do método correspondentes aos parâmetros da UDTF.

    Os nomes de parâmetros do método não precisam corresponder aos nomes de parâmetros da UDTF, mas os parâmetros do método devem ser declarados na mesma ordem que os parâmetros da UDTF.

    Ao passar valores de argumento da UDTF para seu método, o Snowflake converterá os valores dos tipos de SQL para os tipos de Python que você usa no método. Para obter mais informações sobre como o Snowflake mapeia entre tipos de dados de SQL e de Python, consulte Mapeamentos de tipos de dados SQL-Python.

  • Produza uma ou mais tuplas (ou retorne um iterável contendo tuplas) na qual a sequência de tuplas corresponde à sequência de colunas de valores de retorno da UDTF.

    Os elementos de tupla devem aparecer na mesma ordem em que as colunas de valor de retorno da UDTF são declaradas. Para obter mais informações, consulte Como retornar um valor neste tópico.

    O Snowflake converterá valores dos tipos de Python para tipos de SQL exigidos pela declaração da UDTF. Para obter mais informações sobre como o Snowflake mapeia entre tipos de dados de SQL e de Python, consulte Mapeamentos de tipos de dados SQL-Python.

Se um método na classe do manipulador gerar uma exceção, o processamento será interrompido. A consulta que chamou a UDTF falhará com uma mensagem de erro. Se o método process retornar None, o processamento será interrompido. (O método end_partition ainda é invocado mesmo que o método process retorne None).

Exemplo do método process

O código no exemplo a seguir mostra uma StockSale classe de manipulador com um método process que processa três argumentos de UDTF (symbol, quantity e price), retornando uma única linha com duas colunas (symbol e total). Observe que os parâmetros do método process são declarados na mesma ordem que os parâmetros stock_sale. Os argumentos na instrução process do método yield estão na mesma ordem que as colunas declaradas na cláusula stock_sale RETURNS TABLE.

create or replace function stock_sale(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
handler='StockSale'
as $$
class StockSale:
    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
$$;
Copy

O código no exemplo a seguir chama a UDF anterior, passando valores das colunas symbol, quantity e price da tabela stocks_table.

select stock_sale.symbol, total
  from stocks_table, table(stock_sale(symbol, quantity, price) over (partition by symbol));
Copy

Como retornar um valor

Ao retornar linhas de saída, você pode usar yield ou return (mas não ambos) para retornar tuplas com o valor tabular. Se o método retornar ou produzir None, o processamento para a linha atual será interrompido.

  • Ao usar yield, execute uma instrução yield separada para cada linha de saída. Essa é a prática recomendada, pois a avaliação lenta que vem com o yield permite um processamento mais eficiente e pode ajudar a evitar o esgotamento do tempo limite.

    Cada elemento na tupla torna-se um valor de coluna no resultado retornado pela UDTF. A ordem dos argumentos do yield deve corresponder à ordem das colunas declaradas para o valor de retorno na cláusula RETURNS TABLE de CREATE FUNCTION.

    O código no exemplo a seguir retorna valores que representam duas linhas.

    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
      yield (symbol, cost)
    
    Copy

    Observe que como o argumento do yield é uma tupla, você deve incluir uma vírgula final ao passar um único valor na tupla, como no exemplo a seguir.

    yield (cost,)
    
    Copy
  • Ao usar return, retorne um iterável com tuplas.

    Cada valor em uma tupla torna-se um valor de coluna no resultado retornado pela UDTF. A ordem dos valores das colunas em uma tupla deve corresponder à ordem das colunas declaradas para o valor de retorno na cláusula RETURNS TABLE de CREATE FUNCTION.

    O código no exemplo a seguir retorna duas linhas, cada uma com duas colunas: símbolo e total.

    def process(self, symbol, quantity, price):
      cost = quantity * price
      return [(symbol, cost), (symbol, cost)]
    
    Copy

Como pular linhas

Para pular uma linha de entrada e processar a próxima linha (como quando você estiver validando as linhas de entrada), faça com que o método process retorne um dos seguintes:

  • Ao usar return, retorne None, uma lista contendo None ou uma lista vazia para pular a linha.

  • Ao usar yield, retorne None para pular uma linha.

    Note que se você tiver várias chamadas para yield, quaisquer chamadas após uma chamada que retorna None serão ignoradas pelo Snowflake.

O código no exemplo a seguir retorna apenas as linhas para as quais number é um número inteiro positivo. Se number não for positivo, o método retornará None para pular a linha atual e continuar processando a próxima linha.

def process(self, number):
  if number < 1:
    yield None
  else:
    yield (number)
Copy

Processamento com e sem estado

Você pode implementar o manipulador para processar linhas levando em conta as partições ou para processá-las simplesmente linha por linha.

  • No processamento levando em conta as partições, o manipulador inclui o código para gerenciar o estado no escopo da partição. Isso inclui um método __init__ que é executado no início do processamento da partição e um método end_partition que o Snowflake invoca após processar a última linha da partição. Para obter mais informações, consulte Processamento de partições neste tópico.

  • Em processamento sem levar em conta as partições, o manipulador é executado sem estado, ignorando os limites das partições.

    Para que o manipulador seja executado dessa forma, não inclua um método __init__ ou end_partition.

Processamento de partições

Você pode processar as partições na entrada com um código que é executado por partição (por exemplo, para gerenciar o estado), bem como com um código que é executado para cada linha na partição.

Nota

Para obter mais informações sobre como especificar as partições ao chamar uma UDTF, consulte Funções de tabela e partições.

Quando uma consulta inclui partições, ela agrega linhas usando um valor especificado, tal como o valor de uma coluna. As linhas agregadas que seu manipulador recebe são particionadas por esse valor. Seu código pode processar essas partições e suas linhas de forma que o processamento para cada partição inclua o estado de escopo da partição.

O código no seguinte exemplo de SQL consulta informações de venda de estoque. Ele executa uma stock_sale_sum UDTF cuja entrada é particionada pelo valor da coluna symbol.

select stock_sale_sum.symbol, total
  from stocks_table, table(stock_sale_sum(symbol, quantity, price) over (partition by symbol));
Copy

Tenha em mente que, mesmo quando as linhas de entrada são particionadas, seu código pode ignorar a separação da partição e apenas processar as linhas. Por exemplo, você pode omitir o código projetado para lidar com um estado no escopo da partição, como um método __init__ e end_partition, e apenas implementar o método process. Para obter mais informações, consulte Processamento com e sem estado neste tópico.

Para processar cada participação como uma unidade, você faria o seguinte:

  • Implementar um método de classe de manipulador __init__ para inicializar o processamento para a partição.

    Para obter mais informações, consulte Como inicializar o manipulador neste tópico.

  • Incluir o código que leva em conta a partição ao processar cada linha com o método process.

    Para obter mais informações sobre linhas de processamento, consulte Processamento de linhas neste tópico.

  • Implementar um método end_partition para finalizar o processamento da partição.

    Para obter mais informações, consulte Como finalizar o processamento da partição neste tópico.

A seguir, veja uma descrição da sequência de invocações ao seu manipulador quando você tiver incluído código projetado para ser executado por partição.

  1. Ao iniciar o processamento para uma partição, e antes que a primeira linha tenha sido processada, o Snowflake usa o método __init__ de sua classe do manipulador para criar uma instância da classe.

    Aqui você pode estabelecer o estado no escopo da partição. Por exemplo, você pode inicializar uma variável de instância para manter um valor calculado a partir de linhas na partição.

  2. Para cada linha da partição, o Snowflake invoca o método process.

    Cada vez que o método é executado, ele pode fazer mudanças nos valores de estado. Por exemplo, você pode fazer o método process atualizar o valor da variável da instância.

  3. Após seu código ter processado a última linha na partição, o Snowflake invoca seu método end_partition.

    A partir desse método, você pode retornar linhas de saída contendo um valor no nível de partição que você deseja retornar. Por exemplo, você pode retornar o valor da variável da instância que você vinha atualizando à medida que processava as linhas na partição.

    Seu método end_partition não receberá nenhum argumento do Snowflake, que simplesmente o invoca depois de processar a última linha na partição.

Como finalizar o processamento da partição

Opcionalmente, você pode implementar um método end_partition em sua classe do manipulador que o Snowflake invocará depois de ter processado todas as linhas em uma partição. Nesse método, você pode executar o código para uma partição depois que todas as linhas da partição tiverem sido processadas. Seu método end_partition pode produzir linhas de saída, de modo a retornar os resultados de um cálculo no escopo da partição. Para obter mais informações, consulte Processamento de partições neste tópico.

A assinatura do método deve ser da seguinte forma:

def end_partition(self):
Copy

O Snowflake espera o seguinte de sua implementação do método end_partition:

  • Ele não deve ser estático.

  • Ele não pode ter nenhum outro parâmetro além de self.

  • Como alternativa ao retorno de um valor tabular, ele pode produzir uma lista vazia ou None.

Nota

Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando end_partition leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.

Exemplo de tratamento de partições

O código no exemplo a seguir calcula o custo total pago em compras para um estoque calculando o custo por compra e somando as compras em conjunto (no método process). O código retorna o total no método end_partition.

Para obter um exemplo de uma UDTF que inclui este manipulador, juntamente com a chamada da UDTF, consulte Exemplo de classe do manipulador.

class StockSaleSum:
  def __init__(self):
    self._cost_total = 0
    self._symbol = ""

  def process(self, symbol, quantity, price):
    self._symbol = symbol
    cost = quantity * price
    self._cost_total += cost
    yield (symbol, cost)

  def end_partition(self):
    yield (self._symbol, self._cost_total)
Copy

Ao processar as partições, tenha em mente o seguinte:

  • Seu código pode tratar de partições que não estejam explicitamente especificadas em uma chamada para a UDTF. Mesmo quando uma chamada para a UDTF não inclui uma cláusula PARTITION BY, o Snowflake particiona os dados implicitamente.

  • Seu método process receberá dados de linha na ordem especificada pela cláusula ORDER BY da partição, se houver.

Exemplos

Como usar um pacote importado

Você pode usar pacotes de Python que estão incluídos em uma lista curada de pacotes de terceiros do Anaconda disponíveis no Snowflake. Para especificar esses pacotes como dependências na UDTF, use a cláusula PACKAGES em CREATE FUNCTION.

Você pode descobrir a lista de pacotes incluídos executando o seguinte SQL no Snowflake:

select * from information_schema.packages where language = 'python';
Copy

Para obter mais informações, consulte Como usar pacotes de terceiros e Como criar UDFs de Python.

O código no exemplo a seguir usa uma função no pacote NumPy (Numerical Python) para calcular o preço médio por ação de uma array de compras de ações, cada uma com um preço diferente por ação.

create or replace function stock_sale_average(symbol varchar, quantity number, price number(10,2))
returns table (symbol varchar, total number(10,2))
language python
runtime_version=3.8
packages = ('numpy')
handler='StockSaleAverage'
as $$
import numpy as np

class StockSaleAverage:
    def __init__(self):
      self._price_array = []
      self._quantity_total = 0
      self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      self._price_array.append(float(price))
      cost = quantity * price
      yield (symbol, cost)

    def end_partition(self):
      np_array = np.array(self._price_array)
      avg = np.average(np_array)
      yield (self._symbol, avg)
$$;
Copy

O código no exemplo a seguir chama a UDF anterior, passando valores das colunas symbol, quantity e price da tabela stocks_table. Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar uma UDF.

select stock_sale_average.symbol, total
  from stocks_table,
  table(stock_sale_average(symbol, quantity, price)
    over (partition by symbol));
Copy

Como executar tarefas simultâneas com processos de trabalho

Você pode executar tarefas simultâneas usando processos de trabalho do Python. Você pode achar isso útil quando precisar executar tarefas paralelas que aproveitam vários núcleos de CPU em nós do warehouse.

Nota

A Snowflake recomenda que você não use o módulo de multiprocessamento integrado do Python.

Para contornar os casos em que o Bloqueio de intérprete global do Python impede que uma abordagem multitarefa se espalhe por todos os núcleos da CPU, você pode executar tarefas simultâneas usando processos de trabalho separados, em vez de threads.

Você pode fazer isso nos warehouses Snowflake usando a classe joblib da biblioteca Parallel, como no exemplo a seguir.

CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
  RETURNS TABLE (result INT)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'JoblibMultiprocessing'
  PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt

class JoblibMultiprocessing:
  def process(self, i):
    pass

  def end_partition(self):
    result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
    for r in result:
      yield (r, )
$$;
Copy

Nota

O back-end padrão usado para joblib.Parallel difere entre os warehouses padrão Snowflake e otimizados para Snowpark.

  • Padrão do warehouse: threading

  • Padrão do warehouse otimizado para Snowpark: loky (multiprocessamento)

Você pode substituir a configuração de back-end padrão chamando a função joblib.parallel_backend, como no exemplo a seguir.

import joblib
joblib.parallel_backend('loky')
Copy

Como criar a UDTF com CREATE FUNCTION

Você cria uma UDTF em SQL usando o comando CREATE FUNCTION, especificando o código que você escreveu como manipulador. Para obter a referência do comando, consulte CREATE FUNCTION.

Use a sintaxe a seguir ao criar uma UDTF.

CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
  RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
  LANGUAGE PYTHON
  [ IMPORTS = ( '<imports>' ) ]
  RUNTIME_VERSION = 3.8
  [ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
  [ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
  HANDLER = '<handler_class>'
  [ AS '<python_code>' ]
Copy

Para associar o código do manipulador que você escreveu com a UDTF, faça o seguinte ao executar CREATE FUNCTION:

  • Em RETURNS TABLE, especifique as colunas de saída em pares de nomes de colunas e tipos.

  • Defina LANGUAGE como PYTHON.

  • Defina o valor da cláusula IMPORTS como o caminho e nome da classe do manipulador se a classe estiver em um local externo, como um estágio.

    Para obter mais informações, consulte Como criar UDFs de Python.

  • Defina RUNTIME_VERSION como a versão do Python runtime que seu código requer. As versões suportadas do Python são:

    • 3.8

    • 3.9

    • 3,10

    • 3,11

  • Defina o valor da cláusula PACKAGES como o nome de um ou mais pacotes, se houver, exigidos pela classe do manipulador.

    Para obter mais informações, consulte Como usar pacotes de terceiros e Como criar UDFs de Python.

  • Defina o valor da cláusula HANDLER como o nome da classe do manipulador.

    Ao associar o código do manipulador em Python com uma UDTF, você pode incluir o código inline ou referir-se a ele em um local em um estágio do Snowflake. O valor HANDLER diferencia entre maiúsculas e minúsculas e deve corresponder ao nome da classe de Python.

    Para obter mais informações, consulte UDFs com código inline vs. UDFs com código carregado de um estágio.

    Importante

    Para uma UDF escalar de Python, o valor da cláusula HANDLER contém o nome do método.

    Para uma UDTF de Python, o valor da cláusula HANDLER contém o nome da classe mas não um nome do método.

    A razão da diferença é que para uma UDFescalar de Python, o nome do método do manipulador é escolhido pelo usuário e, portanto, não é conhecido antecipadamente pelo Snowflake, mas para uma UDTF de Python, os nomes dos métodos (como o método end_partition) são conhecidos porque devem corresponder aos nomes especificados pelo Snowflake.

  • A cláusula AS '<python_code>' é necessária se o código do manipulador for especificado inline com CREATE FUNCTION.