Feature Examples and Common Query Patterns¶
The FeatureView class accepts a Snowpark DataFrame object containing the feature transformation logic, so you can describe your features in any way supported by the Snowpark DataFrame API or by Snowflake SQL, and pass the resulting DataFrame directly to the FeatureView constructor.
The Snowpark Python API provides utility functions for easily defining many common feature types, such as windowed aggregations. This topic contains examples of some of these.
Per-Row Features¶
In per-row features, functions are applied to each row of tabular data. For example, the following code fills nulls in foo with zero, then computes a ZIP code from lat and long. There is one output row per input row.
Python¶
import snowflake.snowpark as snowpark
import snowflake.snowpark.functions as F

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.fillna({"foo": 0})
    df = df.with_column(
        "zipcode",
        # compute_zipcode is an illustrative user-defined function (UDF),
        # not a built-in Snowpark function.
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df
Snowflake SQL¶
SELECT
COALESCE(foo, 0) AS foo,
compute_zipcode(lat, long) AS zipcode
FROM table;
Per-Group Features¶
Per-group features aggregate values in a column within a group: for example, the sum of daily rainfall grouped by city for a weather forecast. The output DataFrame has one row per group.
Python¶
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import avg, sum, to_date

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    df = df.group_by(
        ["location", to_date("timestamp")]
    ).agg(
        sum("rain").alias("sum_rain"),
        avg("humidity").alias("avg_humidity")
    )
    return df
Snowflake SQL¶
SELECT
location,
TO_DATE(timestamp) AS date,
SUM(rain) AS sum_rain,
AVG(humidity) AS avg_humidity
FROM weather
GROUP BY location, date;
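As a sketch of the per-group semantics, the same aggregation can be written in plain Python over made-up weather rows (integer rain values, in millimeters); the Snowpark and SQL versions above are what you would use in a real pipeline.

```python
from collections import defaultdict

# Made-up input rows; column names mirror the example above.
rows = [
    {"location": "NYC", "date": "2024-01-01", "rain": 5, "humidity": 60},
    {"location": "NYC", "date": "2024-01-01", "rain": 3, "humidity": 70},
    {"location": "SEA", "date": "2024-01-01", "rain": 12, "humidity": 80},
]

# Bucket rows by the grouping key (location, date).
groups = defaultdict(list)
for row in rows:
    groups[(row["location"], row["date"])].append(row)

# One output row per group, as in the GROUP BY above.
features = {
    key: {
        "sum_rain": sum(r["rain"] for r in g),
        "avg_humidity": sum(r["humidity"] for r in g) / len(g),
    }
    for key, g in groups.items()
}
```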
Row-based Window Features¶
Row-based window features aggregate values over a fixed window of rows, for example summing up the past three transaction amounts. The output DataFrame has one output row per window frame.
Python¶
import snowflake.snowpark as snowpark
from snowflake.snowpark import Window
from snowflake.snowpark.functions import sum

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # -2 .. CURRENT_ROW covers the current row and the two preceding rows.
    window = Window.partition_by("id").order_by("ts").rows_between(-2, Window.CURRENT_ROW)
    return df.select(
        sum("amount").over(window).alias("sum_past_3_transactions")
    )
Snowflake SQL¶
SELECT
id,
SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
AS sum_past_3_transactions
FROM transactions;
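To make the window frame concrete, here is a plain-Python sketch of the same computation over a made-up, already-ordered series of amounts for a single id: each output row sums the current amount and up to two preceding amounts.

```python
# Hypothetical transaction amounts for one id, ordered by ts.
amounts = [10, 20, 30, 40, 50]

# "2 PRECEDING .. CURRENT ROW": sum the current value and up to
# two preceding values; early rows use a shorter frame.
sum_past_3 = [sum(amounts[max(0, i - 2): i + 1]) for i in range(len(amounts))]
# sum_past_3 == [10, 30, 60, 90, 120]
```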
Moving Aggregation Features¶
Moving aggregation features calculate moving statistics, such as sum and average, within a specified window size. This function dynamically computes these aggregates across different subsets of the DataFrame based on the defined window sizes, order, and groupings.
new_df = df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
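As a sketch of what the SUM aggregate computes here, the following plain Python applies trailing windows of size 2 and 3 to made-up sales values, assuming each window covers the current row and the preceding rows, with shorter windows at the start of the partition.

```python
# Made-up sales amounts for a single PRODUCTKEY, ordered by ORDERDATE.
sales = [10, 20, 30, 40]

def moving_sum(values, window_size):
    # Trailing window: the current row plus up to window_size - 1 preceding rows.
    return [
        sum(values[max(0, i - window_size + 1): i + 1])
        for i in range(len(values))
    ]

sum_2 = moving_sum(sales, 2)  # [10, 30, 50, 70]
sum_3 = moving_sum(sales, 3)  # [10, 30, 60, 90]
```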
Cumulative Aggregation Features¶
Cumulative aggregation computes ongoing totals, minimums, maximums, and other cumulative statistics across a data partition, which is sorted and grouped as specified. Unlike moving aggregates, these windows extend from the start of the partition to the current row, or from the current row to the end of the partition, depending on the direction specified, providing running totals that do not reset.
new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
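The following plain-Python sketch (made-up sales values) shows cumulative SUM, MIN, and MAX in the backward direction (partition start to current row), plus a forward cumulative sum (current row to partition end), which is the direction requested by is_forward=True above.

```python
# Made-up sales amounts for a single PRODUCTKEY, ordered by ORDERDATE.
sales = [30, 10, 40, 20]
n = len(sales)

# Backward direction: aggregate from the partition start to each row.
cum_sum = [sum(sales[: i + 1]) for i in range(n)]  # [30, 40, 80, 100]
cum_min = [min(sales[: i + 1]) for i in range(n)]  # [30, 10, 10, 10]
cum_max = [max(sales[: i + 1]) for i in range(n)]  # [30, 30, 40, 40]

# Forward direction: aggregate from each row to the partition end.
fwd_sum = [sum(sales[i:]) for i in range(n)]       # [100, 70, 60, 20]
```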
Lag Features¶
Lag features introduce new columns containing values from prior rows within each partition, offset by a specified number of rows. This function is critical for comparing current values against previous values in a dataset, assisting in detecting trends or changes over time.
new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
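In plain Python, a lag of n simply shifts each value down by n rows within its partition, with None (NULL in Snowflake) where no earlier row exists. The values below are made up.

```python
# Made-up sales amounts for a single PRODUCTKEY, ordered by ORDERDATE.
sales = [10, 20, 30, 40]

def lag(values, n):
    # Each output row takes the value n rows earlier, or None at the
    # start of the partition where no such row exists.
    return [values[i - n] if i - n >= 0 else None for i in range(len(values))]

lag_1 = lag(sales, 1)  # [None, 10, 20, 30]
lag_2 = lag(sales, 2)  # [None, None, 10, 20]
```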
Lead Features¶
The inverse of lag features, lead features create new columns containing values from subsequent rows, shifting data upward. This feature is essential for making predictions or assumptions based on future data points already present in a dataset.
new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
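Symmetrically, a lead of n shifts values up by n rows within the partition, with None (NULL in Snowflake) where no later row exists. Again, the values are made up.

```python
# Made-up sales amounts for a single PRODUCTKEY, ordered by ORDERDATE.
sales = [10, 20, 30, 40]

def lead(values, n):
    # Each output row takes the value n rows later, or None at the
    # end of the partition where no such row exists.
    return [values[i + n] if i + n < len(values) else None for i in range(len(values))]

lead_1 = lead(sales, 1)  # [20, 30, 40, None]
lead_2 = lead(sales, 2)  # [30, 40, None, None]
```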
Time-series Features¶
Time-series features compute feature values based on a time window and a fixed position along the time axis. Examples include the count of trips over the past week for Uber rides or the sum of sales over the past three days.
Recent versions of the Snowflake Feature Store include an experimental time series aggregation API. Using this API, a time series feature can be created using code like the following.
def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
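To make the windows concrete, here is a trailing three-day rain sum for a single location in plain Python, with made-up dates and values, assuming the -3D window covers the current day and the two preceding days.

```python
from datetime import date, timedelta

# Made-up daily rain totals (millimeters) for one location.
daily_rain = {
    date(2024, 1, 1): 5,
    date(2024, 1, 2): 0,
    date(2024, 1, 3): 2,
    date(2024, 1, 4): 7,
}

def trailing_sum(day, n_days):
    # Sum rain over the current day and the n_days - 1 preceding days;
    # days with no observation contribute zero.
    window = [day - timedelta(days=d) for d in range(n_days)]
    return sum(daily_rain.get(d, 0) for d in window)

sum_3d = {d: trailing_sum(d, 3) for d in daily_rain}
# sum_3d[date(2024, 1, 3)] == 7  (5 + 0 + 2)
# sum_3d[date(2024, 1, 4)] == 9  (0 + 2 + 7)
```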
You can also use the older, more stable APIs for time series features. (We plan to further improve the developer experience of authoring a time-series pipeline soon.)
Note
SQL is the simplest way to construct a time series pipeline in this fashion. You can use the SQL statement in the following Python example on its own.
def sum_timeseries_rainfall(session: snowpark.Session) -> snowpark.DataFrame:
    return session.sql(
        """
        WITH t AS (
            SELECT
                ROW_NUMBER() OVER (ORDER BY location ASC) AS row_num,
                weather.location,
                TO_DATE(weather.timestamp) AS ts,
                weather.rain
            FROM weather
        ),
        f_1 AS (
            SELECT DISTINCT *
            FROM (
                SELECT t1.location, t1.ts, SUM(t2.rain) AS three_day_sum_rain
                FROM t t1
                LEFT OUTER JOIN t t2
                    ON t1.location = t2.location
                    AND t2.ts >= t1.ts - 2 AND t2.ts <= t1.ts
                GROUP BY t1.row_num, t1.location, t1.ts
                ORDER BY t1.ts
            )
        ),
        f_2 AS (
            SELECT DISTINCT *
            FROM (
                SELECT t1.location, t1.ts, SUM(t2.rain) AS five_day_sum_rain
                FROM t t1
                LEFT OUTER JOIN t t2
                    ON t1.location = t2.location
                    AND t2.ts >= t1.ts - 4 AND t2.ts <= t1.ts
                GROUP BY t1.row_num, t1.location, t1.ts
                ORDER BY t1.ts
            )
        )
        SELECT f_1.*, f_2.five_day_sum_rain
        FROM f_1 JOIN f_2
            ON f_1.location = f_2.location AND f_1.ts = f_2.ts
        """
    )
Using User-Defined Functions in Feature Pipelines¶
The Snowflake Feature Store supports user defined functions (UDFs) in feature pipeline definitions. However, only deterministic functions (functions that always return the same result for the same input) can be incrementally maintained. To enable incremental maintenance, mark your UDF as immutable when registering it.
# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...
If your function is written in SQL, specify the IMMUTABLE keyword in its CREATE FUNCTION statement.