snowflake.ml.feature_store.FeatureView
- class snowflake.ml.feature_store.FeatureView(name: str, entities: List[Entity], feature_df: DataFrame, *, timestamp_col: Optional[str] = None, refresh_freq: Optional[str] = None, desc: str = '', warehouse: Optional[str] = None, **_kwargs: Any)
Bases: LineageNode
A FeatureView instance encapsulates a logical group of features.
Create a FeatureView instance.
- Parameters:
name – name of the FeatureView. NOTE: The name must follow Snowflake identifier rules.
entities – entities that the FeatureView is associated with.
feature_df – Snowpark DataFrame containing the data source and all feature transformation logic. The final projection of the DataFrame should contain the feature names, join keys and timestamp (if applicable).
timestamp_col – name of the timestamp column for point-in-time lookup when consuming the feature values.
refresh_freq – how often new feature data should be generated. Valid values are { <num> { seconds | minutes | hours | days } | DOWNSTREAM | <cron expr> <time zone> }.
NOTE: The minimum refresh frequency is currently 1 minute.
NOTE: If refresh_freq is a cron expression, it must include a valid time zone, e.g. * * * * * UTC.
NOTE: If refresh_freq is not provided, the FeatureView is registered as a View on the Snowflake backend and incurs no extra storage cost.
desc – description of the FeatureView.
warehouse – warehouse used to refresh the feature view. Not needed for a static feature view (refresh_freq is None). For a managed feature view, this warehouse overrides the Feature Store's default warehouse if specified; otherwise the default warehouse is used.
_kwargs – reserved kwargs for system generated args. NOTE: DO NOT USE.
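The kinds of refresh_freq value described above (omitted, DOWNSTREAM, a cron expression with time zone, or a time interval) can be told apart with a small sketch. classify_refresh_freq is a hypothetical helper, not part of snowflake-ml, and treating any six-field string as a cron expression is an assumption made for illustration; the library's own parsing and validation may differ.

```python
from typing import Optional


def classify_refresh_freq(refresh_freq: Optional[str]) -> str:
    """Classify a refresh_freq value by shape only (hypothetical helper).

    This does not validate the value the way Snowflake does; it only
    illustrates the categories named in the parameter description.
    """
    if refresh_freq is None:
        # No refresh frequency: the feature view is registered as a plain View.
        return "static view"
    text = refresh_freq.strip()
    if not text:
        raise ValueError("refresh_freq must not be an empty string")
    if text.upper() == "DOWNSTREAM":
        return "downstream"
    # Assumption: five cron fields plus one time zone field, e.g. "* * * * * UTC".
    if len(text.split()) == 6:
        return "cron"
    # Anything else is treated as a "<num> <unit>" style interval.
    return "interval"


for value in (None, "DOWNSTREAM", "* * * * * UTC", "5 minutes"):
    print(repr(value), "->", classify_refresh_freq(value))
```

Only the None case maps to a static feature view; every other category corresponds to a managed feature view that is refreshed using a warehouse.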
Example:
>>> fs = FeatureStore(...)
>>> # draft_fv is a local object that hasn't materialized to the Snowflake backend yet.
>>> feature_df = session.sql("select f_1, f_2 from source_table")
>>> draft_fv = FeatureView(
...     name="my_fv",
...     entities=[e1, e2],
...     feature_df=feature_df,
...     timestamp_col='TS',  # optional
...     refresh_freq='1d',  # optional
...     desc='A line about this feature view',  # optional
...     warehouse='WH'  # optional, the warehouse used to refresh (managed) feature view
... )
>>> print(draft_fv.status)
FeatureViewStatus.DRAFT
>>> # registered_fv is a local object that maps to a Snowflake backend object.
>>> registered_fv = fs.register_feature_view(draft_fv, "v1")
>>> print(registered_fv.status)
FeatureViewStatus.ACTIVE
Methods
- attach_feature_desc(descs: Dict[str, str]) → FeatureView
Associate feature level descriptions to the FeatureView.
- Parameters:
descs – Dictionary mapping feature names to their descriptions.
- Returns:
FeatureView with feature level desc attached.
- Raises:
ValueError – if feature name is not found in the FeatureView.
Example:
>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> feature_df = session.table(source_table).select('TRIPDURATION', 'START_STATION_LATITUDE', 'TRIP_ID')
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> draft_fv = draft_fv.attach_feature_desc({
...     "TRIPDURATION": "Duration of a trip.",
...     "START_STATION_LATITUDE": "Latitude of the start station."
... })
>>> registered_fv = fs.register_feature_view(draft_fv, version='1.0')
>>> registered_fv.feature_descs
OrderedDict([('TRIPDURATION', 'Duration of a trip.'),
    ('START_STATION_LATITUDE', 'Latitude of the start station.')])
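Because attach_feature_desc raises ValueError for a feature name that is not in the FeatureView, it can help to filter a description dictionary first. The following is a minimal sketch: attach_known_descs is a hypothetical helper, not part of snowflake-ml. It relies only on the feature_names attribute and the attach_feature_desc method documented on this page, and it assumes an exact, case-sensitive match between dictionary keys and the names reported by feature_names.

```python
from typing import Any, Dict


def attach_known_descs(fv: Any, descs: Dict[str, str]) -> Any:
    """Attach only the descriptions whose keys name an existing feature.

    Hypothetical helper: `fv` is expected to expose `feature_names` and
    `attach_feature_desc`, as a FeatureView does.
    """
    # Assumption: keys must match the reported feature names exactly.
    known = [str(name) for name in fv.feature_names]
    usable = {name: text for name, text in descs.items() if name in known}
    skipped = sorted(name for name in descs if name not in known)
    if skipped:
        print(f"Skipping descriptions for unknown features: {skipped}")
    return fv.attach_feature_desc(usable)
```

Silently dropping unknown keys is a design choice of this sketch; callers who prefer to fail fast can call attach_feature_desc directly and handle the ValueError.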
- classmethod from_json(json_str: str, session: Session) → FeatureView
- fully_qualified_name() → str
Returns the fully qualified name (<database_name>.<schema_name>.<feature_view_name>) for the FeatureView in Snowflake.
- Returns:
fully qualified name string.
- Raises:
RuntimeError – if the FeatureView is not registered.
Example:
>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> feature_df = session.table(source_table).select(
...     'TRIPDURATION',
...     'START_STATION_LATITUDE',
...     'TRIP_ID'
... )
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> registered_fv = fs.register_feature_view(draft_fv, version='1.0')
>>> registered_fv.fully_qualified_name()
'MY_DB.MY_SCHEMA."F_TRIP$1.0"'
- lineage(direction: Literal['upstream', 'downstream'] = 'downstream', domain_filter: Optional[Set[Literal['feature_view', 'dataset', 'model', 'table', 'view']]] = None) → List[Union[FeatureView, Dataset, ModelVersion, LineageNode]]
Retrieves the lineage nodes connected to this node.
- Parameters:
direction – The direction to trace lineage. Defaults to “downstream”.
domain_filter – Set of domains to filter nodes. Defaults to None.
- Returns:
A list of connected lineage nodes.
- Return type:
List[LineageNode]
This function or method is in private preview since 1.5.3.
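As a rough illustration of the direction and domain_filter parameters, the sketch below wraps lineage() in two small functions. The names upstream_sources and downstream_consumers are hypothetical and not part of snowflake-ml; the argument values come from the signature above. Since the method is in private preview, its behavior may change.

```python
from typing import Any, List


def upstream_sources(node: Any) -> List[Any]:
    """Return the tables and views upstream of a lineage node (hypothetical helper)."""
    return node.lineage(direction="upstream", domain_filter={"table", "view"})


def downstream_consumers(node: Any) -> List[Any]:
    """Return the datasets and models downstream of a lineage node (hypothetical helper)."""
    return node.lineage(direction="downstream", domain_filter={"dataset", "model"})
```

With a registered feature view, upstream_sources(registered_fv) would be expected to list the source tables or views it reads from, assuming lineage information is available for the account.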
- list_columns() → DataFrame
List all columns and their information.
- Returns:
A Snowpark DataFrame containing feature information.
Example:
>>> fs = FeatureStore(...)
>>> e = Entity("foo", ["id"], desc='my entity')
>>> fs.register_entity(e)
>>> draft_fv = FeatureView(
...     name="fv",
...     entities=[e],
...     feature_df=session.table(<source_table>).select(["NAME", "ID", "TITLE", "AGE", "TS"]),
...     timestamp_col="ts",
... ).attach_feature_desc({"AGE": "my age", "TITLE": '"my title"'})
>>> fv = fs.register_feature_view(draft_fv, '1.0')
>>> fv.list_columns().show()
--------------------------------------------------
|"NAME"  |"CATEGORY"  |"DTYPE"      |"DESC"      |
--------------------------------------------------
|NAME    |FEATURE     |string(64)   |            |
|ID      |ENTITY      |bigint       |my entity   |
|TITLE   |FEATURE     |string(128)  |"my title"  |
|AGE     |FEATURE     |bigint       |my age      |
|TS      |TIMESTAMP   |bigint       |NULL        |
--------------------------------------------------
- slice(names: List[str]) → FeatureViewSlice
Select a subset of features within the FeatureView.
- Parameters:
names – feature names to select.
- Returns:
FeatureViewSlice instance containing selected features.
- Raises:
ValueError – if any of the selected feature names are not found in the FeatureView.
Example:
>>> fs = FeatureStore(...)
>>> e = fs.get_entity('TRIP_ID')
>>> # feature_df contains 3 features and 1 entity
>>> feature_df = session.table(source_table).select(
...     'TRIPDURATION',
...     'START_STATION_LATITUDE',
...     'END_STATION_LONGITUDE',
...     'TRIP_ID'
... )
>>> draft_fv = FeatureView(name='F_TRIP', entities=[e], feature_df=feature_df)
>>> fv = fs.register_feature_view(draft_fv, version='1.0')
>>> # shows all 3 features
>>> fv.feature_names
['TRIPDURATION', 'START_STATION_LATITUDE', 'END_STATION_LONGITUDE']
>>> # slice a subset of features
>>> fv_slice = fv.slice(['TRIPDURATION', 'START_STATION_LATITUDE'])
>>> fv_slice.names
['TRIPDURATION', 'START_STATION_LATITUDE']
>>> # query the full set of features in the original feature view
>>> fv_slice.feature_view_ref.feature_names
['TRIPDURATION', 'START_STATION_LATITUDE', 'END_STATION_LONGITUDE']
- to_df(session: Optional[Session] = None) → DataFrame
Convert feature view to a Snowpark DataFrame object.
- Parameters:
session – [deprecated] This argument has no effect. No need to pass a session object.
- Returns:
A Snowpark DataFrame object containing information about the feature view.
Example:
>>> fs = FeatureStore(...)
>>> e = Entity("foo", ["id"], desc='my entity')
>>> fs.register_entity(e)
>>> draft_fv = FeatureView(
...     name="fv",
...     entities=[e],
...     feature_df=session.table(<source_table>).select(["NAME", "ID", "TITLE", "AGE", "TS"]),
...     timestamp_col="ts",
... ).attach_feature_desc({"AGE": "my age", "TITLE": '"my title"'})
>>> fv = fs.register_feature_view(draft_fv, '1.0')
>>> fv.to_df().show()
----------------------------------------------------------------...
|"NAME"  |"ENTITIES"                |"TIMESTAMP_COL"  |"DESC"  |
----------------------------------------------------------------...
|FV      |[                         |TS               |foobar  |
|        |  {                       |                 |        |
|        |    "desc": "my entity",  |                 |        |
|        |    "join_keys": [        |                 |        |
|        |      "ID"                |                 |        |
|        |    ],                    |                 |        |
|        |    "name": "FOO",        |                 |        |
|        |    "owner": null         |                 |        |
|        |  }                       |                 |        |
|        |]                         |                 |        |
----------------------------------------------------------------...
- to_json() → str
Attributes
- database
- desc
- entities
- feature_descs
- feature_df
- feature_names
- name
- output_schema
- owner
- query
- refresh_freq
- refresh_mode
- refresh_mode_reason
- schema
- status
- timestamp_col
- version
- warehouse