Gemeinsame Feature- und Abfrage-Muster¶
Bemerkung
Der Snowflake Feature Store API ist im Snowpark ML Python-Paket (snowflake-ml-python
) v1.5.0 und höher verfügbar.
Die Klasse FeatureView
akzeptiert ein Snowpark-DataFrame-Objekt, das die Logik zur Transformation des Features enthält. Sie können also Ihre Features auf jede Art und Weise beschreiben, die von der Snowpark-DataFrame-API oder von Snowflake SQL unterstützt wird. Sie können die DataFrame direkt an den FeatureView
-Konstruktor übergeben.
Snowpark Python-API bietet Analysefunktionen zur einfachen Definition vieler gängiger Feature-Typen, wie z. B. fensterbasierte Aggregationen. Dieses Thema enthält einige Beispiele dafür.
Die Open Source snowflake-ml-python auf Github enthält auch einige Beispiele für Feature-Ansichten und Definitionen von Entitäten unter Verwendung öffentlicher Datensätze.
Pro-Zeile-Features¶
Bei Pro-Zeile-Features werden die Funktionen auf jede Zeile der Datentabelle angewendet. Der folgende Code füllt zum Beispiel NULL in foo
mit 0 und berechnet dann einen ZIP-Code aus lat
und long
. Es gibt genau eine Ausgabezeile pro Eingabezeile.
Python:
def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.fillna({"foo": 0})
df = df.with_column(
"zipcode",
F.compute_zipcode(df["lat"], df["long"])
)
return df
Snowflake SQL
SELECT
COALESCE(foo, 0) AS foo,
compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;
Pro-Gruppe-Features¶
Pro-Gruppe-Features aggregieren die Werte einer Spalte innerhalb einer Gruppe, Zum Beispiel könnte die Summe der täglichen Niederschläge für die Wettervorhersage nach Städten gruppiert werden. Der Ausgabe-DataFrame hat eine Zeile pro Gruppe.
Python:
def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
df = df.group_by(
["location", to_date(timestamp)]
).agg(
sum("rain").alias("sum_rain"),
avg("humidity").alias("avg_humidity")
)
return df
Snowflake SQL
SELECT
location,
TO_DATE(timestamp) AS date,
SUM(rain) AS sum_rain,
AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;
Zeilenbasierte Fenster-Features¶
Zeilenbasierte Fenster-Features aggregieren Werte über ein festes Fenster von Zeilen, z. B. die Summe der letzten drei Transaktionsbeträge. Die Ausgabe DataFrame hat eine Zeile pro Fensterrahmen.
Python:
def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
window = Window.partition_by("id").order_by("ts").rows_between(2, Window.CURRENT_ROW)
return df.select(
sum("amount").over(window).alias("sum_past_3_transactions")
)
Snowflake SQL
SELECT
id,
SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING and 0 FOLLOWING)
AS sum_past_3_transactions
FROM <source_table_name>;
Features zu gleitenden Aggregationen¶
Features zu gleitenden Aggregationen berechnen gleitende Statistiken, wie Summe und Durchschnitt, innerhalb einer bestimmten Fenstergröße. Diese Funktion berechnet diese Aggregate dynamisch über verschiedene Teilmengen des DataFrame auf der Grundlage der definierten Fenstergrößen, der Reihenfolge und der Gruppierungen. Die Ausgabe DataFrame hat eine Zeile pro Fensterrahmen.
new_df = df.analytics.moving_agg(
aggs={"SALESAMOUNT": ["SUM", "AVG"]},
window_sizes=[2, 3],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Features zu kumulativen Aggregationen¶
Kumulative Aggregationen berechnen laufende Summen, Minima, Maxima und andere kumulative Statistiken über eine Datenpartition, die wie angegeben sortiert und gruppiert werden. Im Gegensatz zu gleitenden Aggregaten erstrecken sich diese Summen vom Beginn der Partition oder bis zum Ende, je nach der angegebenen Richtung, und liefern laufende Summen, die nicht zurückgesetzt werden. Die Ausgabe DataFrame hat eine Zeile pro Eingabezeile.
new_df = df.analytics.cumulative_agg(
aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"],
is_forward=True
)
Lag-Features¶
Lag-Features führen neue Spalten ein, die Werte aus früheren Zeilen innerhalb jeder Partition enthalten, versetzt um eine bestimmte Anzahl von Zeilen. Diese Funktion ist entscheidend für den Vergleich aktueller Werte mit früheren Werten in einem Datenset und hilft somit dabei, Trends oder Veränderungen im Zeitverlauf zu erkennen. Die Ausgabe DataFrame hat eine Zeile pro Eingabezeile.
new_df = df.analytics.compute_lag(
cols=["SALESAMOUNT"],
lags=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Lead-Features¶
Im Gegensatz zu den Lag-Features werden mit den Lead-Features neue Spalten erstellt, die Werte aus den nachfolgenden Zeilen enthalten und die Daten nach oben verschieben. Dieses Feature ist wichtig, um Vorhersagen oder Annahmen auf der Grundlage zukünftiger Datenpunkte zu treffen, die bereits in einem Datenset vorhanden sind. Die Ausgabe DataFrame hat eine Zeile pro Eingabezeile.
new_df = df.analytics.compute_lead(
cols=["SALESAMOUNT"],
leads=[1, 2],
order_by=["ORDERDATE"],
group_by=["PRODUCTKEY"]
)
Zeitreihen-Features¶
Zeitreihen-Features berechnen Feature-Werte auf der Grundlage eines Zeitfensters und einer festen Position entlang der Zeitachse. Beispiele sind die Anzahl der Fahrten in der letzten Woche bei Mitfahrgelegenheiten oder die Summe der Verkäufe in den letzten drei Tagen. Die Ausgabe DataFrame hat eine Zeile pro Zeitfenster.
Aktuelle Versionen des Snowflake Feature Store enthalten eine experimentelle Zeitreihenaggregations-API. Mit dieser API können Sie ein Feature für eine Zeitreihe mit Code wie folgt erstellen.
Python:
def custom_column_naming(input_col, agg, window):
return f"{agg}_{input_col}_{window.replace('-', 'past_')}"
result_df = weather_df.analytics.time_series_agg(
aggs={"rain": ["SUM"]},
windows=["-3D", "-5D"],
sliding_interval="1D",
group_by=["location"],
time_col="ts",
col_formatter=custom_column_naming
)
Sie können Zeitreihen-Features auch mit der RANGE BETWEEN Syntax in SQL zusammenfassen. Weitere Einzelheiten finden Sie unter Snowflake Fensterfunktionen.
Snowflake SQL
select
TS,
LOCATION,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '3 days' preceding and current row
) SUM_RAIN_3D,
sum(RAIN) over (
partition by LOCATION
order by TS
range between interval '5 days' preceding and current row
) SUM_RAIN_5D
from <source_table_name>
Verwenden benutzerdefinierter Funktionen in Feature-Pipelines¶
Der Snowflake Feature Store unterstützt benutzerdefinierte Funktionen (UDFs) in Feature-Pipeline-Definitionen. Allerdings können nur deterministische Funktionen (Funktionen, die für dieselbe Eingabe immer dasselbe Ergebnis liefern) inkrementell gewartet werden. Um eine inkrementelle Wartung zu ermöglichen, markieren Sie Ihre UDF beim Registrieren als unveränderlich.
# In Python
@F.udf(
name="MY_UDF",
immutable=True,
# ...
)
def my_udf(...):
# ...
Wenn Ihre Funktion in SQL geschrieben ist, geben Sie das Schlüsselwort IMMUTABLE an. Weitere Informationen dazu finden Sie in dieser Anleitung.