Fonctionnalités communes et modèles de requête¶
Note
L’API Snowflake Feature Store est disponible dans le paquet Snowpark ML Python (snowflake-ml-python
) v1.5.0 et ultérieure.
La classe FeatureView
accepte un objet Snowpark DataFrame contenant la logique de transformation des fonctions. Vous pouvez donc décrire vos fonctions de n’importe quelle manière prise en charge par l’API de Snowpark DataFrame ou par du SQL Snowflake. Vous pouvez transmettre directement le DataFrame au constructeur FeatureView
.
L’API Snowpark Python fournit des fonctions analytiques permettant de définir facilement de nombreux types de fonctions courantes, telles que les agrégations fenêtrées. Cette rubrique en présente quelques exemples.
L’open source snowflake-ml-python sur Github contient également quelques exemples de définitions de fonctionnalités et d’entités utilisant des ensembles de données publics.
Fonctions par ligne¶
Dans les fonctions par ligne, les fonctions sont appliquées à chaque ligne de données tabulaires. Par exemple, le code suivant remplit les valeurs null dans foo
avec le chiffre zéro, puis calcule un code ZIP à partir de lat
et long
. Il y a une ligne de sortie par ligne d’entrée.
Python :
def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.fillna({"foo": 0})
df = df.with_column(
"zipcode",
F.compute_zipcode(df["lat"], df["long"])
)
return df
Snowflake SQL :
SELECT
COALESCE(foo, 0) AS foo,
compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Fonctions par groupe¶
Les fonctions par groupe agrègent les valeurs d’une colonne au sein d’un groupe. Par exemple, la somme des précipitations journalières pourrait être regroupée par ville pour les prévisions météorologiques. Le DataFrame en sortie comporte une ligne par groupe.
Python :
def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.group_by(
["location", to_date(timestamp)]
).agg(
sum("rain").alias("sum_rain"),
avg("humidity").alias("avg_humidity")
)
return df
Snowflake SQL :
SELECT
location,
TO_DATE(timestamp) AS date,
SUM(rain) AS sum_rain,
AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Fonctions des fenêtres basées sur les lignes¶
Les fonctions de fenêtre basées sur les lignes agrègent les valeurs sur une fenêtre fixe de lignes, par exemple en additionnant les montants des trois dernières transactions. Le DataFrame en sortie comporte une ligne par cadre de fenêtre.
Python :
def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)
return df.select(
sum("amount").over(window).alias("sum_past_3_transactions")
)
Snowflake SQL :
SELECT
id,
SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
AS sum_past_3_transactions
FROM <source_table_name>;
Fonctionnalités d’agrégation mobile¶
Les fonctions d’agrégation mobile permettent de calculer des statistiques mobiles, telles que la somme et la moyenne, à l’intérieur d’une fenêtre de taille spécifiée. Cette fonction calcule dynamiquement ces agrégats sur différents sous-ensembles du DataFrame en fonction de la taille des fenêtres, de l’ordre et des groupes définis. Le DataFrame en sortie comporte une ligne par cadre de fenêtre.
new_df = df.analytics.moving_agg(
aggs={"SALESAMOUNT": ["SUM", "AVG"]},
window_sizes=[2, 3],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Fonctions d’agrégation cumulative¶
L’agrégation cumulative calcule les totaux, les minimums, les maximums et autres statistiques cumulatives en cours sur une partition de données, qui est triée et groupée comme spécifié. Contrairement aux agrégats mobiles, ces totaux s’étendent du début de la partition ou jusqu’à la fin, selon la direction spécifiée, fournissant des totaux continus qui ne sont pas réinitialisés. Le DataFrame en sortie comporte une ligne par ligne d’entrée.
new_df = df.analytics.cumulative_agg(
aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"],
is_forward=True
)
Fonctionnalités de latence¶
Les fonctions de latence introduisent de nouvelles colonnes contenant les valeurs des lignes précédentes au sein de chaque partition, décalées d’un nombre spécifié de lignes. Cette fonction est essentielle pour comparer les valeurs actuelles aux valeurs antérieures d’un ensemble de données, ce qui permet de détecter des tendances ou des changements au fil du temps. Le DataFrame en sortie comporte une ligne par ligne d’entrée.
new_df = df.analytics.compute_lag(
cols=["SALESAMOUNT"],
lags=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Fonctionnalités d’avancement¶
L’inverse des fonctions de latence, les fonctions de d’avancement créent de nouvelles colonnes contenant les valeurs des lignes suivantes, décalant ainsi les données vers l’avant. Cette fonction est essentielle pour faire des prédictions ou des hypothèses basées sur des points de données futurs déjà présents dans un ensemble de données. Le DataFrame en sortie comporte une ligne par ligne d’entrée.
new_df = df.analytics.compute_lead(
cols=["SALESAMOUNT"],
leads=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Fonctions des séries temporelles¶
Les fonctions de séries temporelles calculent les valeurs des fonctions en fonction d’une fenêtre temporelle et d’une position fixe le long de l’axe temporel. Il peut s’agir, par exemple, du nombre de trajets effectués au cours de la semaine écoulée pour des covoiturages ou de la somme des ventes réalisées au cours des trois derniers jours. Le DataFrame en sortie comporte une ligne par fenêtre de temps.
Les versions récentes du Snowflake Feature Store incluent une agrégation expérimentale d’API de séries temporelles. En utilisant cette API, une fonction de série temporelle peut être créée à l’aide d’un code comme le suivant :
Python :
def custom_column_naming(input_col, agg, window):
return f"{agg}_{input_col}_{window.replace('-', 'past_')}"
result_df = weather_df.analytics.time_series_agg(
aggs={"rain": ["SUM"]},
windows=["-3D", "-5D"],
sliding_interval="1D",
group_by=["location"],
time_col="ts",
col_formatter=custom_column_naming
)
Vous pouvez également créer des fonctionnalités de séries temporelles avec la syntaxe RANGE BETWEEN en SQL. Pour plus de détails, voir Fonctions de fenêtre Snowflake.
Snowflake SQL :
select
TS,
LOCATION,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '3 days' preceding and current row
) SUM_RAIN_3D,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '5 days' preceding and current row
) SUM_RAIN_5D
from <source_table_name>
Utilisation de fonctions définies par l’utilisateur dans les pipelines de fonctions¶
Le Snowflake Feature Store prend en charge les fonctions définies par l’utilisateur (UDFs) dans les définitions du pipeline de fonctions. Cependant, seules les fonctions déterministes (fonctions qui renvoient toujours le même résultat pour la même entrée) peuvent être maintenues de manière incrémentielle. Pour permettre une maintenance incrémentielle, indiquez que votre UDF est immuable lorsque vous l’enregistrez.
# In Python
@F.udf(
name="MY_UDF",
immutable=True,
# ...
)
def my_udf(...):
# ...
Si votre fonction est écrite en SQL, spécifiez le mot-clé IMMUTABLE. Voir ce guide.