You are viewing documentation about an older version (1.6.1).

snowflake.ml.feature_store.FeatureView

class snowflake.ml.feature_store.FeatureView(name: str, entities: List[Entity], feature_df: DataFrame, *, timestamp_col: Optional[str] = None, refresh_freq: Optional[str] = None, desc: str = '', warehouse: Optional[str] = None, **_kwargs: Any)

Bases: LineageNode

A FeatureView instance encapsulates a logical group of features.

Create a FeatureView instance.

Parameters:
  • name – name of the FeatureView. NOTE: the name must follow Snowflake identifier rules.

  • entities – entities that the FeatureView is associated with.

  • feature_df – Snowpark DataFrame containing the data source and all feature transformation logic. The final projection of the DataFrame should contain the feature names, join keys and timestamp (if applicable).

  • timestamp_col – name of the timestamp column for point-in-time lookup when consuming the feature values.

  • refresh_freq – time unit defining how often new feature data should be generated. Valid args are { <num> { seconds | minutes | hours | days } | DOWNSTREAM | <cron expr> <time zone> }. NOTE: Currently the minimum refresh frequency is 1 minute. NOTE: If refresh_freq is a cron expression, it must also include a valid time zone, e.g. * * * * * UTC. NOTE: If refresh_freq is not provided, the FeatureView is registered as a View on the Snowflake backend and there is no extra storage cost.

  • desc – description of the FeatureView.

  • warehouse – warehouse used to refresh the feature view. Not needed for a static feature view (refresh_freq is None). For a managed feature view, this warehouse overrides the default warehouse of the Feature Store if it is specified; otherwise the default warehouse is used.

  • _kwargs – reserved kwargs for system generated args. NOTE: DO NOT USE.
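The refresh_freq grammar and the warehouse fallback described above can be pictured with a small pure-Python sketch. Both functions are hypothetical helpers written for illustration; they are not part of snowflake-ml, and the library's own validation may accept more forms (for example abbreviations) or reject some that pass here.

```python
import re
from typing import Optional

# Hypothetical helpers for illustration only; not part of snowflake-ml.
# Matches "<num> <unit>" for the units listed in the parameter description.
_INTERVAL = re.compile(r"^(\d+)\s*(second|minute|hour|day)s?$", re.IGNORECASE)
_SECONDS_PER_UNIT = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def classify_refresh_freq(refresh_freq: Optional[str]) -> str:
    """Return 'static', 'downstream', 'interval' or 'cron'."""
    if refresh_freq is None:
        # No refresh_freq: the FeatureView is registered as a plain View.
        return "static"
    value = refresh_freq.strip()
    if value.upper() == "DOWNSTREAM":
        return "downstream"
    match = _INTERVAL.match(value)
    if match:
        seconds = int(match.group(1)) * _SECONDS_PER_UNIT[match.group(2).lower()]
        if seconds < 60:
            raise ValueError("minimum refresh frequency is 1 minute")
        return "interval"
    # Cron form: five cron fields followed by a time zone, e.g. "* * * * * UTC".
    # Neither the cron fields nor the zone name are validated in this sketch.
    if len(value.split()) == 6:
        return "cron"
    raise ValueError(f"unrecognized refresh_freq: {refresh_freq!r}")


def effective_warehouse(
    refresh_freq: Optional[str],
    fv_warehouse: Optional[str],
    default_warehouse: str,
) -> Optional[str]:
    """Pick the warehouse used for refresh, or None for a static view."""
    if refresh_freq is None:
        return None  # static feature view: no refresh, no warehouse needed
    # A warehouse set on the FeatureView takes precedence over the default.
    return fv_warehouse or default_warehouse
```

For instance, `classify_refresh_freq("5 minutes")` yields `'interval'`, while `classify_refresh_freq("30 seconds")` raises `ValueError` because it falls below the documented one-minute minimum.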

Example:

>>> fs = FeatureStore(...)
>>> # draft_fv is a local object that hasn't been materialized to the Snowflake backend yet.
>>> feature_df = session.sql("select f_1, f_2 from source_table")
>>> draft_fv = FeatureView(
...     name="my_fv",
...     entities=[e1, e2],
...     feature_df=feature_df,
...     timestamp_col='TS', # optional
...     refresh_freq='1d',  # optional
...     desc='A line about this feature view',  # optional
...     warehouse='WH'      # optional, the warehouse used to refresh (managed) feature view
... )
>>> print(draft_fv.status)
FeatureViewStatus.DRAFT

>>> # registered_fv is a local object that maps to a Snowflake backend object.
>>> registered_fv = fs.register_feature_view(draft_fv, "v1")
>>> print(registered_fv.status)
FeatureViewStatus.ACTIVE

Methods

attach_feature_desc(descs: Dict[str, str]) FeatureView

Associate feature-level descriptions with the FeatureView.

Parameters:

descs – dictionary mapping feature names to their descriptions.

Returns:

FeatureView with feature level desc attached.

Raises:

ValueError – if a feature name is not found in the FeatureView.

Example:

>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> feature_df = session.table(source_table).select('TRIPDURATION', 'START_STATION_LATITUDE', 'TRIP_ID')
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> draft_fv = draft_fv.attach_feature_desc({
...     "TRIPDURATION": "Duration of a trip.",
...     "START_STATION_LATITUDE": "Latitude of the start station."
... })
>>> registered_fv = fs.register_feature_view(draft_fv, version='1.0')
>>> registered_fv.feature_descs
OrderedDict([('TRIPDURATION', 'Duration of a trip.'),
    ('START_STATION_LATITUDE', 'Latitude of the start station.')])

classmethod from_json(json_str: str, session: Session) FeatureView

fully_qualified_name() str

Returns the fully qualified name (<database_name>.<schema_name>.<feature_view_name>) for the FeatureView in Snowflake.

Returns:

fully qualified name string.

Raises:

RuntimeError – if the FeatureView is not registered.
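Judging from the example output for this method, the registered name appears to combine the feature view name and its version with a `$` inside one quoted identifier. The helper below is a hypothetical sketch of that composition, inferred only from that output; it is not the library's implementation and ignores any identifier normalization the library may perform.

```python
def compose_fully_qualified_name(
    database: str, schema: str, name: str, version: str
) -> str:
    # Assumption inferred from the documented example output:
    # <database>.<schema>."<NAME>$<VERSION>", with name and version
    # quoted together as a single identifier.
    return f'{database}.{schema}."{name}${version}"'


print(compose_fully_qualified_name("MY_DB", "MY_SCHEMA", "F_TRIP", "1.0"))
# prints MY_DB.MY_SCHEMA."F_TRIP$1.0"
```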

Example:

>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> feature_df = session.table(source_table).select(
...     'TRIPDURATION',
...     'START_STATION_LATITUDE',
...     'TRIP_ID'
... )
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> registered_fv = fs.register_feature_view(draft_fv, version='1.0')
>>> registered_fv.fully_qualified_name()
'MY_DB.MY_SCHEMA."F_TRIP$1.0"'

lineage(direction: Literal['upstream', 'downstream'] = 'downstream', domain_filter: Optional[Set[Literal['feature_view', 'dataset', 'model', 'table', 'view']]] = None) List[Union[FeatureView, Dataset, ModelVersion, LineageNode]]

Retrieves the lineage nodes connected to this node.

Parameters:
  • direction – The direction to trace lineage. Defaults to “downstream”.

  • domain_filter – Set of domains to filter nodes. Defaults to None.

Returns:

A list of connected lineage nodes.

Return type:

List[LineageNode]

This function or method has been in private preview since 1.5.3.
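As a usage sketch, the two parameters can be combined to ask only for the tables and views that feed a feature view. `upstream_sources` is a hypothetical wrapper name; it relies solely on the `lineage` signature shown above and accepts any object exposing that method. Because the feature is in private preview, the call may not be available in every account.

```python
from typing import Any, List


def upstream_sources(feature_view: Any) -> List[Any]:
    """Return the upstream table and view nodes of a registered FeatureView."""
    return feature_view.lineage(
        direction="upstream",             # trace towards the data sources
        domain_filter={"table", "view"},  # keep only table and view nodes
    )
```

Leaving `domain_filter` as None would return connected nodes of every domain instead.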

list_columns() DataFrame

List all columns and their information.

Returns:

A Snowpark DataFrame containing feature information.

Example:

>>> fs = FeatureStore(...)
>>> e = Entity("foo", ["id"], desc='my entity')
>>> fs.register_entity(e)

>>> draft_fv = FeatureView(
...     name="fv",
...     entities=[e],
...     feature_df=session.table(source_table).select(["NAME", "ID", "TITLE", "AGE", "TS"]),
...     timestamp_col="ts",
... ).attach_feature_desc({"AGE": "my age", "TITLE": '"my title"'})
>>> fv = fs.register_feature_view(draft_fv, '1.0')

>>> fv.list_columns().show()
--------------------------------------------------
|"NAME"  |"CATEGORY"  |"DTYPE"      |"DESC"      |
--------------------------------------------------
|NAME    |FEATURE     |string(64)   |            |
|ID      |ENTITY      |bigint       |my entity   |
|TITLE   |FEATURE     |string(128)  |"my title"  |
|AGE     |FEATURE     |bigint       |my age      |
|TS      |TIMESTAMP   |bigint       |NULL        |
--------------------------------------------------

slice(names: List[str]) FeatureViewSlice

Select a subset of features within the FeatureView.

Parameters:

names – feature names to select.

Returns:

FeatureViewSlice instance containing selected features.

Raises:

ValueError – if any selected feature name is not found in the FeatureView.
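The selection and error behavior described above can be modeled in plain Python. `SliceSketch` and `slice_features` are hypothetical stand-ins, not the library's classes; the sketch assumes exact string matching on feature names, which may differ from how the library resolves identifiers.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SliceSketch:
    """Stand-in for FeatureViewSlice: the selected names plus the parent's names."""
    names: List[str]
    parent_feature_names: List[str]


def slice_features(feature_names: List[str], names: List[str]) -> SliceSketch:
    # Reject the whole request if any requested name is unknown.
    missing = [n for n in names if n not in feature_names]
    if missing:
        raise ValueError(f"feature(s) not found: {missing}")
    # The slice keeps the requested order and still points at the full set.
    return SliceSketch(names=list(names), parent_feature_names=list(feature_names))
```

The slice carries only the selected names while keeping a link to the full feature list, mirroring how a slice exposes both its own names and the feature view it was taken from.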

Example:

>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> # feature_df contains 3 features and 1 entity
>>> feature_df = session.table(source_table).select(
...     'TRIPDURATION',
...     'START_STATION_LATITUDE',
...     'END_STATION_LONGITUDE',
...     'TRIP_ID'
... )
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> fv = fs.register_feature_view(draft_fv, version='1.0')
>>> # shows all 3 features
>>> fv.feature_names
['TRIPDURATION', 'START_STATION_LATITUDE', 'END_STATION_LONGITUDE']

>>> # slice a subset of features
>>> fv_slice = fv.slice(['TRIPDURATION', 'START_STATION_LATITUDE'])
>>> fv_slice.names
['TRIPDURATION', 'START_STATION_LATITUDE']

>>> # query the full set of features in original feature view
>>> fv_slice.feature_view_ref.feature_names
['TRIPDURATION', 'START_STATION_LATITUDE', 'END_STATION_LONGITUDE']

to_df(session: Optional[Session] = None) DataFrame

Convert feature view to a Snowpark DataFrame object.

Parameters:

session – [deprecated] This argument has no effect. No need to pass a session object.

Returns:

A Snowpark DataFrame object containing information about the feature view.

Example:

>>> fs = FeatureStore(...)
>>> e = Entity("foo", ["id"], desc='my entity')
>>> fs.register_entity(e)

>>> draft_fv = FeatureView(
...     name="fv",
...     entities=[e],
...     feature_df=session.table(source_table).select(["NAME", "ID", "TITLE", "AGE", "TS"]),
...     timestamp_col="ts",
... ).attach_feature_desc({"AGE": "my age", "TITLE": '"my title"'})
>>> fv = fs.register_feature_view(draft_fv, '1.0')

>>> fv.to_df().show()
----------------------------------------------------------------...
|"NAME"  |"ENTITIES"                |"TIMESTAMP_COL"  |"DESC"  |
----------------------------------------------------------------...
|FV      |[                         |TS               |foobar  |
|        |  {                       |                 |        |
|        |    "desc": "my entity",  |                 |        |
|        |    "join_keys": [        |                 |        |
|        |      "ID"                |                 |        |
|        |    ],                    |                 |        |
|        |    "name": "FOO",        |                 |        |
|        |    "owner": null         |                 |        |
|        |  }                       |                 |        |
|        |]                         |                 |        |
----------------------------------------------------------------...

to_json() str
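This page gives only the signatures of `to_json` and `from_json`. Assuming the two are inverses (an assumption the page does not state), a round trip could look like the sketch below. `roundtrip_via_json` is a hypothetical helper name, and it works with any object whose class provides these two methods.

```python
from typing import Any


def roundtrip_via_json(feature_view: Any, session: Any) -> Any:
    """Serialize a feature view to JSON and rebuild it from that string."""
    json_str = feature_view.to_json()
    # from_json is a classmethod, so it is called on the object's class.
    return type(feature_view).from_json(json_str, session)
```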

Attributes

database
desc
entities
feature_descs
feature_df
feature_names
name
output_schema
owner
query
refresh_freq
refresh_mode
refresh_mode_reason
schema
status
timestamp_col
version
warehouse