Common feature and query patterns

The FeatureView class accepts a Snowpark DataFrame object containing the feature transformation logic. You can therefore describe your features in any way supported by the Snowpark DataFrame API or by Snowflake SQL. You can pass the returned DataFrame to the FeatureView constructor directly.
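For example, the following sketch constructs a feature view from a transformation DataFrame. It assumes a previously registered entity; the entity and other names are illustrative.

Python:

from snowflake.ml.feature_store import FeatureView

# df is a Snowpark DataFrame containing the feature transformation logic.
# location_entity is a previously registered Entity; all names here are illustrative.
fv = FeatureView(
    name="weather_features",
    entities=[location_entity],
    feature_df=df,
    refresh_freq="1 day",  # omit to create a static feature view that is refreshed externally
    desc="Daily weather aggregates",
)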

The Snowpark Python API provides Analytics Functions for easily defining many common feature types, such as windowed aggregations. This topic contains some examples of these.

The open-source snowflake-ml-python repository on GitHub also contains sample feature view and entity definitions using public datasets.

Per-Row Features

In per-row features, functions are applied to each row of tabular data. For example, the following code fills null values in foo with zero, then computes a ZIP code from lat and long. There is one output row per input row.

Python:

import snowflake.snowpark as snowpark
import snowflake.snowpark.functions as F

def get_zipcode(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # Replace nulls in "foo" with zero.
    df = df.fillna({"foo": 0})
    # compute_zipcode is a previously registered UDF.
    df = df.with_column(
        "zipcode",
        F.call_udf("compute_zipcode", df["lat"], df["long"])
    )
    return df

Snowflake SQL:

SELECT
    COALESCE(foo, 0) AS foo,
    compute_zipcode(lat, long) AS zipcode
FROM <source_table_name>;

Per-Group Features

Per-group features aggregate values in a column within a group: for example, the sum of daily rainfall, grouped by city, for a weather forecast. The output DataFrame has one row per group.

Python:

def sum_rainfall(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # One output row per (location, date) group.
    df = df.group_by(
        ["location", F.to_date("timestamp")]
    ).agg(
        F.sum("rain").alias("sum_rain"),
        F.avg("humidity").alias("avg_humidity")
    )
    return df

Snowflake SQL:

SELECT
    location,
    TO_DATE(timestamp) AS date,
    SUM(rain) AS sum_rain,
    AVG(humidity) AS avg_humidity
FROM <source_table_name>
GROUP BY location, date;

Row-Based Window Features

Row-based window features aggregate values over a fixed window of rows: for example, summing the last three transaction amounts. The window slides with each row, so there is one output row per input row, each aggregated over its own window frame.

Python:

from snowflake.snowpark import Window

def sum_past_3_transactions(df: snowpark.DataFrame) -> snowpark.DataFrame:
    # The frame covers the two preceding rows and the current row;
    # negative offsets indicate preceding rows.
    window = Window.partition_by("id").order_by("ts").rows_between(-2, Window.CURRENT_ROW)

    return df.select(
        F.sum("amount").over(window).alias("sum_past_3_transactions")
    )

Snowflake SQL:

SELECT
    id,
    SUM(amount) OVER (PARTITION BY id ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
        AS sum_past_3_transactions
FROM <source_table_name>;

Moving Aggregation Features

Moving aggregation features calculate moving statistics, such as sums and averages, within a specified window size. The moving_agg function computes these aggregates across different subsets of the DataFrame based on the window sizes, ordering, and groupings you define.

new_df = df.analytics.moving_agg(
    aggs={"SALESAMOUNT": ["SUM", "AVG"]},
    window_sizes=[2, 3],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)
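
By default, the output column names combine the input column, the aggregation, and the window size (for example, SALESAMOUNT_SUM_2 and SALESAMOUNT_AVG_3); a col_formatter argument can be passed to customize this naming, as in the time-series aggregation example later in this topic.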

Cumulative Aggregation Features

Cumulative aggregation computes running totals, minimums, maximums, and other cumulative statistics across a data partition, which is sorted and grouped as specified. Unlike moving aggregates, these statistics extend from the start of the partition to the current row, or from the current row to the end of the partition when is_forward=True, producing running totals that do not reset.

new_df = df.analytics.cumulative_agg(
    aggs={"SALESAMOUNT": ["SUM", "MIN", "MAX"]},
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"],
    is_forward=True
)

Lag Features

Lag features introduce new columns containing values from prior rows within each partition, offset by a specified number of rows. This function is useful for comparing current values against previous values in a dataset, helping to detect trends or changes over time.

new_df = df.analytics.compute_lag(
    cols=["SALESAMOUNT"],
    lags=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)

Lead Features

The inverse of lag features, lead features create new columns containing values from subsequent rows, shifting data upward. This is useful for building features from future data points that are already present in a dataset.

new_df = df.analytics.compute_lead(
    cols=["SALESAMOUNT"],
    leads=[1, 2],
    order_by=["ORDERDATE"],
    group_by=["PRODUCTKEY"]
)

Time-Series Features

Time-series features compute feature values based on a time window and a fixed position along the time axis. Examples include the count of trips over the past week for Uber rides or the sum of sales over the past three days.

Recent versions of the Snowflake Feature Store include an experimental time-series aggregation API. Using this API, you can create a time-series feature with code like the following.

Python:

def custom_column_naming(input_col, agg, window):
    return f"{agg}_{input_col}_{window.replace('-', 'past_')}"

result_df = weather_df.analytics.time_series_agg(
    aggs={"rain": ["SUM"]},
    windows=["-3D", "-5D"],
    sliding_interval="1D",
    group_by=["location"],
    time_col="ts",
    col_formatter=custom_column_naming
)
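
With this formatter, the result contains one column per aggregation and window; here, the windows "-3D" and "-5D" yield the columns SUM_rain_past_3D and SUM_rain_past_5D.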

You can also construct time-series features with RANGE BETWEEN syntax in SQL. Refer to Snowflake Window functions for more details.

Snowflake SQL:

SELECT
    ts,
    location,
    SUM(rain) OVER (
        PARTITION BY location
        ORDER BY ts
        RANGE BETWEEN INTERVAL '3 days' PRECEDING AND CURRENT ROW
    ) AS sum_rain_3d,
    SUM(rain) OVER (
        PARTITION BY location
        ORDER BY ts
        RANGE BETWEEN INTERVAL '5 days' PRECEDING AND CURRENT ROW
    ) AS sum_rain_5d
FROM <source_table_name>;

Using User-Defined Functions in Feature Pipelines

The Snowflake Feature Store supports user-defined functions (UDFs) in feature pipeline definitions. However, only deterministic functions (functions that always return the same result for the same input) can be incrementally maintained. To enable incremental maintenance, mark your UDF as immutable when registering it.

Python:

@F.udf(
    name="MY_UDF",
    immutable=True,
    # ...
)
def my_udf(...):
    # ...

If your function is written in SQL, specify the IMMUTABLE keyword when you create it. See the Snowflake documentation on SQL UDFs for details.
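
For example, the following is a minimal sketch of a SQL UDF marked IMMUTABLE; the function name and body are illustrative.

Snowflake SQL:

CREATE OR REPLACE FUNCTION double_amount(amount FLOAT)
RETURNS FLOAT
IMMUTABLE
AS
$$
    amount * 2
$$;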