UDTFs vetorizadas de Python

Este tópico apresenta as UDTFs vetorizadas de Python.

Neste tópico:

Visão geral

Vectorized Python UDTFs (user-defined table functions), which are UDTFs with a vectorized end_partition, enable seamless partition-by-partition processing by operating on partitions as pandas DataFrames and returning results as pandas DataFrames or lists of pandas arrays or pandas Series. This makes for easy integration with libraries that operate on pandas DataFrames or pandas arrays.

Use quando:

  • Você precisa processar seus dados partição por partição, em vez de linha por linha.

  • Você precisa retornar várias linhas ou colunas para cada partição.

  • Você deseja usar bibliotecas que operam em pandas DataFrames para análise de dados.

Pré-requisitos

A biblioteca Snowpark para Python versão 1.6.1 ou posterior é necessária.

Introdução

Para criar uma UDTF com uma partição_final vetorizada:

  • Opcionalmente, defina uma classe do manipulador com um método __init__ que será invocado antes de processar cada partição.

  • Não defina um método process.

  • Defina um método end_partition que receba um argumento DataFrame e retorne ou produza um pandas.DataFrame ou uma tupla de pandas.Series ou pandas.arrays onde cada matriz é uma coluna. Os tipos de colunas do resultado devem corresponder aos tipos de colunas na definição da UDTF.

  • Marque o método end_partition como vetorizado usando o decorador @vectorized ou o atributo de função _sf_vectorized_input. Para obter mais informações, consulte UDFs vetorizadas de Python. O decorador @vectorized só pode ser usado quando a UDTF de Python é executada no Snowflake, por exemplo, ao usar uma planilha de SQL. Ao executar usando o cliente ou uma planilha Python, você deve usar o atributo de função.

Nota

Os nomes de coluna padrão para o DataFrame de entrada para uma UDTF com partição_final vetorizada correspondem à assinatura da função SQL. Os nomes das colunas seguirão os requisitos de identificador SQL. Ou seja, se um identificador não estiver entre aspas, ele será colocado em maiúscula e, se estiver entre aspas duplas, será preservado como está.

Aqui está um exemplo de criação de uma UDTF com uma partição_final vetorizada de Python usando o decorador @vectorized.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    # initialize a state
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
    # process the DataFrame
    return result_df
Copy

Aqui está um exemplo de criação de uma UDTF com uma partição_final vetorizada, usando o atributo de função.

import pandas

class handler:
  def __init__(self):
    # initialize a state
  def end_partition(self, df):
    # process the DataFrame
    return result_df

handler.end_partition._sf_vectorized_input = pandas.DataFrame
Copy

Nota

Uma UDTF com uma com uma partição_final deve ser chamada com a cláusula PARTITION BY para construir as partições.

Para chamar a UDTF com todos os dados na mesma partição:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Copy

Para chamar a UDTF com os dados particionados pela coluna x:

SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Copy

Suporte a tipos

UDTFs com uma partição_final oferecem suporte aos mesmos tipos SQL que UDFs de Python vetorizadas para argumentos e valores de retorno. No entanto, para UDTFs com uma partição_final, argumentos SQL NUMBER com uma escala de 0 que todos se encaixam em um tipo inteiro de 64 bits ou menor sempre serão mapeados para Int16, Int32 ou Int64. Em outras palavras, diferentemente de UDFs escalares, se o argumento de uma UDTF não for anulável, ele não será convertido em int16, int32 ou int64.

Para visualizar uma tabela que mostra como os tipos SQL são mapeados para dtypes do Pandas, consulte a tabela de suporte de tipos no tópico UDFs vetorizadas de Python.

Exemplo: coleção de linhas usando uma UDTF regular versus usando uma UDTF com uma partição_final

Aqui está um exemplo de como fazer a coleção de linhas usando uma UDTF regular.

import pandas

class handler:
  def __init__(self):
    self.rows = []
  def process(self, *row):
    self.rows.append(row)
  def end_partition(self):
    df = pandas.DataFrame(self.rows)
    # process the DataFrame
    return result_df
Copy

Aqui está um exemplo de como fazer a coleção de linhas usando uma UDTF com uma partição_final.

from _snowflake import vectorized
import pandas

class handler:
  def __init__(self):
    self.rows = []
  @vectorized(input=pandas.DataFrame)
  def end_partition(self, df):
  # process the DataFrame
    return result_df
Copy

Exemplo: calcular a estatística resumida para cada coluna na partição

Aqui está um exemplo de como calcular a estatística resumida para cada coluna na partição usando o método pandas describe().

Primeiro, crie uma tabela e gere 3 partições de 5 linhas cada.

create or replace table test_values(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float);

-- generate 3 partitions of 5 rows each
insert into test_values
select 'x',
uniform(1.5,1000.5,random(1))::float col1,
uniform(1.5,1000.5,random(2))::float col2,
uniform(1.5,1000.5,random(3))::float col3,
uniform(1.5,1000.5,random(4))::float col4,
uniform(1.5,1000.5,random(5))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'y',
uniform(1.5,1000.5,random(10))::float col1,
uniform(1.5,1000.5,random(20))::float col2,
uniform(1.5,1000.5,random(30))::float col3,
uniform(1.5,1000.5,random(40))::float col4,
uniform(1.5,1000.5,random(50))::float col5
from table(generator(rowcount => 5));

insert into test_values
select 'z',
uniform(1.5,1000.5,random(100))::float col1,
uniform(1.5,1000.5,random(200))::float col2,
uniform(1.5,1000.5,random(300))::float col3,
uniform(1.5,1000.5,random(400))::float col4,
uniform(1.5,1000.5,random(500))::float col5
from table(generator(rowcount => 5));
Copy

Dê uma olhada nos dados.

select * from test_values;

-----------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |
-----------------------------------------------------
|x     |8.0     |99.4    |714.6   |168.7   |397.2   |
|x     |106.4   |237.1   |971.7   |828.4   |988.2   |
|x     |741.3   |207.9   |32.6    |640.6   |63.2    |
|x     |541.3   |828.6   |844.9   |77.3    |403.1   |
|x     |4.3     |723.3   |924.3   |282.5   |158.1   |
|y     |976.1   |562.4   |968.7   |934.3   |977.3   |
|y     |390.0   |244.3   |952.6   |101.7   |24.9    |
|y     |599.7   |191.8   |90.2    |788.2   |761.2   |
|y     |589.5   |201.0   |863.4   |415.1   |696.1   |
|y     |46.7    |659.7   |571.1   |938.0   |513.7   |
|z     |313.9   |188.5   |964.6   |435.4   |519.6   |
|z     |328.3   |643.1   |766.4   |148.1   |596.4   |
|z     |929.0   |255.4   |915.9   |857.2   |425.5   |
|z     |612.8   |816.4   |220.2   |879.5   |331.4   |
|z     |487.1   |704.5   |471.5   |378.9   |481.2   |
-----------------------------------------------------
Copy

Depois, crie a função.

create or replace function summary_stats(id varchar, col1 float, col2 float, col3 float, col4 float, col5 float)
returns table (column_name varchar, count int, mean float, std float, min float, q1 float, median float, q3 float, max float)
language python
runtime_version=3.8
packages=('pandas')
handler='handler'
as $$
from _snowflake import vectorized
import pandas

class handler:
    @vectorized(input=pandas.DataFrame)
    def end_partition(self, df):
      # using describe function to get the summary statistics
      result = df.describe().transpose()
      # add a column at the beginning for column ids
      result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5'])
      return result
$$;
Copy

Chame a função e particione por id.

-- partition by id
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by id))
order by id, column_name;

--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"              |"STD"               |"MIN"  |"Q1"   |"MEDIAN"  |"Q3"   |"MAX"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |280.25999999999993  |339.5609267863427   |4.3    |8.0    |106.4     |541.3  |741.3  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |419.25999999999993  |331.72476995244114  |99.4   |207.9  |237.1     |723.3  |828.6  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |697.62              |384.2964311569911   |32.6   |714.6  |844.9     |924.3  |971.7  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |399.5               |321.2689294033894   |77.3   |168.7  |282.5     |640.6  |828.4  |
|x     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |401.96000000000004  |359.83584173897964  |63.2   |158.1  |397.2     |403.1  |988.2  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |520.4               |339.16133329139984  |46.7   |390.0  |589.5     |599.7  |976.1  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |371.84              |221.94799616126298  |191.8  |201.0  |244.3     |562.4  |659.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |689.2               |371.01012789410476  |90.2   |571.1  |863.4     |952.6  |968.7  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |635.46              |366.6140927460372   |101.7  |415.1  |788.2     |934.3  |938.0  |
|y     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |594.64              |359.0334218425911   |24.9   |513.7  |696.1     |761.2  |977.3  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |5        |534.22              |252.58182238633088  |313.9  |328.3  |487.1     |612.8  |929.0  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |5        |521.58              |281.4870103574941   |188.5  |255.4  |643.1     |704.5  |816.4  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |5        |667.72              |315.53336907528495  |220.2  |471.5  |766.4     |915.9  |964.6  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |5        |539.8199999999999   |318.73025742781306  |148.1  |378.9  |435.4     |857.2  |879.5  |
|z     |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |5        |470.82              |99.68626786072393   |331.4  |425.5  |481.2     |519.6  |596.4  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

Como opção, chame a função e trate a tabela inteira como uma partição.

-- treat the whole table as one partition
select * from test_values, table(summary_stats(id, col1, col2, col3, col4, col5)
over (partition by 1))
order by id, column_name;

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ID"  |"COL1"  |"COL2"  |"COL3"  |"COL4"  |"COL5"  |"COLUMN_NAME"  |"COUNT"  |"MEAN"             |"STD"               |"MIN"  |"Q1"                |"MEDIAN"  |"Q3"    |"MAX"  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col1           |15       |444.96             |314.01110034974425  |4.3    |210.14999999999998  |487.1     |606.25  |976.1  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col2           |15       |437.56             |268.95505944302295  |99.4   |204.45              |255.4     |682.1   |828.6  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col3           |15       |684.8466666666667  |331.87254839915937  |32.6   |521.3               |844.9     |938.45  |971.7  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col4           |15       |524.9266666666666  |327.074780585783    |77.3   |225.6               |435.4     |842.8   |938.0  |
|NULL  |NULL    |NULL    |NULL    |NULL    |NULL    |col5           |15       |489.14             |288.9176669671038   |24.9   |364.29999999999995  |481.2     |646.25  |988.2  |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Copy

Práticas recomendadas

Esta seção descreve as práticas recomendadas.

  1. Para melhorar o desempenho e evitar tempos limite, evite usar pandas.concat para acumular resultados parciais. Em vez disso, produza o resultado parcial sempre que estiver pronto. Por exemplo, em vez de:

    results = []
    while(...):
      partial_result = pd.DataFrame(...)
      results.append(partial_result)
    return pd.concat(results)
    
    Copy

    Faça isto:

    while(...):
      partial_result = pd.DataFrame(...)
      yield partial_result
    
    Copy
  2. Se um escalar precisar ser retornado com cada linha, crie uma lista de valores repetidos em vez de descompactar a matriz numpy para criar tuplas. Por exemplo, para um resultado de 2 colunas, em vez de:

    return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
    
    Copy

    Faça isto:

    return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
    
    Copy
  3. Para melhorar o desempenho, descompacte os dados semiestruturados em colunas. Por exemplo, se você tiver uma coluna variante, obj, com elementos x(int), y(float) e z(string), em vez de definir uma UDTF com uma assinatura como esta:

    create function vec_udtf(variant obj)
    
    Copy

    E chamando-a usando vec_udtf(obj), você deve definir a UDTF com assinatura:

    create function vec_udtf(int, float, string)
    
    Copy

    E chamá-la usando vec_udtf(obj:x, obj:y, obj:z).

  4. Por padrão, o Snowflake codifica as entradas em dtypes pandas que oferecem suporte aos valores NULL (por exemplo, Int64). Se você estiver usando uma biblioteca que requer um tipo primitivo (como numpy) e sua entrada não tiver valores NULL, você deverá converter a coluna em um tipo primitivo antes de usar a biblioteca. Por exemplo:

    input_df['y'] =  input_df['y'].astype("int64")
    
    Copy

    Para obter mais informações, consulte Suporte a tipos.