Fonctionnalités communes et modèles de requête

Note

L’API Snowflake Feature Store est disponible dans le paquet Snowpark ML Python (snowflake-ml-python) v1.5.0 et ultérieure.

La classe FeatureView accepte un objet Snowpark DataFrame contenant la logique de transformation des fonctions. Vous pouvez donc décrire vos fonctions de n’importe quelle manière prise en charge par l’API de Snowpark DataFrame ou par du SQL Snowflake. Vous pouvez transmettre directement le DataFrame au constructeur FeatureView.

L’API Snowpark Python fournit des fonctions analytiques permettant de définir facilement de nombreux types de fonctions courantes, telles que les agrégations fenêtrées. Cette rubrique en présente quelques exemples.

L’open source snowflake-ml-python sur Github contient également quelques exemples de définitions de fonctionnalités et d’entités utilisant des ensembles de données publics.

Fonctions par ligne

Dans les fonctions par ligne, les fonctions sont appliquées à chaque ligne de données tabulaires. Par exemple, le code suivant remplit les valeurs null dans foo avec le chiffre zéro, puis calcule un code ZIP à partir de lat et long. Il y a une ligne de sortie par ligne d’entrée.

Python :

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.fillna({"foo": 0})
    df = df.with_column(
        "zipcode",
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df
Copy

Snowflake SQL :

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Copy

Fonctions par groupe

Les fonctions par groupe agrègent les valeurs d’une colonne au sein d’un groupe. Par exemple, la somme des précipitations journalières pourrait être regroupée par ville pour les prévisions météorologiques. Le DataFrame en sortie comporte une ligne par groupe.

Python :

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.group_by(
        ["location", to_date(timestamp)]
    ).agg(
        sum("rain").alias("sum_rain"),
        avg("humidity").alias("avg_humidity")
    )
    return df
Copy

Snowflake SQL :

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Copy

Fonctions des fenêtres basées sur les lignes

Les fonctions de fenêtre basées sur les lignes agrègent les valeurs sur une fenêtre fixe de lignes, par exemple en additionnant les montants des trois dernières transactions. Le DataFrame en sortie comporte une ligne par cadre de fenêtre.

Python :

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)

    return df.select(
        sum("amount").over(window).alias("sum_past_3_transactions")
    )
Copy

Snowflake SQL :

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
        AS sum_past_3_transactions
FROM <source_table_name>;
Copy

Fonctionnalités d’agrégation mobile

Les fonctions d’agrégation mobile permettent de calculer des statistiques mobiles, telles que la somme et la moyenne, à l’intérieur d’une fenêtre de taille spécifiée. Cette fonction calcule dynamiquement ces agrégats sur différents sous-ensembles du DataFrame en fonction de la taille des fenêtres, de l’ordre et des groupes définis. Le DataFrame en sortie comporte une ligne par cadre de fenêtre.

new_df =  df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Fonctions d’agrégation cumulative

L’agrégation cumulative calcule les totaux, les minimums, les maximums et autres statistiques cumulatives en cours sur une partition de données, qui est triée et groupée comme spécifié. Contrairement aux agrégats mobiles, ces totaux s’étendent du début de la partition ou jusqu’à la fin, selon la direction spécifiée, fournissant des totaux continus qui ne sont pas réinitialisés. Le DataFrame en sortie comporte une ligne par ligne d’entrée.

 new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
Copy

Fonctionnalités de latence

Les fonctions de latence introduisent de nouvelles colonnes contenant les valeurs des lignes précédentes au sein de chaque partition, décalées d’un nombre spécifié de lignes. Cette fonction est essentielle pour comparer les valeurs actuelles aux valeurs antérieures d’un ensemble de données, ce qui permet de détecter des tendances ou des changements au fil du temps. Le DataFrame en sortie comporte une ligne par ligne d’entrée.

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Fonctionnalités d’avancement

L’inverse des fonctions de latence, les fonctions de d’avancement créent de nouvelles colonnes contenant les valeurs des lignes suivantes, décalant ainsi les données vers l’avant. Cette fonction est essentielle pour faire des prédictions ou des hypothèses basées sur des points de données futurs déjà présents dans un ensemble de données. Le DataFrame en sortie comporte une ligne par ligne d’entrée.

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Fonctions des séries temporelles

Les fonctions de séries temporelles calculent les valeurs des fonctions en fonction d’une fenêtre temporelle et d’une position fixe le long de l’axe temporel. Il peut s’agir, par exemple, du nombre de trajets effectués au cours de la semaine écoulée pour des covoiturages ou de la somme des ventes réalisées au cours des trois derniers jours. Le DataFrame en sortie comporte une ligne par fenêtre de temps.

Les versions récentes du Snowflake Feature Store incluent une agrégation expérimentale d’API de séries temporelles. En utilisant cette API, une fonction de série temporelle peut être créée à l’aide d’un code comme le suivant :

Python :

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
Copy

Vous pouvez également créer des fonctionnalités de séries temporelles avec la syntaxe RANGE BETWEEN en SQL. Pour plus de détails, voir Fonctions de fenêtre Snowflake.

Snowflake SQL :

select
    TS,
    LOCATION,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '3 days' preceding and current row
    ) SUM_RAIN_3D,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '5 days' preceding and current row
    ) SUM_RAIN_5D
from <source_table_name>
Copy

Utilisation de fonctions définies par l’utilisateur dans les pipelines de fonctions

Le Snowflake Feature Store prend en charge les fonctions définies par l’utilisateur (UDFs) dans les définitions du pipeline de fonctions. Cependant, seules les fonctions déterministes (fonctions qui renvoient toujours le même résultat pour la même entrée) peuvent être maintenues de manière incrémentielle. Pour permettre une maintenance incrémentielle, indiquez que votre UDF est immuable lorsque vous l’enregistrez.

# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...
Copy

Si votre fonction est écrite en SQL, spécifiez le mot-clé IMMUTABLE. Voir ce guide.