Padrões comuns de recurso e consulta

Nota

A API do Snowflake Feature Store está disponível no pacote Snowpark ML Python (snowflake-ml-python) v1.5.0 e posterior.

A classe FeatureView aceita um objeto do DataFrame Snowpark contendo a lógica de transformação de recursos. Você pode, portanto, descrever suas características de qualquer forma suportada pela API do DataFrame Snowpark ou pelo Snowflake SQL. É possível passar o DataFrame diretamente ao construtor FeatureView.

A API Snowpark Python fornece funções de análise para definir facilmente muitos tipos de recurso comuns, como agregações em janelas. Este tópico contém alguns exemplos disso.

O snowflake-ml-python de código aberto no Github também contém algumas amostra de exibição de recurso e definições de entidade usando conjuntos de dados públicos.

Recursos por linha

Em recursos por linha, as funções são aplicadas a cada linha de dados tabulares. Por exemplo, o código a seguir preenche nulo em foo com zero, então calcula um código ZIP de lat e long. Há uma linha de saída por linha de entrada.

Python

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.fillna({"foo": 0})
    df = df.with_column(
        "zipcode",
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df
Copy

Snowflake SQL:

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Copy

Recursos por grupo

Os recursos por grupo agregam valores em uma coluna dentro de um grupo. Por exemplo, a soma da precipitação diária pode ser agrupada por cidade para previsão do tempo. O DataFrame de saída tem uma linha por grupo.

Python

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.group_by(
        ["location", to_date(timestamp)]
    ).agg(
        sum("rain").alias("sum_rain"),
        avg("humidity").alias("avg_humidity")
    )
    return df
Copy

Snowflake SQL:

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Copy

Recursos de janela baseados em linha

A janela baseada em linha apresenta valores agregados em uma janela fixa de linhas; por exemplo, somando os três últimos valores de transação. A saída DataFrame tem uma linha por quadro de janela.

Python

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)

    return df.select(
        sum("amount").over(window).alias("sum_past_3_transactions")
    )
Copy

Snowflake SQL:

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
        AS sum_past_3_transactions
FROM <source_table_name>;
Copy

Recursos de agregação móvel

Os recursos de agregação móvel calculam estatísticas móveis, como soma e média, dentro de um tamanho de janela especificado. Esta função calcula dinamicamente esses agregados em diferentes subconjuntos do DataFrame com base nos tamanhos de janela, ordem e agrupamentos definidos. A saída DataFrame tem uma linha por quadro de janela.

new_df =  df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Recursos de agregação cumulativa

A agregação cumulativa calcula totais contínuos, mínimos, máximos e outras estatísticas cumulativas em uma partição de dados, que são classificadas e agrupadas conforme especificado. Diferentemente dos agregados móveis, esses totais se estendem do início da partição até o fim, dependendo da direção especificada, fornecendo totais contínuos que não são zerados. A saída DataFrame tem uma linha por linha de entrada.

 new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
Copy

Recursos de atraso

Os recursos de atraso introduzem novas colunas contendo valores de linhas anteriores dentro de cada partição, deslocadas por um número especificado de linhas. Esta função é essencial para comparar valores atuais com valores anteriores em um conjunto de dados, auxiliando assim na detecção de tendências ou mudanças ao longo do tempo. A saída DataFrame tem uma linha por linha de entrada.

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Recursos de avanço

O inverso dos recursos de atraso, os recursos de avanço criam novas colunas contendo valores de linhas subsequentes, deslocando os dados para cima. Esse recurso é essencial para fazer previsões ou suposições com base em pontos de dados futuros já presentes em um conjunto de dados. A saída DataFrame tem uma linha por linha de entrada.

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Recursos de séries temporais

Os recursos de séries temporais calculam valores de recursos com base em uma janela de tempo e uma posição fixa ao longo do eixo de tempo. Exemplos incluem a contagem de viagens na semana passada para serviços de transporte compartilhado ou a soma de vendas nos últimos três dias. A saída DataFrame tem uma linha por janela de tempo.

Versões recentes do Snowflake Feature Store incluem uma API de agregação experimental de séries temporais. Usando esta API, um recurso de série temporal pode ser criado usando código como o seguinte.

Python

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
Copy

Também é possível construir recursos de séries temporais com sintaxe RANGE BETWEEN em SQL. Para mais detalhes, consulte Funções de janela Snowflake.

Snowflake SQL:

select
    TS,
    LOCATION,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '3 days' preceding and current row
    ) SUM_RAIN_3D,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '5 days' preceding and current row
    ) SUM_RAIN_5D
from <source_table_name>
Copy

Como usar funções definidas pelo usuário em pipelines de recursos

O Snowflake Feature Store oferece suporte a funções definidas pelo usuário (UDFs) em definições de pipeline de recursos. Entretanto, apenas funções determinísticas (funções que sempre retornam o mesmo resultado para a mesma entrada) podem ser mantidas incrementalmente. Para habilitar a manutenção incremental, marque sua UDF como imutável ao registrá-la.

# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...
Copy

Se sua função for escrita em SQL, especifique a palavra-chave IMMUTABLE. Consulte este guia.