Padrões comuns de recurso e consulta¶
Nota
A API do Snowflake Feature Store está disponível no pacote Snowpark ML Python (snowflake-ml-python
) v1.5.0 e posterior.
A classe FeatureView
aceita um objeto do DataFrame Snowpark contendo a lógica de transformação de recursos. Você pode, portanto, descrever suas características de qualquer forma suportada pela API do DataFrame Snowpark ou pelo Snowflake SQL. É possível passar o DataFrame diretamente ao construtor FeatureView
.
A API Snowpark Python fornece funções de análise para definir facilmente muitos tipos de recurso comuns, como agregações em janelas. Este tópico contém alguns exemplos disso.
O snowflake-ml-python de código aberto no Github também contém algumas amostra de exibição de recurso e definições de entidade usando conjuntos de dados públicos.
Recursos por linha¶
Em recursos por linha, as funções são aplicadas a cada linha de dados tabulares. Por exemplo, o código a seguir preenche nulo em foo
com zero, então calcula um código ZIP de lat
e long
. Há uma linha de saída por linha de entrada.
Python
def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.fillna({"foo": 0})
df = df.with_column(
"zipcode",
F.compute_zipcode(df["lat"], df["long"])
)
return df
Snowflake SQL:
SELECT
COALESCE(foo, 0) AS foo,
compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Recursos por grupo¶
Os recursos por grupo agregam valores em uma coluna dentro de um grupo. Por exemplo, a soma da precipitação diária pode ser agrupada por cidade para previsão do tempo. O DataFrame de saída tem uma linha por grupo.
Python
def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.group_by(
["location", to_date(timestamp)]
).agg(
sum("rain").alias("sum_rain"),
avg("humidity").alias("avg_humidity")
)
return df
Snowflake SQL:
SELECT
location,
TO_DATE(timestamp) AS date,
SUM(rain) AS sum_rain,
AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Recursos de janela baseados em linha¶
A janela baseada em linha apresenta valores agregados em uma janela fixa de linhas; por exemplo, somando os três últimos valores de transação. A saída DataFrame tem uma linha por quadro de janela.
Python
def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)
return df.select(
sum("amount").over(window).alias("sum_past_3_transactions")
)
Snowflake SQL:
SELECT
id,
SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
AS sum_past_3_transactions
FROM <source_table_name>;
Recursos de agregação móvel¶
Os recursos de agregação móvel calculam estatísticas móveis, como soma e média, dentro de um tamanho de janela especificado. Esta função calcula dinamicamente esses agregados em diferentes subconjuntos do DataFrame com base nos tamanhos de janela, ordem e agrupamentos definidos. A saída DataFrame tem uma linha por quadro de janela.
new_df = df.analytics.moving_agg(
aggs={"SALESAMOUNT": ["SUM", "AVG"]},
window_sizes=[2, 3],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Recursos de agregação cumulativa¶
A agregação cumulativa calcula totais contínuos, mínimos, máximos e outras estatísticas cumulativas em uma partição de dados, que são classificadas e agrupadas conforme especificado. Diferentemente dos agregados móveis, esses totais se estendem do início da partição até o fim, dependendo da direção especificada, fornecendo totais contínuos que não são zerados. A saída DataFrame tem uma linha por linha de entrada.
new_df = df.analytics.cumulative_agg(
aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"],
is_forward=True
)
Recursos de atraso¶
Os recursos de atraso introduzem novas colunas contendo valores de linhas anteriores dentro de cada partição, deslocadas por um número especificado de linhas. Esta função é essencial para comparar valores atuais com valores anteriores em um conjunto de dados, auxiliando assim na detecção de tendências ou mudanças ao longo do tempo. A saída DataFrame tem uma linha por linha de entrada.
new_df = df.analytics.compute_lag(
cols=["SALESAMOUNT"],
lags=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Recursos de avanço¶
O inverso dos recursos de atraso, os recursos de avanço criam novas colunas contendo valores de linhas subsequentes, deslocando os dados para cima. Esse recurso é essencial para fazer previsões ou suposições com base em pontos de dados futuros já presentes em um conjunto de dados. A saída DataFrame tem uma linha por linha de entrada.
new_df = df.analytics.compute_lead(
cols=["SALESAMOUNT"],
leads=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Recursos de séries temporais¶
Os recursos de séries temporais calculam valores de recursos com base em uma janela de tempo e uma posição fixa ao longo do eixo de tempo. Exemplos incluem a contagem de viagens na semana passada para serviços de transporte compartilhado ou a soma de vendas nos últimos três dias. A saída DataFrame tem uma linha por janela de tempo.
Versões recentes do Snowflake Feature Store incluem uma API de agregação experimental de séries temporais. Usando esta API, um recurso de série temporal pode ser criado usando código como o seguinte.
Python
def custom_column_naming(input_col, agg, window):
return f"{agg}_{input_col}_{window.replace('-', 'past_')}"
result_df = weather_df.analytics.time_series_agg(
aggs={"rain": ["SUM"]},
windows=["-3D", "-5D"],
sliding_interval="1D",
group_by=["location"],
time_col="ts",
col_formatter=custom_column_naming
)
Também é possível construir recursos de séries temporais com sintaxe RANGE BETWEEN em SQL. Para mais detalhes, consulte Funções de janela Snowflake.
Snowflake SQL:
select
TS,
LOCATION,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '3 days' preceding and current row
) SUM_RAIN_3D,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '5 days' preceding and current row
) SUM_RAIN_5D
from <source_table_name>
Como usar funções definidas pelo usuário em pipelines de recursos¶
O Snowflake Feature Store oferece suporte a funções definidas pelo usuário (UDFs) em definições de pipeline de recursos. Entretanto, apenas funções determinísticas (funções que sempre retornam o mesmo resultado para a mesma entrada) podem ser mantidas incrementalmente. Para habilitar a manutenção incremental, marque sua UDF como imutável ao registrá-la.
# In Python
@F.udf(
name="MY_UDF",
immutable=True,
# ...
)
def my_udf(...):
# ...
Se sua função for escrita em SQL, especifique a palavra-chave IMMUTABLE. Consulte este guia.