snowflake.ml.feature_store.FeatureStore

class snowflake.ml.feature_store.FeatureStore(session: Session, database: str, name: str, default_warehouse: str, creation_mode: CreationMode = CreationMode.FAIL_IF_NOT_EXIST)

Bases: object

FeatureStore provides APIs to create, materialize, retrieve and manage feature pipelines.

Creates a FeatureStore instance.

Parameters:
  • session – Snowpark Session to interact with Snowflake backend.

  • database – Database to create the FeatureStore instance.

  • name – Target FeatureStore name, maps to a schema in the database.

  • default_warehouse – Default warehouse for feature store compute.

  • creation_mode – If FAIL_IF_NOT_EXIST, the feature store raises an error when required resources do not already exist. If CREATE_IF_NOT_EXIST, the feature store creates any required resources that do not already exist. Required resources include the schema and tags. Note that the database must already exist in either mode.

Raises:
  • SnowflakeMLException – [ValueError] default_warehouse does not exist.

  • SnowflakeMLException – [ValueError] Required resources do not exist when mode is FAIL_IF_NOT_EXIST.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

  • SnowflakeMLException – [RuntimeError] Failed to create feature store.
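The two creation modes differ only in how missing resources are handled. The sketch below models that decision in plain Python. `Mode`, `ensure_resources`, and the resource names are hypothetical stand-ins for illustration; they are not part of snowflake.ml and do not touch a Snowflake account.

```python
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in for CreationMode, for illustration only."""
    FAIL_IF_NOT_EXIST = 1
    CREATE_IF_NOT_EXIST = 2


def ensure_resources(existing: set, required: set, mode: Mode) -> set:
    """Return the resources that exist after applying `mode`.

    Raises ValueError when resources are missing and the mode forbids creation.
    """
    missing = required - existing
    if missing and mode is Mode.FAIL_IF_NOT_EXIST:
        raise ValueError(f"required resources do not exist: {sorted(missing)}")
    # CREATE_IF_NOT_EXIST: create whatever is missing (modeled here as a set union).
    return existing | required


required = {"schema", "tags"}

# Only the schema exists; CREATE_IF_NOT_EXIST fills in the tags.
created = ensure_resources({"schema"}, required, Mode.CREATE_IF_NOT_EXIST)

# The same starting point fails under FAIL_IF_NOT_EXIST.
try:
    ensure_resources({"schema"}, required, Mode.FAIL_IF_NOT_EXIST)
    failed = False
except ValueError:
    failed = True
```

In both modes the database itself is assumed to exist already, as the parameter description states.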

Methods

delete_entity(name: str) None

Delete a previously registered Entity.

Parameters:

name – Entity name.

Raises:
  • SnowflakeMLException – [ValueError] Entity with the given name does not exist.

  • SnowflakeMLException – [RuntimeError] Failed to alter schema or drop tag.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

delete_feature_view(feature_view: FeatureView) None
delete_feature_view(feature_view: str, version: str) None

Delete a FeatureView.

Parameters:
  • feature_view – FeatureView object or name to delete.

  • version – Optional version of the feature view. Must be set when feature_view is a str.

Raises:

SnowflakeMLException – [ValueError] FeatureView is not registered.

Example:

>>> fs = FeatureStore(...)
>>> fv = FeatureView('FV0', ...)
>>> fs.register_feature_view(fv, 'FIRST')
>>> fv2 = fs.register_feature_view(fv, 'SECOND')
>>> fs.list_feature_views().select('NAME', 'VERSION').show()
----------------------
|"NAME"  |"VERSION"  |
----------------------
|FV0     |SECOND     |
|FV0     |FIRST      |
----------------------
>>> # with name and version
>>> fs.delete_feature_view('FV0', 'FIRST')
>>> fs.list_feature_views().select('NAME', 'VERSION').show()
----------------------
|"NAME"  |"VERSION"  |
----------------------
|FV0     |SECOND     |
----------------------
>>> # with feature view object
>>> fs.delete_feature_view(fv2)
>>> fs.list_feature_views().select('NAME', 'VERSION').show()
----------------------
|"NAME"  |"VERSION"  |
----------------------
|        |           |
----------------------

generate_dataset(name: str, spine_df: DataFrame, features: List[Union[FeatureView, FeatureViewSlice]], version: Optional[str] = None, spine_timestamp_col: Optional[str] = None, spine_label_cols: Optional[List[str]] = None, exclude_columns: Optional[List[str]] = None, include_feature_view_timestamp_col: bool = False, desc: str = '', output_type: Literal['dataset'] = 'dataset') Dataset
generate_dataset(name: str, spine_df: DataFrame, features: List[Union[FeatureView, FeatureViewSlice]], output_type: Literal['table'], version: Optional[str] = None, spine_timestamp_col: Optional[str] = None, spine_label_cols: Optional[List[str]] = None, exclude_columns: Optional[List[str]] = None, include_feature_view_timestamp_col: bool = False, desc: str = '') DataFrame

Generate a dataset from the given spine DataFrame and feature views.

Parameters:
  • name – The name of the Dataset to be generated. Datasets are uniquely identified within a schema by their name and version.

  • spine_df – Snowpark DataFrame to join features into.

  • features – A list of FeatureView or FeatureViewSlice which contains features to be joined.

  • version – The version of the Dataset to be generated. If not specified, the current timestamp is used instead.

  • spine_timestamp_col – Name of the timestamp column in spine_df that will be used to join time-series features. If spine_timestamp_col is not None, the input features must also have a timestamp_col.

  • spine_label_cols – Name of column(s) in spine_df that contains labels.

  • exclude_columns – Name of column(s) to exclude from the resulting training set.

  • include_feature_view_timestamp_col – If True, the generated dataset includes the timestamp column of each feature view that has one. Defaults to False.

  • desc – A description about this dataset.

  • output_type – (Deprecated) The type of Snowflake storage to use for the generated training data.

Returns:

If output_type is “dataset” (default), returns a Dataset object. If output_type is “table”, returns a Snowpark DataFrame representing the table.

Raises:
  • SnowflakeMLException – [ValueError] Invalid output_type specified.

  • SnowflakeMLException – [RuntimeError] Dataset name/version already exists.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

generate_training_set(spine_df: DataFrame, features: List[Union[FeatureView, FeatureViewSlice]], save_as: Optional[str] = None, spine_timestamp_col: Optional[str] = None, spine_label_cols: Optional[List[str]] = None, exclude_columns: Optional[List[str]] = None, include_feature_view_timestamp_col: bool = False) DataFrame

Generate a training set from the specified Spine DataFrame and Feature Views. Result is materialized to a Snowflake Table if save_as is specified.

Parameters:
  • spine_df – Snowpark DataFrame to join features into.

  • features – A list of FeatureView or FeatureViewSlice which contains features to be joined.

  • save_as – If specified, a new table containing the produced result will be created. The name can be fully qualified or unqualified. If unqualified, it defaults to the Feature Store database and schema.

  • spine_timestamp_col – Name of the timestamp column in spine_df that will be used to join time-series features. If spine_timestamp_col is not None, the input features must also have a timestamp_col.

  • spine_label_cols – Name of column(s) in spine_df that contains labels.

  • exclude_columns – Name of column(s) to exclude from the resulting training set.

  • include_feature_view_timestamp_col – If True, the generated training set includes the timestamp column of each feature view that has one. Defaults to False.

Returns:

Returns a Snowpark DataFrame representing the training set.

Raises:
  • SnowflakeMLException – [RuntimeError] Materialized table name already exists.

  • SnowflakeMLException – [RuntimeError] Failed to create materialized table.
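How an unqualified save_as name resolves can be sketched as follows. `qualify_table_name` is a hypothetical helper, not a library function. It assumes plain unquoted identifiers without embedded dots, and it rejects two-part names because their handling is not described here.

```python
def qualify_table_name(save_as: str, database: str, schema: str) -> str:
    """Prefix an unqualified table name with the feature store database and schema.

    Assumption: identifiers are unquoted and contain no dots, so a fully
    qualified name has exactly three dot-separated parts.
    """
    parts = save_as.split(".")
    if len(parts) == 3:
        return save_as  # already database.schema.table
    if len(parts) == 1:
        return f"{database}.{schema}.{save_as}"
    raise ValueError(
        f"expected an unqualified or fully qualified name, got {save_as!r}"
    )


# An unqualified name lands in the feature store's own database and schema.
unqualified = qualify_table_name("my_training_set", "MY_DB", "MY_FS")

# A fully qualified name is left untouched.
qualified = qualify_table_name("OTHER_DB.OTHER_SCHEMA.T", "MY_DB", "MY_FS")
```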

Example:

>>> fs = FeatureStore(session, ...)
>>> fv = fs.get_feature_view("MY_FV", "1")
>>> spine_df = session.create_dataframe(["id_1", "id_2"], schema=["id"])
>>> training_set = fs.generate_training_set(
...     spine_df,
...     [fv],
...     save_as="my_training_set",
... )
>>> print(type(training_set))
<class 'snowflake.snowpark.table.Table'>
>>> print(training_set.queries)
{'queries': ['SELECT  *  FROM (my_training_set)'], 'post_actions': []}
get_entity(name: str) Entity

Retrieve previously registered Entity object.

Parameters:

name – Entity name.

Returns:

Entity object.

Raises:
  • SnowflakeMLException – [ValueError] Entity is not found.

  • SnowflakeMLException – [RuntimeError] Failed to retrieve tag reference information.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

get_feature_view(name: str, version: str) FeatureView

Retrieve previously registered FeatureView.

Parameters:
  • name – FeatureView name.

  • version – FeatureView version.

Returns:

FeatureView object.

Raises:

SnowflakeMLException – [ValueError] FeatureView with the given name and version is not found, or an exception occurred while reconstructing the FeatureView object.

get_refresh_history(feature_view: FeatureView, version: Optional[str] = None, verbose: bool = False) DataFrame
get_refresh_history(feature_view: str, version: str, verbose: bool = False) DataFrame

Get refresh history statistics about a feature view.

Parameters:
  • feature_view – A registered feature view object, or the name of a feature view.

  • version – Optional version of the feature view. Must be set when feature_view is a str.

  • verbose – If True, return a more detailed history.

Returns:

A DataFrame containing the refresh history information.

Example:

>>> fs = FeatureStore(...)
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')
>>> # with name and version
>>> fs.refresh_feature_view('MY_FV', 'v1')
>>> fs.get_refresh_history('MY_FV', 'v1').show()
>>> # with feature view object
>>> fs.refresh_feature_view(fv)
>>> fs.get_refresh_history(fv).show()
-----------------------------------------------------------------------------------------------------
|"NAME"    |"STATE"    |"REFRESH_START_TIME"        |"REFRESH_END_TIME"          |"REFRESH_ACTION"  |
-----------------------------------------------------------------------------------------------------
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:54:06.680000  |2024-07-10 14:54:07.226000  |INCREMENTAL       |
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:53:58.504000  |2024-07-10 14:53:59.088000  |INCREMENTAL       |
-----------------------------------------------------------------------------------------------------
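In the output above, the NAME column appears to combine the feature view name and version with a `$` separator. Assuming that format holds, a small hypothetical helper (not part of the library) can split the value back into its parts:

```python
def split_name_version(physical_name: str) -> tuple:
    """Split 'NAME$VERSION' on the last '$'.

    The NAME$VERSION format is an assumption inferred from the sample output.
    """
    name, sep, version = physical_name.rpartition("$")
    if not sep or not name or not version:
        raise ValueError(f"not in NAME$VERSION form: {physical_name!r}")
    return name, version


name, version = split_name_version("MY_FV$v1")
```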
list_entities() DataFrame

List all Entities in the FeatureStore.

Returns:

Snowpark DataFrame containing the results.

list_feature_views(entity_name: Optional[str] = None, feature_view_name: Optional[str] = None) DataFrame

List FeatureViews in the FeatureStore. If entity_name is specified, only FeatureViews associated with that Entity are listed. If feature_view_name is specified, the results are further reduced to those matching the specified name.

Parameters:
  • entity_name – Entity name.

  • feature_view_name – FeatureView name.

Returns:

FeatureViews information as a Snowpark DataFrame.

load_feature_views_from_dataset(ds: Dataset) List[Union[FeatureView, FeatureViewSlice]]

Retrieve FeatureViews used during Dataset construction.

Parameters:

ds – Dataset object created from feature store.

Returns:

List of FeatureViews used during Dataset construction.

Raises:

ValueError – if dataset object is not generated from feature store.

read_feature_view(feature_view: str, version: str) DataFrame
read_feature_view(feature_view: FeatureView) DataFrame

Read values from a FeatureView.

Parameters:
  • feature_view – A FeatureView object to read from, or the name of a feature view. If a name is provided, version must also be provided.

  • version – Optional version of the feature view. Must be set when feature_view is a str.

Returns:

Snowpark DataFrame (lazy mode) containing the FeatureView data.

Raises:
  • SnowflakeMLException – [ValueError] version argument is missing when argument feature_view is a str.

  • SnowflakeMLException – [ValueError] FeatureView is not registered.
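Several methods on this class accept either a FeatureView object or a name plus version. The rule stated above (version is required when feature_view is a str) can be sketched as a small resolver. `FakeFeatureView` and `resolve_name_version` are hypothetical stand-ins for illustration; the real library raises SnowflakeMLException rather than a bare ValueError.

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass
class FakeFeatureView:
    """Hypothetical stand-in for a registered FeatureView."""
    name: str
    version: str


def resolve_name_version(
    feature_view: Union[FakeFeatureView, str],
    version: Optional[str] = None,
) -> Tuple[str, str]:
    """Return (name, version) for either calling style."""
    if isinstance(feature_view, str):
        if version is None:
            raise ValueError("version must be set when feature_view is a str")
        return feature_view, version
    # An object already carries its own version.
    return feature_view.name, feature_view.version


by_name = resolve_name_version("foo", "v1")
by_object = resolve_name_version(FakeFeatureView("foo", "v1"))

# Passing only a name, without a version, is rejected.
try:
    resolve_name_version("foo")
    missing_version_rejected = False
except ValueError:
    missing_version_rejected = True
```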

Example:

>>> fs = FeatureStore(...)
>>> # Assume you already have some feature views registered.
>>> fv = fs.get_feature_view('foo', 'v1')
>>> # Read from feature view name and version.
>>> fs.read_feature_view('foo', 'v1').show()
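------------------------------------------
|"NAME"  |"ID"  |"TITLE"  |"AGE"  |"TS"  |
------------------------------------------
|jonh    |1     |boss     |20     |100   |
|porter  |2     |manager  |30     |200   |
------------------------------------------
>>> # Read from feature view object.
>>> fs.read_feature_view(fv).show()
------------------------------------------
|"NAME"  |"ID"  |"TITLE"  |"AGE"  |"TS"  |
------------------------------------------
|jonh    |1     |boss     |20     |100   |
|porter  |2     |manager  |30     |200   |
------------------------------------------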

refresh_feature_view(feature_view: FeatureView) None
refresh_feature_view(feature_view: str, version: str) None

Manually refresh a feature view.

Parameters:
  • feature_view – A registered feature view object, or the name of a feature view.

  • version – Optional version of the feature view. Must be set when feature_view is a str.

Example:

>>> fs = FeatureStore(...)
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')
>>> # with name and version
>>> fs.refresh_feature_view('MY_FV', 'v1')
>>> fs.get_refresh_history('MY_FV', 'v1').show()
>>> # with feature view object
>>> fs.refresh_feature_view(fv)
>>> fs.get_refresh_history(fv).show()
-----------------------------------------------------------------------------------------------------
|"NAME"    |"STATE"    |"REFRESH_START_TIME"        |"REFRESH_END_TIME"          |"REFRESH_ACTION"  |
-----------------------------------------------------------------------------------------------------
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:54:06.680000  |2024-07-10 14:54:07.226000  |INCREMENTAL       |
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:53:58.504000  |2024-07-10 14:53:59.088000  |INCREMENTAL       |
-----------------------------------------------------------------------------------------------------
register_entity(entity: Entity) Entity

Register Entity in the FeatureStore.

Parameters:

entity – Entity object to register.

Returns:

A registered entity object.

Raises:

SnowflakeMLException – [RuntimeError] Failed to find resources.

Example:

>>> fs = FeatureStore(...)
>>> e = Entity('BAR', ['A'], 'entity bar')
>>> fs.register_entity(e)
>>> fs.list_entities().show()
register_feature_view(feature_view: FeatureView, version: str, block: bool = True, overwrite: bool = False) FeatureView

Materialize a FeatureView to the Snowflake backend. Incremental maintenance for updates on the source data is automated if refresh_freq is set. NOTE: Each new materialization triggers a full FeatureView history refresh for the data included in the FeatureView.

Parameters:
  • feature_view – FeatureView instance to materialize.

  • version – Version of the registered FeatureView. NOTE: The version accepts only letters, numbers, and underscores, and it is capitalized on registration.

  • block – Whether the FeatureView backend materialization should block. If blocking, the API waits until the initial FeatureView data is generated. Defaults to True.

  • overwrite – Overwrite the existing FeatureView with the same version. This is equivalent to dropping the FeatureView and then recreating it. NOTE: A backfill cost is incurred if the FeatureView is being continuously maintained.

Returns:

A materialized FeatureView object.

Raises:
  • SnowflakeMLException – [ValueError] FeatureView entity has not been registered.

  • SnowflakeMLException – [ValueError] Warehouse or default warehouse is not specified.

  • SnowflakeMLException – [RuntimeError] Failed to create dynamic table, task, or view.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.
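The version rule in the parameter list (letters, numbers, and underscores only, stored capitalized) can be sketched as below. `normalize_version` is a hypothetical helper written from that description, not the library's own validation code, and it assumes ASCII letters and digits.

```python
import re

# Letters, digits, and underscores only (ASCII assumed).
_VERSION_RE = re.compile(r"[A-Za-z0-9_]+")


def normalize_version(version: str) -> str:
    """Validate a version string and return its upper-case form."""
    if not _VERSION_RE.fullmatch(version):
        raise ValueError(
            f"invalid version {version!r}: use letters, numbers, and underscores"
        )
    return version.upper()


normalized = normalize_version("v1_beta")

# A dot is outside the allowed character set.
try:
    normalize_version("v1.0")
    dotted_rejected = False
except ValueError:
    dotted_rejected = True
```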

Example:

>>> draft_fv = FeatureView(name="my_fv", entities=[entity], feature_df=feature_df)
>>> registered_fv = fs.register_feature_view(feature_view=draft_fv, version="v1")
...
resume_feature_view(feature_view: FeatureView) FeatureView
resume_feature_view(feature_view: str, version: str) FeatureView

Resume a previously suspended FeatureView.

Parameters:
  • feature_view – FeatureView object or name to resume.

  • version – Optional version of the feature view. Must be set when feature_view is a str.

Returns:

A new feature view with updated status.

Example:

>>> fs = FeatureStore(...)
>>> # you must already have feature views registered
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')
>>> fs.suspend_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
>>> fs.resume_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|MY_FV   |v1         |ACTIVE              |
-------------------------------------------
retrieve_feature_values(spine_df: DataFrame, features: Union[List[Union[FeatureView, FeatureViewSlice]], List[str]], spine_timestamp_col: Optional[str] = None, exclude_columns: Optional[List[str]] = None, include_feature_view_timestamp_col: bool = False) DataFrame

Enrich spine dataframe with feature values. Mainly used to generate inference data input. If spine_timestamp_col is specified, point-in-time feature values will be fetched.

Parameters:
  • spine_df – Snowpark DataFrame to join features into.

  • features – List of features to join into the spine_df. Can be a list of FeatureView or FeatureViewSlice, or a list of serialized feature objects from Dataset.

  • spine_timestamp_col – Timestamp column in spine_df for point-in-time feature value lookup.

  • exclude_columns – Column names to exclude from the result dataframe.

  • include_feature_view_timestamp_col – If True, the result includes the timestamp column of each feature view that has one. Defaults to False.

Returns:

Snowpark DataFrame containing the joined results.

Raises:

ValueError – if features is empty.
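Point-in-time lookup usually means that each spine row receives the most recent feature value recorded at or before that row's timestamp, so no future data leaks in. The pure-Python sketch below illustrates that idea for a single join key. The `point_in_time_join` helper and the sample rows are hypothetical, and the join Snowflake actually performs may differ in details such as tie handling.

```python
from bisect import bisect_right
from collections import defaultdict


def point_in_time_join(spine, features, key="id", ts="ts", value="value"):
    """For each spine row, attach the latest feature value with feature ts <= spine ts.

    Rows are plain dicts. Spine rows with no earlier feature row get None.
    """
    # Group feature rows by key, sorted by timestamp.
    by_key = defaultdict(list)
    for row in sorted(features, key=lambda r: r[ts]):
        by_key[row[key]].append(row)

    joined = []
    for s in spine:
        rows = by_key.get(s[key], [])
        timestamps = [r[ts] for r in rows]
        # Index of the first feature row strictly after the spine timestamp.
        idx = bisect_right(timestamps, s[ts])
        latest = rows[idx - 1][value] if idx > 0 else None
        joined.append({**s, value: latest})
    return joined


features = [
    {"id": "a", "ts": 100, "value": 1.0},
    {"id": "a", "ts": 200, "value": 2.0},
    {"id": "b", "ts": 150, "value": 9.0},
]
spine = [
    {"id": "a", "ts": 150},  # sees the ts=100 value, not the later ts=200 one
    {"id": "a", "ts": 200},  # a feature row at exactly the spine timestamp counts
    {"id": "b", "ts": 100},  # no feature row exists yet
]
result = point_in_time_join(spine, features)
```

When spine_timestamp_col is omitted, no such time filter applies and the join is on the entity keys alone.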

suspend_feature_view(feature_view: FeatureView) FeatureView
suspend_feature_view(feature_view: str, version: str) FeatureView

Suspend an active FeatureView.

Parameters:
  • feature_view – FeatureView object or name to suspend.

  • version – Optional version of the feature view. Must be set when feature_view is a str.

Returns:

A new feature view with updated status.

Example:

>>> fs = FeatureStore(...)
>>> # assume you already have feature views registered
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')
>>> fs.suspend_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
>>> fs.resume_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|MY_FV   |v1         |ACTIVE              |
-------------------------------------------
update_default_warehouse(warehouse_name: str) None

Update default warehouse for feature store.

Parameters:

warehouse_name – Name of warehouse.

Raises:

SnowflakeMLException – If the warehouse does not exist.

update_entity(name: str, desc: Optional[str] = None) Optional[Entity]

Update a registered entity with provided information.

Parameters:
  • name – Name of entity to update.

  • desc – Optional new description to apply. Defaults to None.

Raises:

SnowflakeMLException – An error occurred while updating.

Returns:

A new entity with updated information or None if the entity doesn’t exist.

Example:

>>> fs = FeatureStore(...)
>>> e = Entity(name='foo', join_keys=['COL_1'], desc='old desc')
>>> fs.register_entity(e)
>>> fs.list_entities().show()
------------------------------------------------
|"NAME"  |"JOIN_KEYS"  |"DESC"    |"OWNER"     |
------------------------------------------------
|FOO     |["COL_1"]    |old desc  |REGTEST_RL  |
------------------------------------------------
>>> fs.update_entity('foo', desc='NEW DESC')
>>> fs.list_entities().show()
------------------------------------------------
|"NAME"  |"JOIN_KEYS"  |"DESC"    |"OWNER"     |
------------------------------------------------
|FOO     |["COL_1"]    |NEW DESC  |REGTEST_RL  |
------------------------------------------------
update_feature_view(name: str, version: str, refresh_freq: Optional[str] = None, warehouse: Optional[str] = None, desc: Optional[str] = None) FeatureView

Update a registered feature view.

Check feature_view.py for which fields are allowed to be updated after registration.

Parameters:
  • name – Name of the FeatureView to be updated.

  • version – Version of the FeatureView to be updated.

  • refresh_freq – Updated refresh frequency.

  • warehouse – Updated warehouse.

  • desc – Updated description of the feature view.

Returns:

Updated FeatureView.

Example:

>>> fs = FeatureStore(
...     ...,
...     default_warehouse='ORIGINAL_WH',
... )
>>> fv = FeatureView(
...     name='foo',
...     entities=[e1, e2],
...     feature_df=session.sql('...'),
...     timestamp_col='timestamp',
...     refresh_freq='1d',
...     desc='this is old description'
... )
>>> fv = fs.register_feature_view(feature_view=fv, version='v1')
>>> # update_feature_view will apply new arguments to the registered feature view.
>>> new_fv = fs.update_feature_view(
...     name='foo',
...     version='v1',
...     refresh_freq='2d',
...     warehouse='MY_NEW_WH',
...     desc='this is new description',
... )
Raises:
  • SnowflakeMLException – [RuntimeError] If FeatureView is not managed and refresh_freq is defined.

  • SnowflakeMLException – [RuntimeError] Failed to update feature view.