Feature Examples and Common Query Patterns

The FeatureView class accepts a Snowpark DataFrame object containing the feature transformation logic. You can therefore describe your features in any way supported by the Snowpark DataFrame API or by Snowflake SQL. You can pass the returned DataFrame to the FeatureView constructor directly.

The Snowpark Python API provides utility functions for easily defining many common feature types, such as windowed aggregations. This topic contains some examples of these.

Per-Row Features

In per-row features, functions are applied to each row of tabular data. For example, the following code fills null values in the foo column with zero, then computes a ZIP code from the lat and long columns. There is one output row per input row.

Python

import snowflake.snowpark as snowpark
import snowflake.snowpark.functions as F

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Replace nulls in "foo" with 0
    df = df.fillna({"foo": 0})
    # compute_zipcode is assumed to be a previously registered UDF
    df = df.with_column(
        "zipcode",
        F.compute_zipcode(df["lat"], df["long"])
    )
    return df

Snowflake SQL

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM table;

Per-Group Features

Per-group features aggregate values in a column within a group: for example, the sum of daily rainfall by city for a weather forecast. The output DataFrame has one row per group.

Python

import snowflake.snowpark as snowpark
import snowflake.snowpark.functions as F

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Aggregate rainfall and humidity per location per day
    return df.group_by(
        ["location", F.to_date("timestamp")]
    ).agg(
        F.sum("rain").alias("sum_rain"),
        F.avg("humidity").alias("avg_humidity")
    )

Snowflake SQL

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM weather
GROUP BY location, date;

Row-based Window Features

Row-based window features aggregate values over a fixed window of rows, for example summing the past three transaction amounts. The output DataFrame has one output row per window frame.

Python

import snowflake.snowpark as snowpark
import snowflake.snowpark.functions as F
from snowflake.snowpark import Window

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Current row plus the two preceding rows within each id, ordered by ts
    window = Window.partition_by("id").order_by("ts").rows_between(-2, Window.CURRENT_ROW)

    return df.select(
        F.sum("amount").over(window).alias("sum_past_3_transactions")
    )

Snowflake SQL

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
        AS sum_past_3_transactions
FROM transactions;

Moving Aggregation Features

Moving aggregation features calculate moving statistics, such as sum and average, within a specified window size. This function dynamically computes these aggregates across different subsets of the DataFrame based on the defined window sizes, order, and groupings.

new_df = df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
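To make the window semantics concrete, here is a plain-Python sketch of a per-group moving sum with a window of two rows. The key, ts, and amount field names are illustrative, not part of the Snowpark API.

```python
from collections import defaultdict

def moving_sum(rows, window_size):
    """Moving sum of "amount" per "key", ordered by "ts".

    Each output value sums the current row and up to
    window_size - 1 preceding rows in the same group.
    """
    grouped = defaultdict(list)
    for row in sorted(rows, key=lambda r: (r["key"], r["ts"])):
        grouped[row["key"]].append(row["amount"])

    results = []
    for key, amounts in grouped.items():
        for i in range(len(amounts)):
            window = amounts[max(0, i - window_size + 1) : i + 1]
            results.append({"key": key, "moving_sum": sum(window)})
    return results

rows = [
    {"key": "A", "ts": 1, "amount": 10},
    {"key": "A", "ts": 2, "amount": 20},
    {"key": "A", "ts": 3, "amount": 30},
]
# window_size=2 yields sums 10, 10+20=30, 20+30=50
```

moving_agg produces one output column per (aggregate, window size) combination, so the call above with two aggregates and two window sizes adds four columns.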

Cumulative Aggregation Features

Cumulative aggregation computes ongoing totals, minimums, maximums, and other running statistics across a data partition, which is sorted and grouped as specified. Unlike moving aggregates, these statistics accumulate from the start of the partition (or, when is_forward=True, from the current row to the end of the partition) and do not reset as the window advances.

new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)
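The difference from a moving aggregate can be sketched in plain Python over a single sorted partition; the is_forward flag below mirrors the parameter of the same name.

```python
from itertools import accumulate

def cumulative_sum(values, is_forward=False):
    """Running total over a sorted partition.

    is_forward=False accumulates from the start of the partition
    to each row; is_forward=True accumulates from each row to the
    end of the partition. Either way, the total never resets.
    """
    if is_forward:
        # Accumulate over the reversed list, then restore order
        return list(accumulate(reversed(values)))[::-1]
    return list(accumulate(values))

# cumulative_sum([10, 20, 30])       -> [10, 30, 60]
# cumulative_sum([10, 20, 30], True) -> [60, 50, 30]
```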

Lag Features

Lag features introduce new columns containing values from prior rows within each partition, offset by a specified number of rows. This function is critical for comparing current values against previous values in a dataset, assisting in detecting trends or changes over time.

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
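In plain Python, a lag amounts to shifting a sorted partition down by the offset and padding the start with None (Snowpark emits NULL). This sketch is illustrative, not the library implementation.

```python
def compute_lag(values, lag):
    """Shift a sorted partition down by `lag` rows.

    The first `lag` rows have no prior value and become None.
    """
    return ([None] * lag + values)[: len(values)]

# compute_lag([100, 200, 300], 1) -> [None, 100, 200]
# compute_lag([100, 200, 300], 2) -> [None, None, 100]
```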

Lead Features

The inverse of lag features, lead features create new columns containing values from subsequent rows, shifting data upward. This feature is essential for making predictions or assumptions based on future data points already present in a dataset.

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
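Symmetrically, a lead shifts the partition up and pads the end with None, as in this illustrative sketch:

```python
def compute_lead(values, lead):
    """Shift a sorted partition up by `lead` rows.

    The last `lead` rows have no following value and become None.
    """
    return (values + [None] * lead)[lead:]

# compute_lead([100, 200, 300], 1) -> [200, 300, None]
```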

Time-series Features

Time-series features compute feature values based on a time window and a fixed position along the time axis. Examples include the count of trips over the past week for Uber rides or the sum of sales over the past three days.

Recent versions of the Snowflake Feature Store include an experimental time series aggregation API. Using this API, a time series feature can be created using code like the following.

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
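To illustrate what a trailing "-3D" window computes, here is a plain-Python sketch over daily values for a single location. The date-keyed dict is illustrative; the real API operates on DataFrame columns.

```python
from datetime import date, timedelta

def trailing_window_sum(daily, days):
    """For each observed date d, sum the values whose date falls in
    the trailing window [d - days + 1, d], roughly what a "-3D"
    window with a one-day sliding interval produces per group."""
    out = {}
    for d in daily:
        lo = d - timedelta(days=days - 1)
        out[d] = sum(v for dd, v in daily.items() if lo <= dd <= d)
    return out

rain = {
    date(2024, 1, 1): 5,
    date(2024, 1, 2): 0,
    date(2024, 1, 3): 7,
    date(2024, 1, 4): 2,
}
# Three-day sum on Jan 4 covers Jan 2-4: 0 + 7 + 2 = 9
```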

You can also use the older, more stable APIs for time series features. (We plan to further improve the developer experience of authoring a time-series pipeline soon.)

Note

SQL is the simplest way to construct a time series pipeline in this fashion. You can use the SQL statement in the following Python example on its own.

def sum_timeseries_rainfall(session: snowpark.Session) -> snowpark.DataFrame:
    return session.sql(
        """
        WITH t AS (
            SELECT
                ROW_NUMBER() OVER (ORDER BY location ASC) AS row_num,
                weather.location,
                TO_DATE(weather.timestamp) AS ts,
                weather.rain
            FROM weather
        ),
        f_1 AS (
            SELECT DISTINCT *
            FROM (
                SELECT t1.location, t1.ts, SUM(t2.rain) AS three_day_sum_rain
                FROM t t1
                LEFT OUTER JOIN t t2
                    ON t1.location = t2.location
                    AND t2.ts >= t1.ts - 2 AND t2.ts <= t1.ts
                GROUP BY t1.row_num, t1.location, t1.ts
            )
        ),
        f_2 AS (
            SELECT DISTINCT *
            FROM (
                SELECT t1.location, t1.ts, SUM(t2.rain) AS five_day_sum_rain
                FROM t t1
                LEFT OUTER JOIN t t2
                    ON t1.location = t2.location
                    AND t2.ts >= t1.ts - 4 AND t2.ts <= t1.ts
                GROUP BY t1.row_num, t1.location, t1.ts
            )
        )
        SELECT f_1.*, f_2.five_day_sum_rain
        FROM f_1
        JOIN f_2
            ON f_1.location = f_2.location AND f_1.ts = f_2.ts
        """
    )

Using User-Defined Functions in Feature Pipelines

The Snowflake Feature Store supports user-defined functions (UDFs) in feature pipeline definitions. However, only deterministic functions (functions that always return the same result for the same input) can be incrementally maintained. To enable incremental maintenance, mark your UDF as immutable when registering it.

# In Python
@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...

If your function is written in SQL, specify the IMMUTABLE keyword when you create it.
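For example, a SQL UDF declared immutable might look like the following; the function name and body here are illustrative.

```sql
CREATE OR REPLACE FUNCTION double_it(x NUMBER)
  RETURNS NUMBER
  IMMUTABLE
  AS 'x * 2';
```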