Gemeinsame Feature- und Abfrage-Muster

Bemerkung

Der Snowflake Feature Store API ist im Snowpark ML Python-Paket (snowflake-ml-python) v1.5.0 und höher verfügbar.

Die Klasse FeatureView akzeptiert ein Snowpark-DataFrame-Objekt, das die Logik zur Transformation des Features enthält. Sie können also Ihre Features auf jede Art und Weise beschreiben, die von der Snowpark-DataFrame-API oder von Snowflake SQL unterstützt wird. Sie können die DataFrame direkt an den FeatureView-Konstruktor übergeben.

Snowpark Python-API bietet Analysefunktionen zur einfachen Definition vieler gängiger Feature-Typen, wie z. B. fensterbasierte Aggregationen. Dieses Thema enthält einige Beispiele dafür.

Die Open Source snowflake-ml-python auf Github enthält auch einige Beispiele für Feature-Ansichten und Definitionen von Entitäten unter Verwendung öffentlicher Datensätze.

Pro-Zeile-Features

Bei Pro-Zeile-Features werden die Funktionen auf jede Zeile der Datentabelle angewendet. Der folgende Code füllt zum Beispiel NULL in foo mit 0 und berechnet dann einen ZIP-Code aus lat und long. Es gibt genau eine Ausgabezeile pro Eingabezeile.

Python:

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.fillna({"foo": 0})
    df = df.with_column(
        "zipcode",
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df
Copy

Snowflake SQL

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Copy

Pro-Gruppe-Features

Pro-Gruppe-Features aggregieren die Werte einer Spalte innerhalb einer Gruppe, Zum Beispiel könnte die Summe der täglichen Niederschläge für die Wettervorhersage nach Städten gruppiert werden. Der Ausgabe-DataFrame hat eine Zeile pro Gruppe.

Python:

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.group_by(
        ["location", to_date(timestamp)]
    ).agg(
        sum("rain").alias("sum_rain"),
        avg("humidity").alias("avg_humidity")
    )
    return df
Copy

Snowflake SQL

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Copy

Zeilenbasierte Fenster-Features

Zeilenbasierte Fenster-Features aggregieren Werte über ein festes Fenster von Zeilen, z. B. die Summe der letzten drei Transaktionsbeträge. Die Ausgabe DataFrame hat eine Zeile pro Fensterrahmen.

Python:

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)

    return df.select(
        sum("amount").over(window).alias("sum_past_3_transactions")
    )
Copy

Snowflake SQL

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
        AS sum_past_3_transactions
FROM <source_table_name>;
Copy

Features zu gleitenden Aggregationen

Features zu gleitenden Aggregationen berechnen gleitende Statistiken, wie Summe und Durchschnitt, innerhalb einer bestimmten Fenstergröße. Diese Funktion berechnet diese Aggregate dynamisch über verschiedene Teilmengen des DataFrame auf der Grundlage der definierten Fenstergrößen, der Reihenfolge und der Gruppierungen. Die Ausgabe DataFrame hat eine Zeile pro Fensterrahmen.

new_df =  df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Features zu kumulativen Aggregationen

Kumulative Aggregationen berechnen laufende Summen, Minima, Maxima und andere kumulative Statistiken über eine Datenpartition, die wie angegeben sortiert und gruppiert werden. Im Gegensatz zu gleitenden Aggregaten erstrecken sich diese Summen vom Beginn der Partition oder bis zum Ende, je nach der angegebenen Richtung, und liefern laufende Summen, die nicht zurückgesetzt werden. Die Ausgabe DataFrame hat eine Zeile pro Eingabezeile.

 new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
Copy

Lag-Features

Lag-Features führen neue Spalten ein, die Werte aus früheren Zeilen innerhalb jeder Partition enthalten, versetzt um eine bestimmte Anzahl von Zeilen. Diese Funktion ist entscheidend für den Vergleich aktueller Werte mit früheren Werten in einem Datenset und hilft somit dabei, Trends oder Veränderungen im Zeitverlauf zu erkennen. Die Ausgabe DataFrame hat eine Zeile pro Eingabezeile.

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Lead-Features

Im Gegensatz zu den Lag-Features werden mit den Lead-Features neue Spalten erstellt, die Werte aus den nachfolgenden Zeilen enthalten und die Daten nach oben verschieben. Dieses Feature ist wichtig, um Vorhersagen oder Annahmen auf der Grundlage zukünftiger Datenpunkte zu treffen, die bereits in einem Datenset vorhanden sind. Die Ausgabe DataFrame hat eine Zeile pro Eingabezeile.

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
Copy

Zeitreihen-Features

Zeitreihen-Features berechnen Feature-Werte auf der Grundlage eines Zeitfensters und einer festen Position entlang der Zeitachse. Beispiele sind die Anzahl der Fahrten in der letzten Woche bei Mitfahrgelegenheiten oder die Summe der Verkäufe in den letzten drei Tagen. Die Ausgabe DataFrame hat eine Zeile pro Zeitfenster.

Aktuelle Versionen des Snowflake Feature Store enthalten eine experimentelle Zeitreihenaggregations-API. Mit dieser API können Sie ein Feature für eine Zeitreihe mit Code wie folgt erstellen.

Python:

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
Copy

Sie können Zeitreihen-Features auch mit der RANGE BETWEEN Syntax in SQL zusammenfassen. Weitere Einzelheiten finden Sie unter Snowflake Fensterfunktionen.

Snowflake SQL

select
    TS,
    LOCATION,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '3 days' preceding and current row
    ) SUM_RAIN_3D,
    sum(RAIN) over (
        partition by LOCATION
        order by TS
        range between interval '5 days' preceding and current row
    ) SUM_RAIN_5D
from <source_table_name>
Copy

Verwenden benutzerdefinierter Funktionen in Feature-Pipelines

Der Snowflake Feature Store unterstützt benutzerdefinierte Funktionen (UDFs) in Feature-Pipeline-Definitionen. Allerdings können nur deterministische Funktionen (Funktionen, die für dieselbe Eingabe immer dasselbe Ergebnis liefern) inkrementell gewartet werden. Um eine inkrementelle Wartung zu ermöglichen, markieren Sie Ihre UDF beim Registrieren als unveränderlich.

# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...
Copy

Wenn Ihre Funktion in SQL geschrieben ist, geben Sie das Schlüsselwort IMMUTABLE an. Weitere Informationen dazu finden Sie in dieser Anleitung.