Criação de funções agregadas definidas pelo usuário (UDAFs) para DataFrames em Python¶
Você pode usar as APIs do Snowpark Python para criar e chamar funções agregadas definidas pelo usuário (UDAFs). Uma UDAF recebe uma ou mais linhas como entrada e produz uma única linha de saída. Ela opera em valores de várias linhas para realizar cálculos matemáticos como soma, média, contagem, valores mínimos/máximos, desvio padrão e estimativa, assim como algumas operações não matemáticas.
Para criar e registrar uma UDAF com o Snowpark, você precisa:
Implementar um manipulador da UDAF.
O manipulador contém a lógica da UDAF. Um manipulador de UDAF deve implementar funções que o Snowflake invocará no runtime quando a UDAF for chamada. Para obter mais informações, consulte Como implementar um manipulador.
Registre a UDAF e seu manipulador no banco de dados do Snowflake.
Após registrar a UDAF, você pode chamá-la a partir do SQL ou usando a API do Snowpark. Você pode usar a API do Snowpark para registrar a UDAF e seu manipulador. Para obter mais informações sobre o registro, consulte Como registrar uma UDAF.
Também é possível criar suas próprias UDAFs usando SQL, conforme descrito em Funções de agregação definidas pelo usuário em Python.
Como implementar um manipulador¶
Como descrito em Interface para manipulador de função agregada, um manipulador de UDAF deve implementar métodos que o Snowflake invoca quando a UDAF é chamada. Você pode usar a classe que você escrever como um manipulador. Não importa se você estiver registrando a UDAF com a API do Snowpark ou criando-a com SQL usando a instrução CREATE FUNCTION.
O manipulador de sua UDAF implementa métodos listados na tabela a seguir, que o Snowflake invoca em tempo de execução. Consulte os exemplos neste tópico.
Método |
Requisito |
Descrição |
---|---|---|
|
Obrigatório |
Inicializa o estado interno de um agregado. |
|
Obrigatório |
Retorna o estado interno de um agregado.
|
|
Obrigatório |
Acumula o estado do agregado com base na nova linha de entrada. |
|
Obrigatório |
Combina dois estados agregados intermediários. |
|
Obrigatório |
Produz o resultado final com base no estado agregado. |
Como registrar uma UDAF¶
Após implementar um manipulador de UDAF, você pode usar a API do Snowpark para registrar a UDAF no banco de dados do Snowflake. O registro da UDAF cria a UDAF para que ela possa ser chamada.
Você pode registrar a UDAF como uma função nomeada ou anônima, da mesma forma que com uma UDF escalar. Para obter mais informações relacionadas sobre o registro de uma UDF escalar, consulte Como criar uma UDF anônima e Criação e registro de uma UDF nomeada. Quando você registra uma UDAF, você especifica os valores dos parâmetros de que o Snowflake precisa para criar a UDAF.
Você pode registrar a função usando as seguintes funções e métodos:
Use o método
register
ou a funçãoudaf
, especificando o nome da classe do seu manipulador, juntamente com argumentos para definir a função. Você também pode usarudaf
como um decorador@udaf
na classe do manipulador.Para obter informações de referência sobre eles, consulte o seguinte:
Use a função
register_from_file
, apontando para um arquivo de Python ou arquivo zip contendo o código-fonte de Python.Para obter a referência da função, consulte snowflake.snowpark.udaf.UDAFRegistration.register_from_file.
Exemplos¶
Criar uma UDAF com um valor de retorno e um único parâmetro¶
O código Python no exemplo de manipulador a seguir oferece suporte à sum_int
UDAF que recebe um único argumento inteiro, soma o valor em todas as linhas e retorna o resultado.
Registrar a função¶
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
# This aggregate state is a primitive Python data type.
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value):
self._partial_sum += input_value
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType()])
Chamada da função¶
O código Python no exemplo a seguir invoca a sum_int
UDAF com um DataFrame.
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a")).collect()
print(result.collect())
Criar uma UDAF com um valor de retorno e dois parâmetros¶
Registrar a função¶
O código Python no exemplo de manipulador a seguir oferece suporte à sum_int
UDAF que recebe dois argumentos inteiros, soma os valores dos argumentos entre as linhas e retorna o resultado.
import snowflake.snowpark as snowpark
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udaf
def main(session: snowpark.Session):
class PythonSumUDAF:
def __init__(self):
self._partial_sum = 0
@property
def aggregate_state(self):
return self._partial_sum
def accumulate(self, input_value, input_value2):
self._partial_sum += input_value + input_value2
def merge(self, other_partial_sum):
self._partial_sum += other_partial_sum
def finish(self):
return self._partial_sum
sum_udaf = udaf(PythonSumUDAF, name="sum_int", replace=True, return_type=IntegerType(), input_types=[IntegerType(), IntegerType()])
Chamada da função¶
O código Python no exemplo a seguir invoca a sum_int
UDAF com um DataFrame.
df = session.create_dataframe([[1, 3], [1, 4], [2, 5], [2, 6]]).to_df("a", "b")
result = df.agg(sum_udaf("a", "b"))
print(result.collect())