UDTFs vetorizadas de Python

Este tópico apresenta as UDTFs vetorizadas de Python.

Neste tópico:

Visão geral

UDTFs vetorizadas de Python (funções de tabela definidas pelo usuário) fornecem uma maneira de operar em linhas em lotes.

O Snowflake suporta dois tipos de UDTFs vetorizadas:

  • UDTFs com um método vetorizado end_partition

  • UDTFs com um método vetorizado process

É necessário escolher um tipo porque um UDTF não pode ter um método process vetorizado e um método end_partition vetorizado.

UDTFs com um método end_partition vetorizado

UDTFs with a vectorized end_partition method enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.

Use um método end_partition vetorizado para as seguintes tarefas:

  • Processar seus dados partição por partição, em vez de linha por linha.

  • Retornar várias linhas ou colunas para cada partição.

  • Usar bibliotecas que operam em pandas DataFrames para análise de dados.

UDTFs com um método de processo vetorizado

UDTFs com um método process vetorizado fornecem uma maneira de operar sobre linhas em lotes, quando a operação executa um mapeamento 1 para 1. Em outras palavras, o método retorna uma linha de saída para cada linha de entrada. O número de colunas não é restrito.

Use um método process vetorizado para as seguintes tarefas:

  • Aplicar uma transformação 1 para 1 com um resultado multicolunar em lotes.

  • Usar uma biblioteca que exija pandas.DataFrame.

  • Processar linhas em lotes, sem particionamento explícito.

  • Aproveitar a API to_pandas() para transformar o resultado de consulta diretamente para um DataFrame pandas.

Pré-requisitos

A biblioteca Snowpark para Python versão 1.14.0 ou posterior é necessária.

Criação de uma UDTF com um método end_partition vetorizado

  1. Opcional: Defina uma classe de manipulador com um método __init__, que será invocado antes de cada partição ser processada.

    Nota: Não defina um método process.

  2. Defina um método end_partition que receba um argumento DataFrame e retorne ou produza um pandas.DataFrame ou uma tupla de pandas.Series ou pandas.arrays onde cada matriz é uma coluna.

    Os tipos de colunas do resultado devem corresponder aos tipos de colunas na definição da UDTF.

  3. Para marcar o método end_partition como vetorizado, use o decorador @vectorized ou o atributo de função _sf_vectorized_input.

    Para mais informações, consulte UDFs Python vetorizadas. O decorador @vectorized só pode ser usado quando a UDTF Python é executada no Snowflake; por exemplo, ao usar uma planilha SQL. Ao executar usando o cliente ou uma planilha Python, você deve usar o atributo de função.

Nota

Os nomes de coluna padrão para o DataFrame de entrada para uma UDTF com método end_partition vetorizado correspondem à assinatura da função SQL. Os nomes das colunas seguem os requisitos de identificador SQL. Ou seja, se um identificador não estiver entre aspas, ele será escrito em maiúsculas, e se estiver entre aspas duplas, permanecerá inalterado.

O bloco de código a seguir é um exemplo de criação de uma UDTF com um método end_partition vetorizado, usando o decorador @vectorized:

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

O bloco de código a seguir é um exemplo de criação de uma UDTF com um método end_partition vetorizado, usando o atributo de função:

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

Nota

Uma UDTF com um método end_partition vetorizado deve ser chamado com uma cláusula PARTITION BY para construir as partições.

Para chamar a UDTF com todos os dados na mesma partição:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

Para chamar a UDTF com os dados particionados pela coluna x:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

Exemplo: Coleção de linhas usando uma UDTF regular versus uma UDTF com um método end_partition vetorizado

Coleção de linhas usando uma UDTF regular:

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

Coleção de linhas usando uma UDTF com um método end_partition vetorizado:

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

Exemplo: calcular a estatística resumida para cada coluna na partição

Aqui está um exemplo de como calcular a estatística resumida para cada coluna na partição usando o método pandas describe().

  1. Criação de uma tabela e geração de três partições de cinco linhas cada:

    create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);
    
    -- generate 3 partitions of 5 rows each
    insert into test_values
    select 'x',
    uniform(1.5,1000.5,random(1))::float col1,
    uniform(1.5,1000.5,random(2))::float col2,
    uniform(1.5,1000.5,random(3))::float col3,
    uniform(1.5,1000.5,random(4))::float col4,
    uniform(1.5,1000.5,random(5))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'y',
    uniform(1.5,1000.5,random(10))::float col1,
    uniform(1.5,1000.5,random(20))::float col2,
    uniform(1.5,1000.5,random(30))::float col3,
    uniform(1.5,1000.5,random(40))::float col4,
    uniform(1.5,1000.5,random(50))::float col5
    from table(generator(rowcount => 5));
    
    insert into test_values
    select 'z',
    uniform(1.5,1000.5,random(100))::float col1,
    uniform(1.5,1000.5,random(200))::float col2,
    uniform(1.5,1000.5,random(300))::float col3,
    uniform(1.5,1000.5,random(400))::float col4,
    uniform(1.5,1000.5,random(500))::float col5
    from table(generator(rowcount => 5));
    
    Copy
  2. Veja os dados:

    select * from test_values;
    
    -----------------------------------------------------
    |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
    -----------------------------------------------------
    |x     |8.0     |99.4    |714.6   |168.7   |397.2   |
    |x     |106.4   |237.1   |971.7   |828.4   |988.2   |
    |x     |741.3   |207.9   |32.6    |640.6   |63.2    |
    |x     |541.3   |828.6   |844.9   |77.3    |403.1   |
    |x     |4.3     |723.3   |924.3   |282.5   |158.1   |
    |y     |976.1   |562.4   |968.7   |934.3   |977.3   |
    |y     |390.0   |244.3   |952.6   |101.7   |24.9    |
    |y     |599.7   |191.8   |90.2    |788.2   |761.2   |
    |y     |589.5   |201.0   |863.4   |415.1   |696.1   |
    |y     |46.7    |659.7   |571.1   |938.0   |513.7   |
    |z     |313.9   |188.5   |964.6   |435.4   |519.6   |
    |z     |328.3   |643.1   |766.4   |148.1   |596.4   |
    |z     |929.0   |255.4   |915.9   |857.2   |425.5   |
    |z     |612.8   |816.4   |220.2   |879.5   |331.4   |
    |z     |487.1   |704.5   |471.5   |378.9   |481.2   |
    -----------------------------------------------------
    
    Copy
  3. Crie a função:

    create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
    returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
    language python
    RUNTIME_VERSION = 3.9
    packages=('pandas')
    handler='handler'
    as $$
    from _snowflake import vectorized
    import pandas
    
    class handler:
        @vectorized(input=pandas.DataFrame)
        def end_partition(self, df):
          # using describe function to get the summary statistics
          result = df.describe().transpose()
          # add a column at the beginning for column ids
          result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
          return result
    $$;
    
    Copy
  4. Siga um dos seguintes passos:

    • Chame a função e particione por id:

      -- partition by id
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by id))
      order by id, column_name;
      
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
      |x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
      |y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
      |z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
      --------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy
    • Chame a função e trate a tabela inteira como uma partição:

      -- treat the whole table as one partition
      select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
      over (partition by 1))
      order by id, column_name;
      
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
      |NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
      ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
      
      Copy

Crie uma UDTF com um método de processo vetorizado

  1. Defina uma classe de manipulador, semelhante às UDTFs regulares, com métodos __init__ e end_partition opcionais.

  2. Defina um método process que receba um argumento DataFrame e retorne um pandas.DataFrame ou uma tupla de pandas.Series ou pandas.arrays onde cada matriz é uma coluna.

    Os tipos de colunas do resultado devem corresponder aos tipos de colunas na definição da UDTF. O resultado retornado deve ser exatamente um DataFrame ou tupla. Isso é diferente de um método vetorizado end_partition onde você pode produzir ou retornar uma lista.

  3. Para marcar o método process como vetorizado, use o decorador @vectorized ou o atributo de função _sf_vectorized_input.

    Para mais informações, consulte UDFs Python vetorizadas. O decorador @vectorized só pode ser usado quando a UDTF Python é executada no Snowflake; por exemplo, ao usar uma planilha SQL. Ao executar usando o cliente ou uma planilha Python, você deve usar o atributo de função.

  4. Opcional: Se a função do manipulador Python estiver excedendo o limite de tempo de execução, defina um tamanho de lote de destino.

Nota

Os nomes de coluna padrão para o DataFrame de entrada para uma UDTF com método process vetorizado correspondem à assinatura da função SQL. Os nomes das colunas seguem os requisitos de identificador SQL. Ou seja, se um identificador não estiver entre aspas, ele será escrito em maiúsculas, e se estiver entre aspas duplas, permanecerá inalterado.

O manipulador de uma UDTF com um método vetorizado process pode ser implementado para processar lotes de maneira consciente da partição ou para processá-los simplesmente lote por lote. Para mais informações, consulte processamento com e sem estado.

Exemplo: use uma UDTF com um método de processo vetorizado para aplicar um hot encoding

Use uma UDTF com um método process vetorizado para aplicar uma codificação one-hot em uma tabela com dez categorias:

import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame

class one_hot_encode:
  def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
      return pd.get_dummies(df)
  process._sf_vectorized_input = pd.DataFrame


one_hot_encode_udtf = session.udtf.register(
  one_hot_encode,
  output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
  input_names=['"categ"']
)

df_table = session.table("categories")
df_table.show()
Copy

Resultado da amostra:

-----------
|"CATEG"  |
-----------
|categ1   |
|categ6   |
|categ8   |
|categ5   |
|categ7   |
|categ5   |
|categ1   |
|categ2   |
|categ2   |
|categ4   |
-----------

Prepare-se para imprimir a tabela:

res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Copy

Resultado da amostra:

    CATEG  CATEG0  CATEG1  CATEG2  CATEG3  CATEG4  CATEG5  CATEG6  CATEG7  CATEG8  CATEG9
0  categ0       1       0       0       0       0       0       0       0       0       0
1  categ0       1       0       0       0       0       0       0       0       0       0
2  categ5       0       0       0       0       0       1       0       0       0       0
3  categ3       0       0       0       1       0       0       0       0       0       0
4  categ8       0       0       0       0       0       0       0       0       1       0

É possível obter o mesmo resultado com uma UDF vetorizada, embora seja menos conveniente. Você precisa empacotar os resultados em uma coluna e descompactar a coluna para restaurar os resultados em um DataFrame pandas utilizável.

Exemplo de uso de uma UDF vetorizada:

def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
  return pd.get_dummies(df).to_dict('records')

one_hot_encode._sf_vectorized_input = pd.DataFrame

one_hot_encode_udf = session.udf.register(
  one_hot_encode,
  output_schema=["encoding"],
)

df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0  {\n  "categ0": false,\n  "categ1": false,\n  "...
1  {\n  "categ0": false,\n  "categ1": true,\n  "c...
2  {\n  "categ0": false,\n  "categ1": false,\n  "...
3  {\n  "categ0": false,\n  "categ1": false,\n  "...
4  {\n  "categ0": true,\n  "categ1": false,\n  "c...
Copy

Suporte a tipos

UDTFs vetorizadas oferecem suporte aos mesmos tipos SQL como UDFs vetorizadas. No entanto, para UDTFs vetorizadas, argumentos SQL NUMBER com uma escala de 0 que todos se encaixam em um tipo inteiro de 64 bits ou menor sempre serão mapeados para Int16, Int32 ou Int64. Diferentemente de UDFs escalares, se o argumento de uma UDTF não for anulável, ele não será convertido em int16, int32 ou int64.

Para visualizar uma tabela que mostra como os tipos SQL são mapeados para dtypes do Pandas, consulte a tabela de suporte de tipos no tópico UDFs vetorizadas de Python.

Práticas recomendadas

  • Se um escalar precisar ser retornado com cada linha, crie uma lista de valores repetidos em vez de descompactar a matriz numpy para criar tuplas. Por exemplo, para um resultado de duas colunas, em vez de:

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    Use isto:

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  • Para melhorar o desempenho, descompacte os dados semiestruturados em colunas.

    Por exemplo, se você tiver uma coluna variante, obj, com elementos, x(int), y(float) e z(string), em vez de definir uma UDTF com uma assinatura como esta e chamá-la usando vec_udtf(obj):

    create function vec_udtf(variant obj)
    
    Copy

    Defina a UDTF com uma assinatura como esta e chame-a usando vec_udtf(obj:x, obj:y, obj:z):

    create function vec_udtf(int, float, string)
    
    Copy
  • Por padrão, o Snowflake codifica as entradas em dtypes pandas que oferecem suporte aos valores NULL (por exemplo, Int64). Se você estiver usando uma biblioteca que requer um tipo primitivo (como numpy) e sua entrada não tiver valores NULL, você deverá converter a coluna em um tipo primitivo antes de usar a biblioteca. Por exemplo:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    Para obter mais informações, consulte Suporte a tipos.

  • Ao usar UDTFs com um método vetorizado end_partition, para melhorar o desempenho e evitar tempos limite, evite usar pandas.concat para acumular resultados parciais. Em vez disso, produza o resultado parcial sempre que estiver pronto.

    Por exemplo, em vez de:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    Faça isto:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy