Utiliser le Snowflake Feature Store en production

Le Snowflake ML Feature Store vous aide à gérer vos fonctionnalités tout au long du processus d’ingénierie des fonctionnalités.

Pour les applications en ligne qui nécessitent une inférence à faible latence, utilisez le Feature Store en ligne pour mettre les fonctionnalités à disposition.

Les sections suivantes décrivent le processus de production pour récupérer des fonctionnalités dans votre application Python. Ces sections contiennent des exemples de code qui effectuent les opérations suivantes :

  1. Charger l’ensemble de données Iris dans Snowflake

  2. Définir la connexion avec Snowflake

  3. Créer le Feature Store et les vues des fonctionnalités

  4. Récupérer les fonctionnalités et les valeurs des fonctionnalités

  5. Générer des prédictions à partir de votre modèle

Les exemples de code sont écrits en Python. Pour suivre ce workflow pour les applications écrites dans d’autres langages, utilisez un pilote Snowflake spécifique à ce langage. Pour plus d’informations, voir Pilotes.

Conditions préalables

Pour exécuter la récupération de fonctionnalités ML en ligne dans Snowflake, vous avez besoin des éléments suivants :

  • Données que vous avez déjà chargées dans Snowflake

  • Un Snowflake Feature Store

  • Vues de fonctions

  • Service des fonctionnalités en ligne activé pour chaque vue de fonctionnalités

Vous pouvez utiliser des fonctionnalités de votre propre Snowflake Feature Store, mais vous pouvez utiliser le code suivant pour charger l’ensemble de données Iris dans Snowflake si vous ne disposez pas déjà d’un Feature Store.

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd

from snowflake.snowpark.context import get_active_session

sf_session = get_active_session()

### Download the Iris dataset.

iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
    'sepal length (cm)': 'SEPAL_LENGTH_CM',
    'sepal width (cm)': 'SEPAL_WIDTH_CM',
    'petal length (cm)': 'PETAL_LENGTH_CM',
    'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target

### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
    X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
    table_name="SEPAL_DATA",
    auto_create_table=True,
    overwrite=True
)
petal_df = sf_session.write_pandas(
    X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
    table_name="PETAL_DATA",
    auto_create_table=True,
    overwrite=True
)
Copy

Une fois que les données sont disponibles dans votre environnement, vous pouvez créer un Feature Store. Le code suivant crée un Feature Store et l’entité id_entity pour les différents échantillons de l’ensemble de données Iris.

### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0

from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig

### Create Snowflake feature store

feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse=sf_session.get_current_warehouse(),
    creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")

id_entity = Entity(
    name='SAMPLE_ID',
    join_keys=["ID"],
    desc='sample id'
)
feature_store.register_entity(id_entity)
Copy

Note

Le Snowflake ML Feature Store dispose du concept d’entités. Les entités sont des clés qui organisent les fonctionnalités entre les vues de fonctionnalités. Pour plus d’informations sur les entités, voir Utilisation d’entités.

Après avoir créé le Feature Store, il vous faut définir les vues des fonctionnalités. Le code suivant définit les vues des fonctionnalités sépales et pétales à partir de l’ensemble de données Iris.

### Create feature views with Online Serving.
sepal_fv = FeatureView(
    name='SEPAL_FEATURES',
    entities=[id_entity],
    feature_df=sepal_df,
    desc='Sepal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
    name='PETAL_FEATURES',
    entities=[id_entity],
    feature_df=petal_df,
    desc='Petal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
    sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
    petal_fv, version="v1", overwrite=True)
Copy

Récupérer les valeurs des fonctionnalités

Après avoir enregistré les vues de fonctionnalités et activé la mise à disposition en ligne des fonctionnalités pour chaque vue, vous pouvez obtenir les valeurs de chaque vue dans votre application.

Pour récupérer les valeurs des fonctionnalités, procédez comme suit :

  1. Configurez une connexion à Snowflake

  2. Créez la session et les objets du Snowflake Feature Store qui s’initialisent au démarrage de l’application

  3. Récupérez les fonctionnalités de vos vues de fonctionnalités

  4. Créez un point de terminaison de prédiction et obtenez des prédictions à partir de ce point de terminaison

Important

Vous devez installer snowflake-ml-python>=1.18.0 dans l’environnement de votre application pour utiliser l’API Feature Store.

Pour vous connecter à Snowflake depuis votre application, vous devez configurer un jeton d’accès programmatique (PAT) ou une authentification par paire de clés comme méthode d’authentification.

Configurer le client

Lorsque vous initialisez votre application, elle doit se connecter à l’API Snowflake ML Feature Store et créer les objets Python du Feature Store requis.

Consultez les sections suivantes pour savoir comment configurer la connexion de votre client à l’API Snowflake ML Feature Store.

Configurer un jeton d’accès programmatique (PAT)

Spécifiez les paramètres de connexion suivants dans le code suivant pour vous connecter à Snowflake depuis votre application :

import os

### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
    "account": "<account_identifier>",
    "user": "<user>",
    "password": pat,
    "role": "<FS_CONSUMER_ROLE>",
    "host": "<host>",
    "warehouse": "<warehouse>",
    "database": "<database>",
    "schema": "MY_FEATURE_STORE",
}
Copy

Créer la session et les objets du Feature Store

Après avoir défini vos paramètres de connexion, il vous faut créer la session et les objets du Feature Store que votre application utilise pour se connecter à Snowflake.

Le code suivant :

  1. Crée la session Snowflake, le client que votre application utilise pour communiquer avec Snowflake.

  2. Configure un exécuteur de pool de threads pour activer le parallélisme de récupération des fonctionnalités.

  3. Répertorie les fonctionnalités que nous récupérons dans le Feature Store.

  4. Initialise le client de lecteur du Feature Store. Cet objet englobe la session Snowflake. C’est le principal moyen par lequel votre application interagit avec le Feature Store.

  5. Initialise les vues de fonctionnalités que vous avez définies. Vous pouvez les remplacer par vos propres fonctionnalités.

import os
from concurrent.futures import ThreadPoolExecutor

from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode

# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()

# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# 3. List individual features we are going to retrieve for inference. In this
#    example, we are listing Iris features described above in the
#    "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]

# 4. Initialize feature store consumer client
feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse="<warehouse>",
    creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)

# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
Copy

Récupérer les fonctionnalités en ligne sur le chemin de service

Après avoir défini la manière dont l’application s’initialise, vous pouvez créer un point de terminaison de prédiction.

Il existe différentes façons de définir la manière dont votre application traite les requêtes. Le code Python suivant :

  • Définit le point de terminaison de la prédiction dans votre application

  • Prend les clés de la requête JSON

  • Utilise les clés pour récupérer les valeurs des fonctionnalités à partir des vues de fonctionnalités

  • Transfère ces valeurs de fonctionnalités au modèle

  • Obtient les prédictions du modèle

  • Renvoie les prédictions dans la réponse

from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask

def _retrieve_features(
    feature_view: FeatureView,
    keys: List[int],
    feature_names: List[str]):
    """Retrieve features from the given feature view"""

    return feature_store.read_feature_view(
        feature_view,
        keys=[keys],
        feature_names=feature_names,
        store_type=StoreType.ONLINE  # Query the ONLINE store
    ).collect()

@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
    if flask.request.content_type == 'application/json':
        input_data = flask.request.data.decode("utf-8")
        input_data = json.loads(data)
    else:
        return flask.Response(
            response="This predictor only supports JSON data",
            status=415,
            mimetype="text/plain"
        )

    # Expect that input data is a single key
    keys = [int(input_data["key"])]

    # Retrieve features from two feature views in parallel.
    sepal_features = executor.submit(
        _retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
    petal_features = executor.submit(
        _retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)

    sepal_features = sepal_features.result()
    petal_features = petal_features.result()

    predictions = []
    if len(sepal_features) != 0 and len(petal_features) != 0:
        # Compose the feature vector, excluding the join keys.
        feature_vector = (
            list(sepal_features[0][1:])
            + list(petal_features[0][1:])
        )

        # Using a hypothetical run_inference function.
        predictions = run_inference(feature_vector)

    result = json.dumps({"results": list(predictions)})
    return flask.Response(response=result, status=200,
        mimetype="application/json")
Copy

Le code précédent appelle une fonction run_inference hypothétique. Votre propre fonction d’inférence peut obtenir des prédictions à partir de votre modèle, qu’il soit hébergé à distance ou dans la mémoire de l’application.

Le point de terminaison de prédiction dans le code précédent accepte une clé et renvoie la prédiction pour cette clé. Vos données peuvent comporter plusieurs clés caractérisant un seul échantillon. Le code ci-dessus est un exemple que vous pouvez adapter à votre propre cas d’utilisation.