Criação de funções de tabela definidas pelo usuário (UDTFs) para DataFrames em Python

A API do Snowpark fornece métodos que você pode usar para criar uma função de tabela definida pelo usuário com um manipulador escrito em Python. Este tópico explica como criar esses tipos de funções.

Neste tópico:

Introdução

Você pode criar uma função de tabela definida pelo usuário (UDTF) usando a API do Snowpark.

Isso é feito de forma semelhante à criação de uma função escalar definida pelo usuário (UDF) com a API, conforme descrito em Criação de funções definidas pelo usuário (UDFs) para DataFrames em Python. As principais diferenças incluem os requisitos do manipulador de UDF e os valores de parâmetros necessários ao registrar a UDTF.

Para criar e registrar uma UDTF com o Snowpark, você deve:

  • Implementar um manipulador da UDTF.

    O manipulador contém a lógica da UDTF. Um manipulador de UDTF deve implementar funções que o Snowflake invocará no runtime quando a UDTF for chamada. Para obter mais informações, consulte Como implementar um manipulador de UDTF.

  • Registre a UDTF e seu manipulador no banco de dados do Snowflake.

    Você pode usar a API do Snowpark para registrar a UDTF e seu manipulador. Após registrar a UDTF, você pode chamá-la a partir do SQL ou usando a API do Snowpark. Para obter mais informações sobre o registro, consulte Como registrar uma UDTF.

Para obter mais informações sobre como chamar uma UDTF, consulte Como chamar funções de tabela definidas pelo usuário (UDTFs).

Como implementar um manipulador de UDTF

Como descrito em detalhes em Como escrever uma UDTF em Python, um manipulador de UDTF deve implementar métodos que o Snowflake invoca quando a UDTF é chamada. Você pode usar a classe que você escrever como um manipulador. Não importa se você estiver registrando a UDTF com a API do Snowpark ou criando-a com SQL usando a instrução CREATE FUNCTION.

Os métodos de uma classe de manipulador são projetados para processar linhas e partições recebidas pela UDTF.

Uma classe de manipulador da UDTF implementa os seguintes, que o Snowflake invoca no runtime:

  • Um método __init__. Opcional. Invocado para inicializar o processamento com estado das partições de entrada.

  • Um método process. Obrigatório. Invocado para cada linha de entrada. O método retorna um valor tabular como tuplas.

  • Um método end_partition. Opcional. Convocado para finalizar o processamento das partições de entrada.

    Enquanto o Snowflake oferece suporte a grandes partições com tempos limite ajustados para processá-las com sucesso, as grandes partições em especial podem causar um tempo limite no processamento (como quando end_partition leva muito tempo para ser concluído). Entre em contato com o suporte Snowflake se você precisar ajustar o tempo limite para cenários específicos de uso.

Para obter mais detalhes e exemplos de manipuladores, consulte Como escrever uma UDTF em Python.

Como registrar uma UDTF

Após implementar um manipulador de UDTF, você pode usar a API do Snowpark para registrar a UDTF no banco de dados do Snowflake. O registro da UDTF cria a UDTF para que ela possa ser chamada.

Você pode registrar a UDTF como uma função nomeada ou anônima, da mesma forma que com uma UDF escalar. Para obter mais informações relacionadas sobre o registro de uma UDF escalar, consulte Como criar uma UDF anônima e Criação e registro de uma UDF nomeada.

Quando você registra uma UDTF, você especifica os valores dos parâmetros de que o Snowflake precisa para criar a UDTF. (Muitos desses parâmetros correspondem funcionalmente a cláusulas da instrução CREATE FUNCTION de SQL. Para obter mais informações, consulte CREATE FUNCTION).

A maioria desses parâmetros são os mesmos que você especifica quando cria uma UDF escalar (para obter mais informações, consulte Criação de funções definidas pelo usuário (UDFs) para DataFrames em Python). As principais diferenças se devem ao fato de que uma UDTF retorna um valor tabular e que seu manipulador é uma classe, em vez de uma função. Para uma lista completa dos parâmetros, consulte a documentação das APIs nos links abaixo.

Para registrar uma UDTF com o Snowpark, você usa um dos seguintes parâmetros, especificando os valores de parâmetro necessários para criar a UDTF no banco de dados. Para obter mais informações que diferenciam estas opções, consulte UDFRegistration, que descreve opções semelhantes para o registro de uma UDF escalar.

Como definir os tipos de entrada e o esquema de saída de uma UDTF.

Quando você registra uma UDTF, você especifica detalhes sobre os parâmetros e o valor de saída da função. Você faz isso para que a própria função declare tipos que correspondam exatamente àqueles para o manipulador subjacente da função.

Para obter exemplos, consulte Exemplos neste tópico e na referência snowflake.snowpark.udtf.UDTFRegistration.

Você especifica os seguintes para a UDTF ao registrá-la:

  • Tipos de seus parâmetros de entrada como um valor do parâmetro input_types da função de registro. O parâmetro input_types é opcional se você fornecer indicações de tipo na declaração do método process.

    Especifique esse valor como uma lista de tipos baseada em snowflake.snowpark.types.DataType. Por exemplo, você pode especificar input_types=[StringType(), IntegerType()].

  • Esquema de sua saída tabular como um valor do parâmetro output_schema da função de registro.

    O valor output_schema pode ser um dos seguintes:

    • Uma lista dos nomes das colunas no valor de retorno da UDTF.

      A lista incluirá apenas nomes de colunas; portanto, você também deve fornecer indicações de tipo na declaração do método process.

    • Um StructType que representa os nomes das colunas e tipos da tabela de saída.

      O código no exemplo a seguir atribui um esquema como um valor a uma variável output, depois usa a variável ao registrar a UDTF.

      >>> from snowflake.snowpark.types import StructField, StructType, StringType, IntegerType, FloatType
      >>> from snowflake.snowpark.functions import udtf, table_function
      >>> schema = StructType([
      ...     StructField("symbol", StringType())
      ...     StructField("cost", IntegerType()),
      ... ])
      >>> @udtf(output_schema=schema,input_types=[StringType(), IntegerType(), FloatType()],stage_location="straut_udf",is_permanent=True,name="test_udtf",replace=True)
      ... class StockSale:
      ...     def process(self, symbol, quantity, price):
      ...         cost = quantity * price
      ...         yield (symbol, cost)
      
      Copy

Exemplos

A seguir, veja uma breve lista de exemplos. Para obter mais exemplos, consulte snowflake.snowpark.udtf.UDTFRegistration.

Como registrar uma UDTF com a função udtf

Registre a função.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> class GeneratorUDTF:
...     def process(self, n):
...         for i in range(n):
...             yield (i, )
>>> generator_udtf = udtf(GeneratorUDTF, output_schema=StructType([StructField("number", IntegerType())]), input_types=[IntegerType()])
Copy

Chame a função.

>>> session.table_function(generator_udtf(lit(3))).collect()  # Query it by calling it
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
>>> session.table_function(generator_udtf.name, lit(3)).collect()  # Query it by using the name
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy

Como registrar uma UDTF com a função de registro

Registre a função.

>>> from collections import Counter
>>> from typing import Iterable, Tuple
>>> from snowflake.snowpark.functions import lit
>>> class MyWordCount:
...     def __init__(self):
...         self._total_per_partition = 0
...
...     def process(self, s1: str) -> Iterable[Tuple[str, int]]:
...         words = s1.split()
...         self._total_per_partition = len(words)
...         counter = Counter(words)
...         yield from counter.items()
...
...     def end_partition(self):
...         yield ("partition_total", self._total_per_partition)
>>> udtf_name = "word_count_udtf"
>>> word_count_udtf = session.udtf.register(
...     MyWordCount, ["word", "count"], name=udtf_name, is_permanent=False, replace=True
... )
Copy

Chame a função.

>>> # Call it by its name
>>> df1 = session.table_function(udtf_name, lit("w1 w2 w2 w3 w3 w3"))
>>> df1.show()
-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------
Copy

Como registrar uma UDTF com a função register_from_file

Registre a função.

>>> from snowflake.snowpark.types import IntegerType, StructField, StructType
>>> from snowflake.snowpark.functions import udtf, lit
>>> _ = session.sql("create or replace temp stage mystage").collect()
>>> _ = session.file.put("tests/resources/test_udtf_dir/test_udtf_file.py", "@mystage", auto_compress=False)
>>> generator_udtf = session.udtf.register_from_file(
...     file_path="@mystage/test_udtf_file.py",
...     handler_name="GeneratorUDTF",
...     output_schema=StructType([StructField("number", IntegerType())]),
...     input_types=[IntegerType()]
... )
Copy

Chame a função.

>>> session.table_function(generator_udtf(lit(3))).collect()
[Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]
Copy