snowflake.ml.feature_store.FeatureStore

class snowflake.ml.feature_store.FeatureStore(session: Session, database: str, name: str, default_warehouse: str, *, creation_mode: CreationMode = CreationMode.FAIL_IF_NOT_EXIST, default_iceberg_external_volume: Optional[str] = None)

Bases: object

FeatureStore provides APIs to create, materialize, retrieve and manage feature pipelines.

Creates a FeatureStore instance.

Parameters:
  • session – Snowpark Session to interact with Snowflake backend.

  • database – Database in which to create the FeatureStore instance.

  • name – Target FeatureStore name, maps to a schema in the database.

  • default_warehouse – Default warehouse for feature store compute.

  • creation_mode – If FAIL_IF_NOT_EXIST, the feature store raises an error when required resources do not already exist. If CREATE_IF_NOT_EXIST, the feature store creates any required resources that do not already exist. Required resources include the schema and tags. Note that the database must already exist in either mode.

  • default_iceberg_external_volume – Default external volume for Iceberg-backed Feature Views. If set, Feature Views using StorageFormat.ICEBERG can omit external_volume in their StorageConfig.

Raises:
  • SnowflakeMLException – [ValueError] default_warehouse does not exist.

  • SnowflakeMLException – [ValueError] Required resources do not exist when mode is FAIL_IF_NOT_EXIST.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

  • SnowflakeMLException – [RuntimeError] Failed to create feature store.

Example:

>>> from snowflake.ml.feature_store import (
...     FeatureStore,
...     CreationMode,
... )

>>> # Create a new Feature Store:
>>> fs = FeatureStore(
...     session=session,
...     database="MYDB",
...     name="MYSCHEMA",
...     default_warehouse="MYWH",
...     creation_mode=CreationMode.CREATE_IF_NOT_EXIST
... )

>>> # Connect to an existing Feature Store:
>>> fs = FeatureStore(
...     session=session,
...     database="MYDB",
...     name="MYSCHEMA",
...     default_warehouse="MYWH",
...     creation_mode=CreationMode.FAIL_IF_NOT_EXIST
... )

Methods

close() None

Release per-FeatureStore resources (currently: the Online Service HTTP pool). Idempotent.
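
Because close() is idempotent and takes no arguments, the standard-library contextlib.closing helper can scope the lifetime of a store. The sketch below is illustrative only: FakeStore is a hypothetical stand-in that exposes nothing but close(), so the snippet runs without a Snowflake session. With a real FeatureStore, the same with-statement pattern should apply.

```python
from contextlib import closing


class FakeStore:
    """Hypothetical stand-in exposing only close(), like FeatureStore.close()."""

    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        # Modeled as idempotent: repeated calls are harmless.
        self.close_calls += 1


store = FakeStore()
with closing(store) as fs:
    # Work with fs here; close() runs on exit even if this block raises.
    pass

store.close()  # A second call is safe because close() is idempotent.
```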

create_online_service(producer_role: str, consumer_role: str) OnlineServiceResult

Create the Online Service for this store’s schema.

Requires schema OWNERSHIP and account feature enablement. After SUCCESS, poll get_online_service_status() until RUNNING before using online feature reads or stream ingestion.

Parameters:
  • producer_role – Role name granted producer privileges on the Online Service.

  • consumer_role – Role name granted consumer privileges on the Online Service.

Returns:

Parsed create result from the Online Service.

delete_entity(name: str) None

Delete a previously registered Entity.

Parameters:

name – Name of entity to be deleted.

Raises:
  • SnowflakeMLException – [ValueError] Entity with given name does not exist.

  • SnowflakeMLException – [RuntimeError] Failed to alter schema or drop tag.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

Example:

>>> fs = FeatureStore(...)
>>> e_1 = Entity("my_entity", ['col_1'], desc='My first entity.')
>>> fs.register_entity(e_1)
>>> fs.list_entities().show()
-----------------------------------------------------------
|"NAME"     |"JOIN_KEYS"  |"DESC"            |"OWNER"     |
-----------------------------------------------------------
|MY_ENTITY  |["COL_1"]    |My first entity.  |REGTEST_RL  |
-----------------------------------------------------------

>>> fs.delete_entity("my_entity")
>>> fs.list_entities().show()
-------------------------------------------
|"NAME"  |"JOIN_KEYS"  |"DESC"  |"OWNER"  |
-------------------------------------------
|        |             |        |         |
-------------------------------------------
delete_feature_view(feature_view: FeatureView) None
delete_feature_view(feature_view: str, version: str) None

Delete a FeatureView.

Parameters:
  • feature_view – FeatureView object or name to delete.

  • version – Optional version of feature view. Must be set when feature_view is a str.

Raises:

SnowflakeMLException – [ValueError] FeatureView is not registered.

Example:

>>> fs = FeatureStore(...)
>>> fv = FeatureView('FV0', ...)
>>> fv1 = fs.register_feature_view(fv, 'FIRST')
>>> fv2 = fs.register_feature_view(fv, 'SECOND')
>>> fs.list_feature_views().select('NAME', 'VERSION').show()
----------------------
|"NAME"  |"VERSION"  |
----------------------
|FV0     |SECOND     |
|FV0     |FIRST      |
----------------------

>>> # delete with name and version
>>> fs.delete_feature_view('FV0', 'FIRST')
>>> fs.list_feature_views().select('NAME', 'VERSION').show()
----------------------
|"NAME"  |"VERSION"  |
----------------------
|FV0     |SECOND     |
----------------------

>>> # delete with feature view object
>>> fs.delete_feature_view(fv2)
>>> fs.list_feature_views().select('NAME', 'VERSION').show()
----------------------
|"NAME"  |"VERSION"  |
----------------------
|        |           |
----------------------
delete_stream_source(name: str) None

Delete a previously registered StreamSource.

The StreamSource can only be deleted if no active FeatureViews reference it.

Parameters:

name – Name of the StreamSource to be deleted.

Raises:
  • SnowflakeMLException – [ValueError] StreamSource with given name not found.

  • SnowflakeMLException – [ValueError] Cannot delete due to active references.

  • SnowflakeMLException – [RuntimeError] Failed to delete stream source.

Example:

>>> fs = FeatureStore(...)
>>> fs.delete_stream_source("transaction_events")
drop_online_service() OnlineServiceResult

Drop the Online Service.

Stops the Online Service and releases its resources. Online feature reads and stream ingestion will no longer be available after this call.

Returns:

OnlineServiceResult with status and message.

generate_dataset(name: str, spine_df: DataFrame, features: list[Union[FeatureView, FeatureViewSlice]], *, version: Optional[str] = None, spine_timestamp_col: Optional[str] = None, spine_label_cols: Optional[list[str]] = None, exclude_columns: Optional[list[str]] = None, include_feature_view_timestamp_col: bool = False, auto_prefix: bool = False, desc: str = '', output_type: Literal['dataset'] = 'dataset', join_method: Literal['sequential', 'cte'] = 'sequential') Dataset
generate_dataset(name: str, spine_df: DataFrame, features: list[Union[FeatureView, FeatureViewSlice]], *, output_type: Literal['table'], version: Optional[str] = None, spine_timestamp_col: Optional[str] = None, spine_label_cols: Optional[list[str]] = None, exclude_columns: Optional[list[str]] = None, include_feature_view_timestamp_col: bool = False, auto_prefix: bool = False, desc: str = '', join_method: Literal['sequential', 'cte'] = 'sequential') DataFrame

Generate dataset by given source table and feature views.

Parameters:
  • name – The name of the Dataset to be generated. Datasets are uniquely identified within a schema by their name and version.

  • spine_df – Snowpark DataFrame to join features into.

  • features – A list of FeatureView or FeatureViewSlice which contains features to be joined.

  • version – The version of the Dataset to be generated. If none specified, the current timestamp will be used instead.

  • spine_timestamp_col – Name of timestamp column in spine_df that will be used to join time-series features. If spine_timestamp_col is not None, the input features must also have a timestamp_col.

  • spine_label_cols – Name of column(s) in spine_df that contains labels.

  • exclude_columns – Name of column(s) to exclude from the resulting training set.

  • include_feature_view_timestamp_col – If True, the generated dataset includes the timestamp column of each feature view that has one. Defaults to False.

  • auto_prefix – If True, automatically prefix all feature columns with ‘{feature_view_name}_{version}_’. Default False. Use FeatureView.with_name() for custom prefixes.

  • desc – A description about this dataset.

  • output_type – (Deprecated) The type of Snowflake storage to use for the generated training data.

  • join_method – Method for feature joins. “sequential” for layer-by-layer joins (default), “cte” for CTE method. (Internal use only - subject to change)

Returns:

If output_type is “dataset” (default), returns a Dataset object. If output_type is “table”, returns a Snowpark DataFrame representing the table.

Raises:
  • SnowflakeMLException – [ValueError] Invalid output_type specified.

  • SnowflakeMLException – [RuntimeError] Dataset name/version already exists.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

Example:

>>> fs = FeatureStore(session, ...)
>>> # Assume you already have feature view registered.
>>> fv = fs.get_feature_view("MY_FV", "1")
>>> # Spine dataframe has same join keys as the entity of fv.
>>> spine_df = session.create_dataframe(["1", "2"], schema=["id"])
>>> my_dataset = fs.generate_dataset(
...     "my_dataset",
...     spine_df,
...     [fv],
... )
>>> # Current timestamp will be used as default version name.
>>> # You can explicitly overwrite by setting a version.
>>> my_dataset.list_versions()
['2024_07_12_11_26_22']

>>> my_dataset.read.to_snowpark_dataframe().show(n=3)
-------------------------------------------------------
|"QUALITY"  |"FIXED_ACIDITY"     |"VOLATILE_ACIDITY"  |
-------------------------------------------------------
|3          |11.600000381469727  |0.5799999833106995  |
|3          |8.300000190734863   |1.0199999809265137  |
|3          |7.400000095367432   |1.184999942779541   |
-------------------------------------------------------
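
The default version in the example above looks like a timestamp rendered with underscores. To pin a version explicitly in the same shape, a small formatter like the sketch below may help. The format string is inferred from the sample output '2024_07_12_11_26_22' and is an assumption, as is the choice of UTC; timestamp_version is a hypothetical helper, not part of the library.

```python
from datetime import datetime, timezone


def timestamp_version(now: datetime) -> str:
    """Format a datetime in the shape of the sample default version above."""
    # Assumption: year_month_day_hour_minute_second, inferred from the example output.
    return now.strftime("%Y_%m_%d_%H_%M_%S")


# A fixed instant keeps the demo deterministic; pass datetime.now(timezone.utc) in practice.
version = timestamp_version(datetime(2024, 7, 12, 11, 26, 22, tzinfo=timezone.utc))
```

The resulting string could then be passed as version= to generate_dataset.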
generate_training_set(spine_df: DataFrame, features: list[Union[FeatureView, snowflake.ml.feature_store.feature_view.FeatureViewSlice]], *, save_as: Optional[str] = None, spine_timestamp_col: Optional[str] = None, spine_label_cols: Optional[list[str]] = None, exclude_columns: Optional[list[str]] = None, include_feature_view_timestamp_col: bool = False, auto_prefix: bool = False, join_method: Literal['sequential', 'cte'] = 'sequential') DataFrame

Generate a training set from the specified Spine DataFrame and Feature Views.

For rollup feature views with temporal entity mappings (mapping_valid_from_col and mapping_valid_to_col), this method uses point-in-time correct entity resolution via a range JOIN with bounded validity windows [valid_from, valid_to), supporting 1:N mappings (one child entity to multiple parent entities simultaneously). For inference with latest mappings, use retrieve_feature_values instead.

Result is materialized to a Snowflake Table if save_as is specified.
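
The bounded validity window [valid_from, valid_to) described above can be illustrated in plain Python. This is a conceptual sketch of the range-join semantics only, not the SQL the library generates; Mapping, parents_at, and the sample entity names are hypothetical.

```python
from datetime import datetime
from typing import NamedTuple


class Mapping(NamedTuple):
    child: str
    parent: str
    valid_from: datetime
    valid_to: datetime


def parents_at(mappings: list[Mapping], child: str, ts: datetime) -> list[str]:
    """Return parents whose window [valid_from, valid_to) contains ts."""
    return [
        m.parent
        for m in mappings
        if m.child == child and m.valid_from <= ts < m.valid_to
    ]


# Two overlapping windows for the same child: a 1:N mapping.
mappings = [
    Mapping("store_1", "region_a", datetime(2024, 1, 1), datetime(2024, 6, 1)),
    Mapping("store_1", "region_b", datetime(2024, 3, 1), datetime(2024, 9, 1)),
]

# A spine timestamp in April falls inside both windows.
overlap = parents_at(mappings, "store_1", datetime(2024, 4, 1))
# A timestamp equal to valid_to is excluded because the upper bound is open.
boundary = parents_at(mappings, "store_1", datetime(2024, 6, 1))
```

Here overlap contains both parents, while boundary contains only region_b, since region_a's window closes exactly at that instant.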

Parameters:
  • spine_df – Snowpark DataFrame to join features into.

  • features – A list of FeatureView or FeatureViewSlice which contains features to be joined.

  • save_as – If specified, a new table containing the produced result will be created. Name can be a fully qualified name or an unqualified name. If unqualified, defaults to the Feature Store database and schema.

  • spine_timestamp_col – Name of timestamp column in spine_df that will be used to join time-series features. If spine_timestamp_col is not None, the input features must also have a timestamp_col.

  • spine_label_cols – Name of column(s) in spine_df that contains labels.

  • exclude_columns – Name of column(s) to exclude from the resulting training set.

  • include_feature_view_timestamp_col – If True, the generated training set includes the timestamp column of each feature view that has one. Defaults to False.

  • auto_prefix – If True, automatically prefix all feature columns with ‘{feature_view_name}_{version}_’. Default False. Use FeatureView.with_name() for custom prefixes.

  • join_method – Method for feature joins. “sequential” for layer-by-layer joins (default), “cte” for CTE method. (Internal use only - subject to change)
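
To anticipate the column names produced when auto_prefix=True, the documented pattern ‘{feature_view_name}_{version}_’ can be applied by hand. The helper below is a hypothetical sketch of that pattern only; it does not model any identifier quoting or case normalization the library may perform.

```python
def prefixed_columns(feature_view_name: str, version: str, columns: list[str]) -> list[str]:
    """Apply the documented '{feature_view_name}_{version}_' prefix to each column."""
    prefix = f"{feature_view_name}_{version}_"
    return [prefix + column for column in columns]


# Sample names are placeholders chosen for illustration.
cols = prefixed_columns("MY_FV", "v1", ["F_1", "F_2"])
```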

Returns:

Returns a Snowpark DataFrame representing the training set.

Raises:
  • SnowflakeMLException – [RuntimeError] Materialized table name already exists.

  • SnowflakeMLException – [RuntimeError] Failed to create materialized table.

Example:

>>> fs = FeatureStore(session, ...)
>>> # Assume you already have feature view registered.
>>> fv = fs.get_feature_view("MY_FV", "1")
>>> # Spine dataframe has same join keys as the entity of fv.
>>> spine_df = session.create_dataframe(["1", "2"], schema=["id"])
>>> training_set = fs.generate_training_set(
...     spine_df,
...     [fv],
...     save_as="my_training_set",
... )
>>> print(type(training_set))
<class 'snowflake.snowpark.table.Table'>

>>> print(training_set.queries)
{'queries': ['SELECT  *  FROM (my_training_set)'], 'post_actions': []}
get_entity(name: str) Entity

Retrieve previously registered Entity object.

Parameters:

name – Entity name.

Returns:

Entity object.

Raises:
  • SnowflakeMLException – [ValueError] Entity is not found.

  • SnowflakeMLException – [RuntimeError] Failed to retrieve tag reference information.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

Example:

>>> fs = FeatureStore(...)
>>> # e_1 is a local object that hasn't been registered to the Snowflake backend yet.
>>> e_1 = Entity("my_entity", ['col_1'], desc='My first entity.')
>>> fs.register_entity(e_1)

>>> # e_2 is a local object that points to a backend object in Snowflake.
>>> e_2 = fs.get_entity("my_entity")
>>> print(e_2)
Entity(name=MY_ENTITY, join_keys=['COL_1'], owner=REGTEST_RL, desc=My first entity.)
get_feature_view(name: str, version: str) FeatureView

Retrieve previously registered FeatureView.

Parameters:
  • name – FeatureView name.

  • version – FeatureView version.

Returns:

FeatureView object.

Raises:

SnowflakeMLException – [ValueError] FeatureView with name and version is not found, or incurred exception when reconstructing the FeatureView object.

Example:

>>> fs = FeatureStore(...)
>>> # draft_fv is a local object that hasn't been materialized to the Snowflake backend yet.
>>> draft_fv = FeatureView(
...     name='foo',
...     entities=[e1],
...     feature_df=session.sql('...'),
...     desc='this is description',
... )
>>> fs.register_feature_view(feature_view=draft_fv, version='v1')

>>> # fv is a local object that maps to a Snowflake backend object.
>>> fv = fs.get_feature_view('foo', 'v1')
>>> print(f"name: {fv.name}")
>>> print(f"version:{fv.version}")
>>> print(f"desc:{fv.desc}")
name: FOO
version:v1
desc:this is description
get_online_service_status() OnlineServiceStatus

Return Online Service status.

Poll this method after create_online_service() until status is RUNNING before using online feature reads or stream ingestion.

Returns:

OnlineServiceStatus with status, message, and endpoints.
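
The polling step can be wrapped in a small helper. The sketch below is written against a generic callable so it runs without a Snowflake session; with a real store, fs.get_online_service_status would be passed as get_status. It assumes the returned object exposes a status attribute that is either a string or an enum member, which this page does not specify. wait_until_running and the "PENDING" placeholder state are hypothetical.

```python
import time
from types import SimpleNamespace
from typing import Any, Callable


def wait_until_running(
    get_status: Callable[[], Any],
    *,
    timeout_s: float = 600.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll get_status() until its .status reads RUNNING, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    while True:
        result = get_status()
        # Assumption: .status is a plain string or an enum member with a .name.
        state = getattr(result.status, "name", str(result.status))
        if state.upper() == "RUNNING":
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Online Service not RUNNING after {timeout_s}s (last: {state})")
        sleep(interval_s)


# Demo with a fake status source standing in for the real method.
_states = iter(["PENDING", "PENDING", "RUNNING"])
final = wait_until_running(
    lambda: SimpleNamespace(status=next(_states), message="", endpoints=[]),
    sleep=lambda _seconds: None,  # skip real waiting in the demo
)
```

Injecting sleep keeps the helper testable; a fixed interval is used here for simplicity, and a backoff schedule may be preferable for long startups.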

get_refresh_history(feature_view: FeatureView, version: Optional[str] = None, *, verbose: bool = False, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE) DataFrame
get_refresh_history(feature_view: str, version: str, *, verbose: bool = False, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE) DataFrame

Get refresh history statistics about a feature view.

Parameters:
  • feature_view – A registered feature view object, or the name of feature view.

  • version – Optional version of feature view. Must be set when feature_view is a str.

  • verbose – Return more detailed history when set true.

  • store_type – Store to get refresh history from.

    - StoreType.OFFLINE (default): Returns refresh history for the offline feature view (dynamic table).

    - StoreType.ONLINE: Returns refresh history for the online feature table. Only available for feature views with online=True.

Returns:

A DataFrame containing the refresh history information.

Raises:

SnowflakeMLException – [ValueError] If store_type is ONLINE but feature view doesn’t have online storage enabled.

Example:

>>> fs = FeatureStore(...)
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')
>>> # Get offline refresh history (default)
>>> fs.refresh_feature_view('MY_FV', 'v1')
>>> fs.get_refresh_history('MY_FV', 'v1').show()
-----------------------------------------------------------------------------------------------------
|"NAME"    |"STATE"    |"REFRESH_START_TIME"        |"REFRESH_END_TIME"          |"REFRESH_ACTION"  |
-----------------------------------------------------------------------------------------------------
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:53:58.504000  |2024-07-10 14:53:59.088000  |INCREMENTAL       |
-----------------------------------------------------------------------------------------------------

>>> # Get online refresh history (for feature views with online storage)
>>> fs.get_refresh_history('MY_FV', 'v1', store_type=StoreType.ONLINE).show()
-----------------------------------------------------------------------------------------------------
|"NAME"          |"STATE"    |"REFRESH_START_TIME"        |"REFRESH_END_TIME"          |"REFRESH_ACTION"  |
-----------------------------------------------------------------------------------------------------
|MY_FV$v1$ONLINE |SUCCEEDED  |2024-07-10 14:54:01.200000  |2024-07-10 14:54:02.100000  |INCREMENTAL       |
-----------------------------------------------------------------------------------------------------

>>> # Verbose mode works for both storage types
>>> fs.get_refresh_history(fv, verbose=True, store_type=StoreType.OFFLINE).show()
>>> fs.get_refresh_history(fv, verbose=True, store_type=StoreType.ONLINE).show()
get_stream_source(name: str) StreamSource

Retrieve a previously registered StreamSource object.

Parameters:

name – StreamSource name.

Returns:

StreamSource object.

Raises:
  • SnowflakeMLException – [ValueError] StreamSource is not found.

  • SnowflakeMLException – [RuntimeError] Failed to retrieve stream source metadata.

Example:

>>> fs = FeatureStore(...)
>>> ss = fs.get_stream_source("transaction_events")
>>> print(ss)
StreamSource(name=TRANSACTION_EVENTS, ...)
list_entities() DataFrame

List all Entities in the FeatureStore.

Returns:

Snowpark DataFrame containing the results.

Example:

>>> fs = FeatureStore(...)
>>> e_1 = Entity("my_entity", ['col_1'], desc='My first entity.')
>>> fs.register_entity(e_1)
>>> fs.list_entities().show()
-----------------------------------------------------------
|"NAME"     |"JOIN_KEYS"  |"DESC"            |"OWNER"     |
-----------------------------------------------------------
|MY_ENTITY  |["COL_1"]    |My first entity.  |REGTEST_RL  |
-----------------------------------------------------------
list_feature_views(*, entity_name: Optional[str] = None, feature_view_name: Optional[str] = None) DataFrame

List FeatureViews in the FeatureStore. If entity_name is specified, only FeatureViews associated with that Entity are listed. If feature_view_name is specified, the results are further reduced to FeatureViews matching that name.

Parameters:
  • entity_name – Entity name.

  • feature_view_name – FeatureView name.

Returns:

FeatureViews information as a Snowpark DataFrame.

Example:

>>> fs = FeatureStore(...)
>>> draft_fv = FeatureView(
...     name='foo',
...     entities=[e1, e2],
...     feature_df=session.sql('...'),
...     desc='this is description',
... )
>>> fs.register_feature_view(feature_view=draft_fv, version='v1')
>>> fs.list_feature_views().select("name", "version", "desc").show()
--------------------------------------------
|"NAME"  |"VERSION"  |"DESC"               |
--------------------------------------------
|FOO     |v1         |this is description  |
--------------------------------------------
list_stream_sources() DataFrame

List all StreamSources in the FeatureStore.

Returns:

Snowpark DataFrame containing the results with columns NAME, SCHEMA, DESC, OWNER.

Raises:

SnowflakeMLException – If the metadata query fails.

Example:

>>> fs = FeatureStore(...)
>>> fs.list_stream_sources().show()
-------------------------------------------------------------------------------------...
|"NAME"                |"SCHEMA"  |"DESC"                    |"OWNER"     |
-------------------------------------------------------------------------------------...
|TRANSACTION_EVENTS    |[...]     |Real-time transaction...  |REGTEST_RL  |
-------------------------------------------------------------------------------------...
load_feature_views_from_dataset(ds: Dataset) list[Union[FeatureView, snowflake.ml.feature_store.feature_view.FeatureViewSlice]]

Retrieve FeatureViews used during Dataset construction.

Parameters:

ds – Dataset object created from feature store.

Returns:

List of FeatureViews used during Dataset construction.

Raises:

ValueError – if dataset object is not generated from feature store.

Example:

>>> fs = FeatureStore(session, ...)
>>> # Assume you already have feature view registered.
>>> fv = fs.get_feature_view("MY_FV", "1.0")
>>> # Spine dataframe has same join keys as the entity of fv.
>>> spine_df = session.create_dataframe(["1", "2"], schema=["id"])
>>> my_dataset = fs.generate_dataset(
...     "my_dataset",
...     spine_df,
...     [fv],
... )
>>> fvs = fs.load_feature_views_from_dataset(my_dataset)
>>> print(len(fvs))
1

>>> print(type(fvs[0]))
<class 'snowflake.ml.feature_store.feature_view.FeatureView'>

>>> print(fvs[0].name)
MY_FV

>>> print(fvs[0].version)
1.0
read_feature_view(feature_view: str, version: str, *, keys: Optional[list[list[str]]] = None, feature_names: Optional[list[str]] = None, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE, use_session_warehouse: bool = False, as_pandas: Literal[False]) DataFrame
read_feature_view(feature_view: str, version: str, *, keys: Optional[list[list[str]]] = None, feature_names: Optional[list[str]] = None, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE, use_session_warehouse: bool = False, as_pandas: Literal[True]) pd.DataFrame
read_feature_view(feature_view: str, version: str, *, keys: Optional[list[list[str]]] = None, feature_names: Optional[list[str]] = None, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE, use_session_warehouse: bool = False, as_pandas: None = None) Union[DataFrame, pd.DataFrame]
read_feature_view(feature_view: FeatureView, *, keys: Optional[list[list[str]]] = None, feature_names: Optional[list[str]] = None, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE, use_session_warehouse: bool = False, as_pandas: Literal[False]) DataFrame
read_feature_view(feature_view: FeatureView, *, keys: Optional[list[list[str]]] = None, feature_names: Optional[list[str]] = None, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE, use_session_warehouse: bool = False, as_pandas: Literal[True]) pd.DataFrame
read_feature_view(feature_view: FeatureView, *, keys: Optional[list[list[str]]] = None, feature_names: Optional[list[str]] = None, store_type: Union[StoreType, str] = fv_mod.StoreType.OFFLINE, use_session_warehouse: bool = False, as_pandas: None = None) Union[DataFrame, pd.DataFrame]

Read values from a FeatureView from either offline or online store.

Parameters:
  • feature_view – A FeatureView object to read from, or the name of feature view. If name is provided then version also must be provided.

  • version – Optional version of feature view. Must be set when feature_view is a str.

  • keys – Optional list of primary key value lists to filter by. Each inner list should contain values in the same order as the entity join_keys. Works for both offline and online stores. Example: [[“user1”], [“user2”]] for single key, [[“user1”, “item1”], [“user2”, “item2”]] for composite keys. If None, returns all data (hybrid online tables only). For Postgres-backed online feature views, keys must be non-empty; the Online Service Query API does not support unbounded scans from SnowML.

  • feature_names – Optional list of feature names to return. If None, returns all features. For Postgres online reads, a non-empty list is sent to the Query API as JSON features so the service can return only those columns; the DataFrame still contains only that subset (and join keys). Offline and hybrid online SQL paths use the same list in the SELECT clause.

  • store_type – Store to read from - StoreType.ONLINE or StoreType.OFFLINE (default).

  • use_session_warehouse – If True, use the session’s current warehouse instead of the feature store’s configured warehouse. No-op for Postgres-backed online reads.

  • as_pandas – Return type. None (default) returns pandas.DataFrame for Postgres-backed online reads and Snowpark DataFrame everywhere else. True forces pandas.DataFrame (rejected for store_type=StoreType.OFFLINE). False forces Snowpark DataFrame.
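
Since each inner list in keys must follow the order of the entity join_keys, it can be convenient to derive keys from records rather than write them by hand. The helper below is a hypothetical sketch; build_keys and the column names user_id and item_id are illustrative, and values are converted to str to match the list[list[str]] annotation in the signature.

```python
from typing import Any, Mapping, Sequence


def build_keys(rows: Sequence[Mapping[str, Any]], join_keys: Sequence[str]) -> list[list[str]]:
    """Build the keys argument, ordering each row's values by join_keys."""
    return [[str(row[key]) for key in join_keys] for row in rows]


# Single join key.
single = build_keys([{"user_id": "user1"}, {"user_id": "user2"}], ["user_id"])

# Composite join key: dict insertion order does not matter, join_keys order does.
composite = build_keys(
    [
        {"item_id": "item1", "user_id": "user1"},
        {"item_id": "item2", "user_id": "user2"},
    ],
    ["user_id", "item_id"],
)
```

The results match the shapes given in the keys description, so either could be passed as keys= to read_feature_view.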

Returns:

Snowpark DataFrame (or pandas.DataFrame when as_pandas resolves to True) containing the FeatureView data.
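
The as_pandas rules described above amount to a small decision table. The function below restates them in plain Python as a reading aid; it is a hypothetical model of the documented behavior, not the library's implementation, and the boolean flags online and postgres_backed are stand-ins for the store type and online backend.

```python
from typing import Optional


def resolves_to_pandas(as_pandas: Optional[bool], *, online: bool, postgres_backed: bool) -> bool:
    """Return True if the read would yield pandas.DataFrame, False for Snowpark."""
    if as_pandas is None:
        # Default: pandas only for Postgres-backed online reads.
        return online and postgres_backed
    if as_pandas and not online:
        # Documented as rejected for store_type=StoreType.OFFLINE.
        raise ValueError("as_pandas=True is not allowed with store_type=StoreType.OFFLINE")
    return as_pandas


default_postgres_online = resolves_to_pandas(None, online=True, postgres_backed=True)
default_offline = resolves_to_pandas(None, online=False, postgres_backed=False)
forced_snowpark = resolves_to_pandas(False, online=True, postgres_backed=True)
```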

Raises:
  • SnowflakeMLException – [ValueError] version argument is missing when argument feature_view is a str.

  • SnowflakeMLException – [ValueError] FeatureView is not registered.

  • SnowflakeMLException – [ValueError] Online store is not enabled for this feature view.

  • SnowflakeMLException – [ValueError] Invalid store type.

  • SnowflakeMLException – [ValueError] as_pandas=True with store_type=StoreType.OFFLINE.

Example:

>>> fs = FeatureStore(...)
>>> # Read all data from offline store
>>> fs.read_feature_view('foo', 'v1', store_type=StoreType.OFFLINE).show()
------------------------------------------
|"NAME"  |"ID"  |"TITLE"  |"AGE"  |"TS"  |
------------------------------------------
|jonh    |1     |boss     |20     |100   |
|porter  |2     |manager  |30     |200   |
------------------------------------------

>>> # Filter by keys in offline store
>>> fs.read_feature_view('foo', 'v1', keys=[["1"], ["2"]], store_type=StoreType.OFFLINE).show()
------------------------------------------
|"NAME"  |"ID"  |"TITLE"  |"AGE"  |"TS"  |
------------------------------------------
|jonh    |1     |boss     |20     |100   |
|porter  |2     |manager  |30     |200   |
------------------------------------------

>>> # Read from online store with specific keys (same API)
>>> fs.read_feature_view('foo', 'v1', keys=[["1"], ["2"]], store_type=StoreType.ONLINE).show()
--------------------------------
|"ID"  |"TITLE"  |"AGE"       |
--------------------------------
|1     |boss     |20          |
|2     |manager  |30          |
--------------------------------

>>> # Select specific features (works for both stores)
>>> fs.read_feature_view('foo', 'v1', keys=[["1"]], feature_names=["TITLE", "AGE"]).show()
----------------------
|"TITLE"  |"AGE"    |
----------------------
|boss     |20       |
----------------------
>>> # Postgres online reads default to pandas (local-build fast path).
>>> # Pass as_pandas=False to force a Snowpark DataFrame instead.
>>> pdf = fs.read_feature_view('foo', 'v1', keys=[["1"]], store_type=StoreType.ONLINE)
>>> type(pdf).__name__
'DataFrame'
refresh_feature_view(feature_view: str, version: str, *, store_type: Union[fv_mod.StoreType, str] = fv_mod.StoreType.OFFLINE) None
refresh_feature_view(feature_view: FeatureView, version: Optional[str] = None, *, store_type: Union[fv_mod.StoreType, str] = fv_mod.StoreType.OFFLINE) None

Manually refresh a feature view.

Parameters:
  • feature_view – A registered feature view object, or the name of feature view.

  • version – Optional version of feature view. Must be set when feature_view is a str.

  • store_type – Which storage to refresh. Defaults to StoreType.OFFLINE.

    - StoreType.OFFLINE (default): Refreshes the offline feature view.

    - StoreType.ONLINE: Refreshes the online feature table for real-time serving. Only available for feature views with online=True.

Raises:

SnowflakeMLException – [ValueError] Invalid store type.

Example:

>>> fs = FeatureStore(...)
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')

>>> # refresh with name and version
>>> fs.refresh_feature_view('MY_FV', 'v1')
>>> fs.get_refresh_history('MY_FV', 'v1').show()
-----------------------------------------------------------------------------------------------------
|"NAME"    |"STATE"    |"REFRESH_START_TIME"        |"REFRESH_END_TIME"          |"REFRESH_ACTION"  |
-----------------------------------------------------------------------------------------------------
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:53:58.504000  |2024-07-10 14:53:59.088000  |INCREMENTAL       |
-----------------------------------------------------------------------------------------------------

>>> # refresh with feature view object
>>> fs.refresh_feature_view(fv)
>>> fs.get_refresh_history(fv).show()
-----------------------------------------------------------------------------------------------------
|"NAME"    |"STATE"    |"REFRESH_START_TIME"        |"REFRESH_END_TIME"          |"REFRESH_ACTION"  |
-----------------------------------------------------------------------------------------------------
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:54:06.680000  |2024-07-10 14:54:07.226000  |INCREMENTAL       |
|MY_FV$v1  |SUCCEEDED  |2024-07-10 14:53:58.504000  |2024-07-10 14:53:59.088000  |INCREMENTAL       |
-----------------------------------------------------------------------------------------------------
register_entity(entity: Entity) Entity

Register Entity in the FeatureStore.

Parameters:

entity – Entity object to be registered.

Returns:

A registered entity object.

Raises:

SnowflakeMLException – [RuntimeError] Failed to find resources.

Example:

>>> fs = FeatureStore(...)
>>> e = Entity('BAR', ['A'], desc='entity bar')
>>> fs.register_entity(e)
>>> fs.list_entities().show()
--------------------------------------------------
|"NAME"  |"JOIN_KEYS"  |"DESC"      |"OWNER"     |
--------------------------------------------------
|BAR     |["A"]        |entity bar  |REGTEST_RL  |
--------------------------------------------------
register_feature_view(feature_view: FeatureView, version: str, *, block: bool = True, overwrite: bool = False) FeatureView

Materialize a FeatureView to Snowflake backend. Incremental maintenance for updates on the source data will be automated if refresh_freq is set. NOTE: Each new materialization will trigger a full FeatureView history refresh for the data included in the FeatureView.

Parameters:
  • feature_view – FeatureView instance to materialize.

  • version – version of the registered FeatureView. NOTE: Version only accepts letters, numbers and underscore. Also version will be capitalized.

  • block – Deprecated. To make the initial refresh asynchronous, set the initialize argument on the FeatureView to “ON_SCHEDULE”. Default is true.

  • overwrite – Overwrite the existing FeatureView with the same version. This is equivalent to dropping the FeatureView and then recreating it. NOTE: there will be a backfill cost if the FeatureView is being continuously maintained.
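
A version string can be checked against the documented rule (letters, numbers, and underscore) before calling register_feature_view. The sketch below is deliberately conservative and assumes ASCII letters; the library's own validation may accept more than this, so a False result here does not necessarily mean registration would fail. matches_documented_version_rule is a hypothetical helper.

```python
import re

# Assumption: "letters" means ASCII letters; the library may be more permissive.
_DOCUMENTED_VERSION_RE = re.compile(r"[A-Za-z0-9_]+")


def matches_documented_version_rule(version: str) -> bool:
    """Return True if version uses only letters, digits, and underscores."""
    return _DOCUMENTED_VERSION_RE.fullmatch(version) is not None


checks = {
    candidate: matches_documented_version_rule(candidate)
    for candidate in ("v1", "FIRST", "2024_07_12", "v 1", "")
}
```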

Returns:

A materialized FeatureView object.

Raises:
  • SnowflakeMLException – [ValueError] FeatureView entity has not been registered.

  • SnowflakeMLException – [ValueError] Warehouse or default warehouse is not specified.

  • SnowflakeMLException – [RuntimeError] Failed to create dynamic table, task, or view.

  • SnowflakeMLException – [RuntimeError] Failed to find resources.

  • Exception – Unexpected error during registration.

Example:

>>> fs = FeatureStore(...)
>>> # draft_fv is a local object that hasn't been materialized to the Snowflake backend yet.
>>> feature_df = session.sql("select f_1, f_2 from source_table")
>>> draft_fv = FeatureView("my_fv", [entities], feature_df)
>>> print(draft_fv.status)
FeatureViewStatus.DRAFT

>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|        |           |                    |
-------------------------------------------

>>> # registered_fv is a local object that maps to a Snowflake backend object.
>>> registered_fv = fs.register_feature_view(draft_fv, "v1")
>>> print(registered_fv.status)
FeatureViewStatus.ACTIVE

>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|MY_FV   |v1         |ACTIVE              |
-------------------------------------------

register_stream_source(stream_source: StreamSource) StreamSource

Register a StreamSource in the FeatureStore.

Parameters:

stream_source – StreamSource object to be registered.

Returns:

A registered StreamSource object with owner populated.

Raises:

SnowflakeMLException – [RuntimeError] Failed to save stream source metadata.

Example:

>>> fs = FeatureStore(...)
>>> from snowflake.snowpark.types import (
...     StructType, StructField, StringType, FloatType, TimestampType,
... )
>>> txn_source = StreamSource(
...     name="transaction_events",
...     schema=StructType([
...         StructField("user_id", StringType()),
...         StructField("amount", FloatType()),
...         StructField("event_time", TimestampType()),
...     ]),
...     desc="Real-time transaction events",
... )
>>> fs.register_stream_source(txn_source)
>>> fs.list_stream_sources().show()
-------------------------------------------------------------------------------------...
|"NAME"                |"SCHEMA"  |"DESC"                    |"OWNER"     |
-------------------------------------------------------------------------------------...
|TRANSACTION_EVENTS    |[...]     |Real-time transaction...  |REGTEST_RL  |
-------------------------------------------------------------------------------------...

This function or method has been in private preview since 1.33.0.

resume_feature_view(feature_view: FeatureView) FeatureView
resume_feature_view(feature_view: str, version: str) FeatureView

Resume a previously suspended FeatureView.

This operation resumes both the offline feature view (dynamic table and associated task) and the online feature table (if it exists) to ensure consistent state across all storage types.

Parameters:
  • feature_view – FeatureView object or name to resume.

  • version – Optional version of the feature view. Must be set when the feature_view argument is a str.

Returns:

A new feature view with updated status.

Example:

>>> fs = FeatureStore(...)
>>> # you must already have feature views registered
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')
>>> fs.suspend_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|MY_FV   |v1         |SUSPENDED           |
-------------------------------------------

>>> fs.resume_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|MY_FV   |v1         |ACTIVE              |
-------------------------------------------

retrieve_feature_values(spine_df: DataFrame, features: Union[list[Union[FeatureView, snowflake.ml.feature_store.feature_view.FeatureViewSlice]], list[str]], *, spine_timestamp_col: Optional[str] = None, exclude_columns: Optional[list[str]] = None, include_feature_view_timestamp_col: bool = False, auto_prefix: bool = False, join_method: Literal['sequential', 'cte'] = 'sequential') DataFrame

Enrich spine dataframe with feature values for inference.

Uses the latest entity mappings for rollup feature views. For point-in-time correct entity mappings (e.g., during training or backtesting with temporal rollup FVs), use generate_training_set instead.

If spine_timestamp_col is specified, point-in-time feature values will be fetched.
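To make "point-in-time" concrete, the following sketch mimics the lookup in plain Python rather than Snowpark: for each spine row, it picks the most recent feature row with the same key whose timestamp is not later than the spine timestamp. The function name point_in_time_lookup, the row layout, and the inclusive (<=) comparison are assumptions made for illustration; this is not the library's implementation.

```python
from datetime import datetime
from typing import Any


def point_in_time_lookup(
    spine_rows: list[dict[str, Any]],
    feature_rows: list[dict[str, Any]],
    key: str,
    spine_ts: str,
    feature_ts: str,
    feature_col: str,
) -> list[dict[str, Any]]:
    """Hypothetical as-of join for illustration; not part of snowflake.ml."""
    enriched = []
    for spine in spine_rows:
        # Candidates: same key, observed no later than the spine timestamp
        # (the inclusive comparison is an assumption).
        candidates = [
            f for f in feature_rows
            if f[key] == spine[key] and f[feature_ts] <= spine[spine_ts]
        ]
        # Keep the most recent candidate, or None when nothing qualifies.
        latest = max(candidates, key=lambda f: f[feature_ts], default=None)
        value = latest[feature_col] if latest is not None else None
        enriched.append({**spine, feature_col: value})
    return enriched


features = [
    {"id": "1", "ts": datetime(2024, 1, 1), "f_1": 10},
    {"id": "1", "ts": datetime(2024, 1, 5), "f_1": 20},
]
spine = [
    {"id": "1", "event_ts": datetime(2024, 1, 3)},
    {"id": "1", "event_ts": datetime(2024, 1, 6)},
    {"id": "2", "event_ts": datetime(2024, 1, 6)},
]
result = point_in_time_lookup(spine, features, "id", "event_ts", "ts", "f_1")
print([row["f_1"] for row in result])  # [10, 20, None]
```

The spine row dated January 3 sees only the January 1 value, which is the property that prevents future data from leaking into a lookup.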

Parameters:
  • spine_df – Snowpark DataFrame to join features into.

  • features – List of features to join into the spine_df. Can be a list of FeatureView or FeatureViewSlice, or a list of serialized feature objects from Dataset.

  • spine_timestamp_col – Timestamp column in spine_df for point-in-time feature value lookup.

  • exclude_columns – Column names to exclude from the result dataframe.

  • include_feature_view_timestamp_col – If True, the result includes the feature view's timestamp column (when the feature view has one). Defaults to False.

  • auto_prefix – If True, automatically prefix all feature columns with ‘{feature_view_name}_{version}_’ to avoid name collisions. Default False. Use FeatureView.with_name() for custom prefixes.

  • join_method – Method for feature joins. “sequential” for layer-by-layer joins (default), “cte” for CTE method. (Internal use only - subject to change)
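
The naming pattern used by auto_prefix can be sketched in plain Python. The helper prefixed_columns is hypothetical, and the exact casing or quoting that the library applies to the prefix is not stated here, so the output should be read as illustrative only.

```python
def prefixed_columns(feature_view_name: str, version: str, columns: list[str]) -> list[str]:
    """Apply the documented '{feature_view_name}_{version}_' pattern.

    Hypothetical helper for illustration; not part of snowflake.ml.
    """
    prefix = f"{feature_view_name}_{version}_"
    return [prefix + col for col in columns]


# Two feature views that both expose an "AMOUNT" column no longer collide.
cols_a = prefixed_columns("TXN_FV", "v1", ["AMOUNT"])
cols_b = prefixed_columns("REFUND_FV", "v1", ["AMOUNT"])
print(cols_a, cols_b)
```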

Returns:

Snowpark DataFrame containing the joined results.

Raises:

ValueError – if features is empty.

Example:

>>> fs = FeatureStore(...)
>>> # Assume you already have feature view registered.
>>> fv = fs.get_feature_view('my_fv', 'v1')
>>> # Spine dataframe has same join keys as the entity of fv.
>>> spine_df = session.create_dataframe(["1", "2"], schema=["id"])
>>> fs.retrieve_feature_values(spine_df, [fv]).show()
--------------------
|"END_STATION_ID"  |
--------------------
|505               |
|347               |
|466               |
--------------------

stream_ingest(stream_source: Union[str, StreamSource], records: Union[list[dict[str, Any]], dict[str, Any]], *, timeout_sec: float = 120.0, statement_params: Optional[dict[str, Any]] = None) int

Send rows to the Online Service for ingestion.

Requires the SNOWFLAKE_PAT environment variable to be set with a valid Snowflake Programmatic Access Token.

records may be one row as a dict or a non-empty list of row dicts. Each row’s keys must match the registered StreamSource schema exactly (no missing or extra columns).

Parameters:
  • stream_source – Registered stream source name or a StreamSource instance.

  • records – Rows to ingest.

  • timeout_sec – Timeout in seconds for the ingest request.

  • statement_params – Optional Snowpark statement parameters (for example telemetry).

Returns:

Count of records accepted by the Online Service. On partial success, the returned count may be less than the number of rows sent.

Raises:

SnowflakeMLException – If the Online Service or ingest endpoint is unavailable, SNOWFLAKE_PAT is unset, the request fails, or rows do not match the stream source schema.
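
Because a schema mismatch surfaces only as a SnowflakeMLException from the service call, it can help to check rows locally first. The sketch below is a hypothetical pre-flight helper (normalize_records is not part of snowflake.ml) that applies the two rules stated above: accept one dict or a non-empty list of dicts, and require each row's keys to equal the expected column names exactly. It compares names case-sensitively, which is an assumption; the service may normalize identifiers differently.

```python
from typing import Any, Union

Row = dict[str, Any]


def normalize_records(records: Union[Row, list[Row]], expected_columns: set[str]) -> list[Row]:
    """Return records as a list after checking each row's keys.

    Hypothetical helper for illustration; not part of snowflake.ml.
    """
    # A single row may be passed as a bare dict; wrap it in a list.
    rows = [records] if isinstance(records, dict) else list(records)
    if not rows:
        raise ValueError("records must be a dict or a non-empty list of dicts")
    for i, row in enumerate(rows):
        keys = set(row)
        missing = expected_columns - keys
        extra = keys - expected_columns
        if missing or extra:
            raise ValueError(f"row {i}: missing={sorted(missing)}, extra={sorted(extra)}")
    return rows


columns = {"user_id", "amount", "event_time"}
rows = normalize_records(
    {"user_id": "u1", "amount": 12.5, "event_time": "2024-01-01T00:00:00"},
    columns,
)
print(len(rows))  # 1
```

The validated list could then be passed as the records argument, for example fs.stream_ingest("transaction_events", rows), with SNOWFLAKE_PAT set in the environment.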

suspend_feature_view(feature_view: FeatureView) FeatureView
suspend_feature_view(feature_view: str, version: str) FeatureView

Suspend an active FeatureView.

This operation suspends both the offline feature view (dynamic table and associated task) and the online feature table (if it exists).

Parameters:
  • feature_view – FeatureView object or name to suspend.

  • version – Optional version of the feature view. Must be set when the feature_view argument is a str.

Returns:

A new feature view with updated status.

Example:

>>> fs = FeatureStore(...)
>>> # assume you already have feature views registered
>>> fv = fs.get_feature_view(name='MY_FV', version='v1')
>>> fs.suspend_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|MY_FV   |v1         |SUSPENDED           |
-------------------------------------------

>>> fs.resume_feature_view('MY_FV', 'v1')
>>> fs.list_feature_views().select("NAME", "VERSION", "SCHEDULING_STATE").show()
-------------------------------------------
|"NAME"  |"VERSION"  |"SCHEDULING_STATE"  |
-------------------------------------------
|MY_FV   |v1         |ACTIVE              |
-------------------------------------------

update_default_warehouse(warehouse_name: str) None

Update default warehouse for feature store.

Parameters:

warehouse_name – Name of warehouse.

Raises:

SnowflakeMLException – If the warehouse does not exist.

Example:

>>> fs = FeatureStore(...)
>>> fs.update_default_warehouse("MYWH_2")
>>> draft_fv = FeatureView("my_fv", ...)
>>> registered_fv = fs.register_feature_view(draft_fv, '2.0')
>>> print(registered_fv.warehouse)
MYWH_2

update_entity(name: str, *, desc: Optional[str] = None) Optional[Entity]

Update a registered entity with provided information.

Parameters:
  • name – Name of entity to update.

  • desc – Optional new description to apply. Defaults to None.

Raises:

SnowflakeMLException – An error occurred while updating the entity.

Returns:

A new entity with updated information or None if the entity doesn’t exist.

Example:

>>> fs = FeatureStore(...)

>>> e = Entity(name='foo', join_keys=['COL_1'], desc='old desc')
>>> # Register the entity so that it appears in list_entities().
>>> fs.register_entity(e)
>>> fs.list_entities().show()
------------------------------------------------
|"NAME"  |"JOIN_KEYS"  |"DESC"    |"OWNER"     |
------------------------------------------------
|FOO     |["COL_1"]    |old desc  |REGTEST_RL  |
------------------------------------------------

>>> fs.update_entity('foo', desc='NEW DESC')
>>> fs.list_entities().show()
------------------------------------------------
|"NAME"  |"JOIN_KEYS"  |"DESC"    |"OWNER"     |
------------------------------------------------
|FOO     |["COL_1"]    |NEW DESC  |REGTEST_RL  |
------------------------------------------------

update_feature_view(name: str, version: str, *, refresh_freq: Optional[str] = None, warehouse: Optional[str] = None, desc: Optional[str] = None, online_config: Optional[fv_mod.OnlineConfig] = None) FeatureView
update_feature_view(name: FeatureView, version: Optional[str] = None, *, refresh_freq: Optional[str] = None, warehouse: Optional[str] = None, desc: Optional[str] = None, online_config: Optional[fv_mod.OnlineConfig] = None) FeatureView

Update a registered feature view.

Check feature_view.py for which fields are allowed to be updated after registration.

Parameters:
  • name – FeatureView object or name of the feature view to update.

  • version – Optional version of the feature view. Must be set when name is a str.

  • refresh_freq – updated refresh frequency.

  • warehouse – updated warehouse.

  • desc – description of feature view.

  • online_config – updated online configuration for the online feature table. If provided with enable=True, creates online feature table if absent. If provided with enable=False, drops online feature table if present. If None (default), no change to online status. During update, only explicitly set fields in the OnlineConfig will be updated.

Returns:

Updated FeatureView.

Example:

>>> fs = FeatureStore(...)
>>> fv = FeatureView(
...     name='foo',
...     entities=[e1, e2],
...     feature_df=session.sql('...'),
...     desc='this is old description',
... )
>>> fv = fs.register_feature_view(feature_view=fv, version='v1')
>>> fs.list_feature_views().select("name", "version", "desc").show()
------------------------------------------------
|"NAME"  |"VERSION"  |"DESC"                   |
------------------------------------------------
|FOO     |v1         |this is old description  |
------------------------------------------------

>>> # update_feature_view will apply new arguments to the registered feature view.
>>> new_fv = fs.update_feature_view(
...     name='foo',
...     version='v1',
...     desc='THAT IS NEW DESCRIPTION',
... )
>>> fs.list_feature_views().select("name", "version", "desc").show()
------------------------------------------------
|"NAME"  |"VERSION"  |"DESC"                   |
------------------------------------------------
|FOO     |v1         |THAT IS NEW DESCRIPTION  |
------------------------------------------------

>>> # Enable online storage with custom configuration
>>> config = OnlineConfig(enable=True, target_lag='15s')
>>> online_fv = fs.update_feature_view(
...     name='foo',
...     version='v1',
...     online_config=config,
... )
>>> print(online_fv.online)
True

Raises:
  • SnowflakeMLException – [RuntimeError] If FeatureView is not managed and refresh_freq is defined.

  • SnowflakeMLException – [RuntimeError] Failed to update feature view.
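
The rule that only explicitly set fields in the OnlineConfig are updated can be pictured as a partial merge. The sketch below uses a hypothetical OnlineSettings dataclass in which None stands for "not set"; how the real OnlineConfig tracks unset fields is not described here, so this is an illustration of the semantics rather than the library's mechanism.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class OnlineSettings:
    """Hypothetical stand-in for an online configuration; None means 'not set'."""

    enable: Optional[bool] = None
    target_lag: Optional[str] = None


def merge_settings(current: OnlineSettings, update: Optional[OnlineSettings]) -> OnlineSettings:
    """Apply only the fields that the update sets explicitly."""
    if update is None:
        return current  # None: leave the online configuration unchanged
    changes = {
        f.name: getattr(update, f.name)
        for f in fields(update)
        if getattr(update, f.name) is not None
    }
    return replace(current, **changes)


current = OnlineSettings(enable=True, target_lag="30s")
lag_only = merge_settings(current, OnlineSettings(target_lag="15s"))
unchanged = merge_settings(current, None)
print(lag_only)
print(unchanged)
```

In this model, passing only target_lag keeps enable as it was, and passing None changes nothing, mirroring the behavior described for online_config.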

update_stream_source(name: str, *, desc: Optional[str] = None) Optional[StreamSource]

Update a registered StreamSource description.

Parameters:
  • name – Name of the StreamSource to update.

  • desc – New description to apply. Defaults to None (no change).

Returns:

Updated StreamSource object, or None if the StreamSource doesn’t exist.

Raises:

SnowflakeMLException – [RuntimeError] Failed to update stream source.

Example:

>>> fs = FeatureStore(...)
>>> fs.update_stream_source("transaction_events", desc="Updated description")
>>> fs.get_stream_source("transaction_events").desc
'Updated description'