Snowflake ML Feature Store helps manage your features throughout the process of feature engineering.
For online applications that require low-latency inference, use the online feature store to serve your features.
The following sections go through productionizing the process of retrieving features within your Python application. These sections have
code examples that do the following:
Load the Iris dataset into Snowflake
Define the connection to Snowflake
Create the Feature Store and Feature Views
Retrieve the features and feature values
Generate predictions from your model
The code examples are written in Python. To go through this workflow for applications written in other languages, use a Snowflake driver
that’s specific to that language. For more information, see Drivers.
To run online ML feature retrieval in Snowflake, you need the following:
Data that you’ve already loaded into Snowflake
A Snowflake feature store
Feature views
Online feature serving enabled for each feature view
You can use features from your own Snowflake feature store, but you can use the following code to load the Iris dataset into Snowflake if
you don’t already have a feature store.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
from snowflake.snowpark.context import get_active_session
sf_session = get_active_session()
### Download the Iris dataset.
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
'sepal length (cm)': 'SEPAL_LENGTH_CM',
'sepal width (cm)': 'SEPAL_WIDTH_CM',
'petal length (cm)': 'PETAL_LENGTH_CM',
'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target
### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
table_name="SEPAL_DATA",
auto_create_table=True,
overwrite=True
)
petal_df = sf_session.write_pandas(
X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
table_name="PETAL_DATA",
auto_create_table=True,
overwrite=True
)
After you have the data in your environment, you create the feature store. The following code creates a feature store and the
id_entity entity for the different samples from the Iris dataset.
Snowflake ML Feature Store has the concept of entities. Entities are keys that organize features between feature views. For more information
about entities, see Working with entities.
After you’ve created the feature store, you define the feature views. The following code defines the sepal and petal feature views from the
Iris dataset.
After you’ve registered the feature views and enabled online feature serving for each feature view, you can have the feature values from
each feature view served to your application.
To retrieve the feature values, you do the following:
Set up a connection to Snowflake
Create the session and Snowflake Feature Store objects that initialize when the application starts
Retrieve the features from your feature views
Create a prediction endpoint and get predictions from that endpoint
Important
You must install snowflake-ml-python>=1.18.0 into your application’s environment to use the Feature Store API.
After you’ve defined your connection parameters, you create the session and Feature Store objects that your application uses to connect to
Snowflake.
The following code:
Creates the Snowflake Session, the client that your application uses to communicate with Snowflake.
Configures a thread pool executor to enable feature retrieval parallelism.
Lists the features that we’re retrieving from the feature store.
Initializes the feature store reader client. This object wraps the Snowflake session. It’s the main way your application interacts with the
feature store.
Initializes the feature views that you’ve defined. You can replace these with your own features.
import os
from concurrent.futures import ThreadPoolExecutor
from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode
# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()
# 2. Create a thread pool executor for feature store requestsMAX_WORKERS=os.cpu_count() *2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# 3. List individual features we are going to retrieve for inference. In this# example, we are listing Iris features described above in the# "Prerequisites" section.PETAL_FEATURE_LIST= ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST= ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]
# 4. Initialize feature store consumer client
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse="<warehouse>",
creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)
# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
After you’ve defined how the application initializes, you can create a prediction endpoint.
There are different ways where you can define how your application handles requests. The following Python code:
Defines the prediction endpoint in your application
Takes the keys from the JSON request
Uses the keys to retrieve the feature values from the feature views
Passes those feature values to the model
Gets the predictions from the model
Returns the predictions in the response
from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask
def_retrieve_features(
feature_view: FeatureView,
keys: List[int],
feature_names: List[str]):
"""Retrieve features from the given feature view"""return feature_store.read_feature_view(
feature_view,
keys=[keys],
feature_names=feature_names,
store_type=StoreType.ONLINE# Query the ONLINE store
).collect()
@app.route("/prediction-endpoint", methods=["POST"])
defprediction():
if flask.request.content_type =='application/json':
input_data = flask.request.data.decode("utf-8")
input_data = json.loads(data)
else:
return flask.Response(
response="This predictor only supports JSON data",
status=415,
mimetype="text/plain"
)
# Expect that input data is a single key
keys = [int(input_data["key"])]
# Retrieve features from two feature views in parallel.
sepal_features = executor.submit(
_retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
petal_features = executor.submit(
_retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)
sepal_features = sepal_features.result()
petal_features = petal_features.result()
predictions = []
iflen(sepal_features) !=0andlen(petal_features) !=0:
# Compose the feature vector, excluding the join keys.
feature_vector = (
list(sepal_features[0][1:])
+list(petal_features[0][1:])
)
# Using a hypothetical run_inference function.
predictions = run_inference(feature_vector)
result = json.dumps({"results": list(predictions)})
return flask.Response(response=result, status=200,
mimetype="application/json")
The preceding code calls a hypothetical run_inference function. Your own inference function could get predictions from your model
regardless of whether it’s hosted remotely or in application memory.
The prediction endpoint in the preceding code accepts a key and returns the prediction for that key. Your data might have multiple keys
characterizing a single sample. The preceding code is meant to be an example that you can adapt to your own use case.