snowflake.ml.feature_store.FeatureView

class snowflake.ml.feature_store.FeatureView(name: str, entities: List[Entity], feature_df: DataFrame, *, timestamp_col: Optional[str] = None, refresh_freq: Optional[str] = None, desc: str = '', warehouse: Optional[str] = None, initialize: str = 'ON_CREATE', refresh_mode: str = 'AUTO', **_kwargs: Any)

Bases: LineageNode

A FeatureView instance encapsulates a logical group of features.

Create a FeatureView instance.

Parameters:
  • name – name of the FeatureView. NOTE: the name must follow Snowflake identifier rules.

  • entities – entities that the FeatureView is associated with.

  • feature_df – Snowpark DataFrame containing the data source and all feature transformation logic. The final projection of the DataFrame should contain the feature names, join keys, and timestamp (if applicable).

  • timestamp_col – name of the timestamp column for point-in-time lookup when consuming the feature values.

  • refresh_freq

    Time interval defining how often new feature data should be generated. Valid args are { <num> { seconds | minutes | hours | days } | DOWNSTREAM | <cron expr> <time zone> }. NOTE: Currently the minimum refresh frequency is 1 minute. NOTE: If refresh_freq is a cron expression, it must be followed by a valid time zone.

    E.g. * * * * * UTC

    NOTE: If refresh_freq is not provided, the FeatureView will be registered as a View on the Snowflake backend and will incur no extra storage cost.

  • desc – description of the FeatureView.

  • warehouse – warehouse used to refresh the feature view. Not needed for a static feature view (refresh_freq is None). For a managed feature view, this warehouse overrides the default warehouse of the Feature Store if specified; otherwise the default warehouse is used.

  • initialize – Specifies the behavior of the initial refresh of feature view. This property cannot be altered after you register the feature view. It supports ON_CREATE (default) or ON_SCHEDULE. ON_CREATE refreshes the feature view synchronously at creation. ON_SCHEDULE refreshes the feature view at the next scheduled refresh. It is only effective when refresh_freq is not None.

  • refresh_mode – The refresh mode of a managed feature view. The value can be ‘AUTO’, ‘FULL’ or ‘INCREMENTAL’. For a managed feature view, the default value is ‘AUTO’. For a static feature view it has no effect. See https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table for details.

  • _kwargs – reserved kwargs for system generated args. NOTE: DO NOT USE.
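The refresh_freq forms listed above can be checked locally before constructing a FeatureView. The following is a minimal sketch and not the library's own validation: classify_refresh_freq is a hypothetical helper, it covers only the spelled-out forms in the parameter description (accepting singular units such as "1 minute" is an assumption), and the cron check only counts fields rather than parsing them.

```python
import re
from typing import Optional

# Hypothetical helper, not part of snowflake.ml. It mirrors the grammar in
# the parameter description: <num> <unit> | DOWNSTREAM | <cron expr> <time zone>.
_INTERVAL_RE = re.compile(r"^(\d+)\s+(second|minute|hour|day)s?$", re.IGNORECASE)
_SECONDS_PER_UNIT = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def classify_refresh_freq(refresh_freq: Optional[str]) -> str:
    """Return 'static', 'downstream', 'interval' or 'cron' for a refresh_freq value."""
    if refresh_freq is None:
        # No refresh frequency: the feature view is registered as a plain View.
        return "static"

    value = refresh_freq.strip()
    if value.upper() == "DOWNSTREAM":
        return "downstream"

    match = _INTERVAL_RE.match(value)
    if match:
        seconds = int(match.group(1)) * _SECONDS_PER_UNIT[match.group(2).lower()]
        if seconds < 60:
            # The documented minimum refresh frequency is 1 minute.
            raise ValueError(f"refresh_freq {value!r} is below the 1 minute minimum")
        return "interval"

    # Assumption: a cron expression has five fields followed by one time zone token.
    if len(value.split()) == 6:
        return "cron"

    raise ValueError(f"unrecognized refresh_freq: {value!r}")
```

For instance, classify_refresh_freq("* * * * * UTC") returns "cron", while a cron expression without a time zone raises ValueError.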

Example:

>>> fs = FeatureStore(...)
>>> # draft_fv is a local object that hasn't been materialized to the Snowflake backend yet.
>>> feature_df = session.sql("select f_1, f_2 from source_table")
>>> draft_fv = FeatureView(
...     name="my_fv",
...     entities=[e1, e2],
...     feature_df=feature_df,
...     timestamp_col='TS', # optional
...     refresh_freq='1d',  # optional
...     desc='A line about this feature view',  # optional
...     warehouse='WH'      # optional, the warehouse used to refresh (managed) feature view
... )
>>> print(draft_fv.status)
FeatureViewStatus.DRAFT

>>> # registered_fv is a local object that maps to a Snowflake backend object.
>>> registered_fv = fs.register_feature_view(draft_fv, "v1")
>>> print(registered_fv.status)
FeatureViewStatus.ACTIVE

Methods

attach_feature_desc(descs: Dict[str, str]) FeatureView

Associate feature level descriptions to the FeatureView.

Parameters:

descs – Dictionary mapping feature names to their descriptions.

Returns:

FeatureView with feature level desc attached.

Raises:

ValueError – if a feature name is not found in the FeatureView.

Example:

>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> feature_df = session.table(source_table).select('TRIPDURATION', 'START_STATION_LATITUDE', 'TRIP_ID')
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> draft_fv = draft_fv.attach_feature_desc({
...     "TRIPDURATION": "Duration of a trip.",
...     "START_STATION_LATITUDE": "Latitude of the start station."
... })
>>> registered_fv = fs.register_feature_view(draft_fv, version='1.0')
>>> registered_fv.feature_descs
OrderedDict([('TRIPDURATION', 'Duration of a trip.'),
    ('START_STATION_LATITUDE', 'Latitude of the start station.')])
classmethod from_json(json_str: str, session: Session) FeatureView
fully_qualified_name() str

Returns the fully qualified name (<database_name>.<schema_name>.<feature_view_name>) for the FeatureView in Snowflake.

Returns:

fully qualified name string.

Raises:

RuntimeError – if the FeatureView is not registered.

Example:

>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> feature_df = session.table(source_table).select(
...     'TRIPDURATION',
...     'START_STATION_LATITUDE',
...     'TRIP_ID'
... )
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> registered_fv = fs.register_feature_view(draft_fv, version='1.0')
>>> registered_fv.fully_qualified_name()
'MY_DB.MY_SCHEMA."F_TRIP$1.0"'
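The returned string can be split back into its parts when only the name or version is needed. This is a minimal sketch that assumes the format shown in the example output above (unquoted database and schema, then a quoted "<name>$<version>" identifier); split_fully_qualified_name and FeatureViewLocation are hypothetical names, not part of the library.

```python
import re
from typing import NamedTuple


class FeatureViewLocation(NamedTuple):
    """Parts of a fully qualified feature view name (hypothetical container)."""
    database: str
    schema: str
    name: str
    version: str


# Assumes the shape DB.SCHEMA."NAME$VERSION" seen in the example output.
# Quoted database or schema names containing dots are not handled.
_FQN_RE = re.compile(
    r'^(?P<database>[^.]+)\.(?P<schema>[^.]+)\."(?P<name>[^"$]+)\$(?P<version>[^"]+)"$'
)


def split_fully_qualified_name(fqn: str) -> FeatureViewLocation:
    """Split a fully qualified feature view name into its four parts."""
    match = _FQN_RE.match(fqn)
    if match is None:
        raise ValueError(f"unexpected fully qualified name format: {fqn!r}")
    return FeatureViewLocation(**match.groupdict())


location = split_fully_qualified_name('MY_DB.MY_SCHEMA."F_TRIP$1.0"')
print(location.name, location.version)
```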
lineage(direction: Literal['upstream', 'downstream'] = 'downstream', domain_filter: Optional[Set[Literal['feature_view', 'dataset', 'model', 'table', 'view']]] = None) List[Union[FeatureView, Dataset, ModelVersion, LineageNode]]

Retrieves the lineage nodes connected to this node.

Parameters:
  • direction – The direction to trace lineage. Defaults to “downstream”.

  • domain_filter – Set of domains to filter nodes. Defaults to None.

Returns:

A list of connected lineage nodes.

Return type:

List[LineageNode]

This method has been in private preview since version 1.5.3.

list_columns() DataFrame

List all columns and their information.

Returns:

A Snowpark DataFrame containing feature information.

Example:

>>> fs = FeatureStore(...)
>>> e = Entity("foo", ["id"], desc='my entity')
>>> fs.register_entity(e)

>>> draft_fv = FeatureView(
...     name="fv",
...     entities=[e],
...     feature_df=session.table(source_table).select(["NAME", "ID", "TITLE", "AGE", "TS"]),
...     timestamp_col="ts",
... ).attach_feature_desc({"AGE": "my age", "TITLE": '"my title"'})
>>> fv = fs.register_feature_view(draft_fv, '1.0')

>>> fv.list_columns().show()
--------------------------------------------------
|"NAME"  |"CATEGORY"  |"DTYPE"      |"DESC"      |
--------------------------------------------------
|NAME    |FEATURE     |string(64)   |            |
|ID      |ENTITY      |bigint       |my entity   |
|TITLE   |FEATURE     |string(128)  |"my title"  |
|AGE     |FEATURE     |bigint       |my age      |
|TS      |TIMESTAMP   |bigint       |NULL        |
--------------------------------------------------
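The CATEGORY column in the output above follows from how the FeatureView was defined: join keys of the entities are ENTITY, the timestamp_col is TIMESTAMP, and everything else is FEATURE. The sketch below reproduces that mapping locally. categorize_columns is a hypothetical helper rather than library code, and comparing names case-insensitively is an assumption based on timestamp_col="ts" appearing as TS in the output.

```python
from typing import Dict, List, Optional


def categorize_columns(
    columns: List[str],
    join_keys: List[str],
    timestamp_col: Optional[str] = None,
) -> Dict[str, str]:
    """Map each column name to ENTITY, TIMESTAMP or FEATURE (hypothetical helper)."""
    # Assumption: unquoted identifiers compare case-insensitively, so fold to upper case.
    keys = {key.upper() for key in join_keys}
    timestamp = timestamp_col.upper() if timestamp_col else None

    categories: Dict[str, str] = {}
    for column in columns:
        name = column.upper()
        if name in keys:
            categories[name] = "ENTITY"
        elif name == timestamp:
            categories[name] = "TIMESTAMP"
        else:
            categories[name] = "FEATURE"
    return categories


categories = categorize_columns(
    ["NAME", "ID", "TITLE", "AGE", "TS"], join_keys=["id"], timestamp_col="ts"
)
print(categories)
```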
slice(names: List[str]) FeatureViewSlice

Select a subset of features within the FeatureView.

Parameters:

names – feature names to select.

Returns:

FeatureViewSlice instance containing selected features.

Raises:

ValueError – if any selected feature name is not found in the FeatureView.

Example:

>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> # feature_df contains 3 features and 1 entity
>>> feature_df = session.table(source_table).select(
...     'TRIPDURATION',
...     'START_STATION_LATITUDE',
...     'END_STATION_LONGITUDE',
...     'TRIP_ID'
... )
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> fv = fs.register_feature_view(draft_fv, version='1.0')
>>> # shows all 3 features
>>> fv.feature_names
['TRIPDURATION', 'START_STATION_LATITUDE', 'END_STATION_LONGITUDE']

>>> # slice a subset of features
>>> fv_slice = fv.slice(['TRIPDURATION', 'START_STATION_LATITUDE'])
>>> fv_slice.names
['TRIPDURATION', 'START_STATION_LATITUDE']

>>> # query the full set of features in original feature view
>>> fv_slice.feature_view_ref.feature_names
['TRIPDURATION', 'START_STATION_LATITUDE', 'END_STATION_LONGITUDE']
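Because slice raises ValueError for unknown names, it can help to check a requested subset against feature_names first and report every missing name at once. A minimal sketch of such a pre-check follows; check_slice_names is a hypothetical helper, and it uses exact string comparison, which may be stricter than the library's own name resolution.

```python
from typing import List


def check_slice_names(feature_names: List[str], names: List[str]) -> List[str]:
    """Return names unchanged if all are known features, else raise ValueError.

    Hypothetical pre-check, not the library's implementation of slice().
    """
    available = set(feature_names)
    missing = [name for name in names if name not in available]
    if missing:
        raise ValueError(f"feature name(s) not found in the FeatureView: {missing}")
    return list(names)


all_features = ["TRIPDURATION", "START_STATION_LATITUDE", "END_STATION_LONGITUDE"]
selected = check_slice_names(all_features, ["TRIPDURATION", "START_STATION_LATITUDE"])
print(selected)
```

With a registered feature view, the validated list would then be passed on as fv.slice(selected).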
to_df(session: Optional[Session] = None) DataFrame

Convert feature view to a Snowpark DataFrame object.

Parameters:

session – [deprecated] This argument has no effect. No need to pass a session object.

Returns:

A Snowpark DataFrame object containing information about the feature view.

Example:

>>> fs = FeatureStore(...)
>>> e = Entity("foo", ["id"], desc='my entity')
>>> fs.register_entity(e)

>>> draft_fv = FeatureView(
...     name="fv",
...     entities=[e],
...     feature_df=session.table(source_table).select(["NAME", "ID", "TITLE", "AGE", "TS"]),
...     timestamp_col="ts",
...     desc="foobar",
... ).attach_feature_desc({"AGE": "my age", "TITLE": '"my title"'})
>>> fv = fs.register_feature_view(draft_fv, '1.0')

>>> fv.to_df().show()
----------------------------------------------------------------...
|"NAME"  |"ENTITIES"                |"TIMESTAMP_COL"  |"DESC"  |
----------------------------------------------------------------...
|FV      |[                         |TS               |foobar  |
|        |  {                       |                 |        |
|        |    "desc": "my entity",  |                 |        |
|        |    "join_keys": [        |                 |        |
|        |      "ID"                |                 |        |
|        |    ],                    |                 |        |
|        |    "name": "FOO",        |                 |        |
|        |    "owner": null         |                 |        |
|        |  }                       |                 |        |
|        |]                         |                 |        |
----------------------------------------------------------------...
to_json() str

Attributes

database
desc
entities
feature_descs
feature_df
feature_names
initialize
name
output_schema
owner
query
refresh_freq
refresh_mode
refresh_mode_reason
schema
status
timestamp_col
version
warehouse