Criação de funções agregadas definidas pelo usuário (UDAFs) para DataFrames em Python

Você pode usar as APIs do Snowpark Python para criar e chamar funções agregadas definidas pelo usuário (UDAFs). Uma UDAF recebe uma ou mais linhas como entrada e produz uma única linha de saída. Ela opera em valores de várias linhas para realizar cálculos matemáticos como soma, média, contagem, valores mínimos/máximos, desvio padrão e estimativa, assim como algumas operações não matemáticas.

Para criar e registrar uma UDAF com o Snowpark, você precisa:

  • Implementar um manipulador da UDAF.

    O manipulador contém a lógica da UDAF. Um manipulador de UDAF deve implementar funções que o Snowflake invocará no runtime quando a UDAF for chamada. Para obter mais informações, consulte Como implementar um manipulador.

  • Registre a UDAF e seu manipulador no banco de dados do Snowflake.

    Após registrar a UDAF, você pode chamá-la a partir do SQL ou usando a API do Snowpark. Você pode usar a API do Snowpark para registrar a UDAF e seu manipulador. Para obter mais informações sobre o registro, consulte Como registrar uma UDAF.

Também é possível criar suas próprias UDAFs usando SQL, conforme descrito em Funções de agregação definidas pelo usuário em Python.

Como implementar um manipulador

Como descrito em Interface para manipulador de função agregada, um manipulador de UDAF deve implementar métodos que o Snowflake invoca quando a UDAF é chamada. Você pode usar a classe que você escrever como um manipulador. Não importa se você estiver registrando a UDAF com a API do Snowpark ou criando-a com SQL usando a instrução CREATE FUNCTION.

O manipulador de sua UDAF implementa métodos listados na tabela a seguir, que o Snowflake invoca em tempo de execução. Consulte os exemplos neste tópico.

Método

Requisito

Descrição

__init__

Obrigatório

Inicializa o estado interno de um agregado.

aggregate_state

Obrigatório

Retorna o estado interno de um agregado.

  • O método deve ter um decorador @propriedade.

  • Um objeto de estado agregado pode ser qualquer tipo de dados Python serializável pela biblioteca Python Pickle.

  • Para estados agregados simples, use um tipo de dados primitivo do Python. Para estados agregados mais complexos, use classes de dados Python.

accumulate

Obrigatório

Acumula o estado do agregado com base na nova linha de entrada.

merge

Obrigatório

Combina dois estados agregados intermediários.

finish

Obrigatório

Produz o resultado final com base no estado agregado.

Como registrar uma UDAF

Após implementar um manipulador de UDAF, você pode usar a API do Snowpark para registrar a UDAF no banco de dados do Snowflake. O registro da UDAF cria a UDAF para que ela possa ser chamada.

Você pode registrar a UDAF como uma função nomeada ou anônima, da mesma forma que com uma UDF escalar. Para obter mais informações relacionadas sobre o registro de uma UDF escalar, consulte Como criar uma UDF anônima e Criação e registro de uma UDF nomeada. Quando você registra uma UDAF, você especifica os valores dos parâmetros de que o Snowflake precisa para criar a UDAF.

Você pode registrar a função usando as seguintes funções e métodos:

Exemplos

Criar uma UDAF com um valor de retorno e um único parâmetro

O código Python no exemplo de manipulador a seguir oferece suporte à sum_int UDAF que recebe um único argumento inteiro, soma o valor em todas as linhas e retorna o resultado.

Registrar a função

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
  def __init__(self):
    # This aggregate state is a primitive Python data type.
    self._partial_sum = 0

  @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value):
    self._partial_sum += input_value

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Copy

Chamada da função

O código Python no exemplo a seguir invoca a sum_int UDAF com um DataFrame.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
Copy

Criar uma UDAF com um valor de retorno e dois parâmetros

Registrar a função

O código Python no exemplo de manipulador a seguir oferece suporte à sum_int UDAF que recebe dois argumentos inteiros, soma os valores dos argumentos entre as linhas e retorna o resultado.

import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
  class PythonSumUDAF:
    def __init__(self):
      self._partial_sum = 0

    @property
  def aggregate_state(self):
    return self._partial_sum

  def accumulate(self, input_value, input_value2):
    self._partial_sum += input_value + input_value2

  def merge(self, other_partial_sum):
    self._partial_sum += other_partial_sum

  def finish(self):
    return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Copy

Chamada da função

O código Python no exemplo a seguir invoca a sum_int UDAF com um DataFrame.

df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())
Copy