Verwenden des Snowflake Online Feature Store in der Produktion

Der Snowflake ML Feature Store hilft Ihnen, Ihre Features während des gesamten Feature-Engineering-Prozesses zu verwalten.

Bei Online-Anwendungen, die eine niedrige Latenz erfordern, verwenden Sie den Online Feature Store, um Ihre Features bereitzustellen.

In den folgenden Abschnitten erfahren Sie mehr über die Produktionsumgebung für das Abrufen von Features innerhalb Ihrer Python-Anwendung. Diese Abschnitte enthalten Codebeispiele für folgende Szenarien:

  1. Laden des Iris-Datensets in Snowflake

  2. Definieren der Verbindung zu Snowflake

  3. Erstellen des Feature Stores und der Feature-Ansichten

  4. Abrufen der Features und Feature-Werte

  5. Generieren von Vorhersagen aus Ihrem Modell

Die Codebeispiele sind in Python geschrieben. Um diesen Workflow für Anwendungen zu durchlaufen, die in anderen Sprachen geschrieben wurden, verwenden Sie einen Snowflake-Treiber, der für diese Sprache speziell entwickelt wurde. Weitere Informationen dazu finden Sie unter Treiber.

Voraussetzungen

Um ML-Features online in Snowflake abzurufen, benötigen Sie Folgendes:

  • Daten, die Sie bereits in Snowflake geladen haben

  • Einen Snowflake Feature Store

  • Feature-Ansichten

  • Für jede Feature-Ansicht aktivierte Online-Feature-Bereitstellung

Sie können Features aus Ihrem eigenen Snowflake Feature Store verwenden, aber Sie können den folgenden Code verwenden, um das Iris-Datenset in Snowflake zu laden, wenn Sie noch keinen Feature Store haben.

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd

from snowflake.snowpark.context import get_active_session

sf_session = get_active_session()

### Download the Iris dataset.

iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
    'sepal length (cm)': 'SEPAL_LENGTH_CM',
    'sepal width (cm)': 'SEPAL_WIDTH_CM',
    'petal length (cm)': 'PETAL_LENGTH_CM',
    'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target

### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
    X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
    table_name="SEPAL_DATA",
    auto_create_table=True,
    overwrite=True
)
petal_df = sf_session.write_pandas(
    X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
    table_name="PETAL_DATA",
    auto_create_table=True,
    overwrite=True
)
Copy

Sobald die Daten in Ihrer Umgebung vorliegen, erstellen Sie den Feature Store. Der folgende Code erstellt einen Feature Store und die id_entity-Entität für die verschiedenen Proben aus dem Iris-Datenset.

### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0

from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig

### Create Snowflake feature store

feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse=sf_session.get_current_warehouse(),
    creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")

id_entity = Entity(
    name='SAMPLE_ID',
    join_keys=["ID"],
    desc='sample id'
)
feature_store.register_entity(id_entity)
Copy

Bemerkung

Der Snowflake ML Feature Store nutzt das Konzept von Entitäten. Entitäten sind Schlüssel, die Features zwischen Feature-Ansichten organisieren. Weitere Informationen zu Entitäten finden Sie unter Verwenden von Entitäten.

Nachdem Sie den Feature Store erstellt haben, definieren Sie die Feature-Ansichten. Der nachstehende Code legt die Sepal- und Petal-Feature-Ansichten des Iris-Datensets fest.

### Create feature views with Online Serving.
sepal_fv = FeatureView(
    name='SEPAL_FEATURES',
    entities=[id_entity],
    feature_df=sepal_df,
    desc='Sepal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
    name='PETAL_FEATURES',
    entities=[id_entity],
    feature_df=petal_df,
    desc='Petal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
    sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
    petal_fv, version="v1", overwrite=True)
Copy

Abrufen der Feature-Werte

Nachdem Sie die Feature-Ansichten registriert und die Online-Bereitstellung von Features für jede Feature-Ansicht aktiviert haben, können Sie die Feature-Werte von jeder Feature-Ansicht für Ihre Anwendung bereitstellen lassen.

Um die Feature-Werte abzurufen, gehen Sie wie folgt vor:

  1. Herstellen einer Verbindung zu Snowflake

  2. Erstellen der Sitzung und der Snowflake Feature Store-Objekte, die beim Starten der Anwendung initialisiert werden

  3. Abrufen der Features aus Ihren Feature-Ansichten

  4. Erstellen eines Vorhersageendpunkts und Abrufen von Vorhersagen von diesem Endpunkt

Wichtig

Sie müssen snowflake-ml-python>=1.18.0 in die Umgebung Ihrer Anwendung installieren, um die Feature Store API zu verwenden.

Um von Ihrer Anwendung aus eine Verbindung zu Snowflake herzustellen, müssen Sie entweder ein programmgesteuertes Zugriffstoken (PAT) oder die Schlüsselpaar-Authentifizierung als Authentifizierungsmethode einrichten.

Konfigurieren des Clients

Wenn Sie Ihre Anwendung initialisieren, muss diese eine Verbindung zur Snowflake ML Feature Store API und die erforderlichen Feature Store-Python-Objekte erstellen.

Verwenden Sie die folgenden Abschnitte, um die Verbindung Ihres Clients zur Snowflake ML Feature Store API zu konfigurieren.

Konfigurieren eines programmgesteuerten Zugriffstoken (PAT)

Geben Sie die folgenden Verbindungsparameter im folgenden Code an, um von Ihrer Anwendung aus eine Verbindung zu Snowflake herzustellen:

import os

### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
    "account": "<account_identifier>",
    "user": "<user>",
    "password": pat,
    "role": "<FS_CONSUMER_ROLE>",
    "host": "<host>",
    "warehouse": "<warehouse>",
    "database": "<database>",
    "schema": "MY_FEATURE_STORE",
}
Copy

Erstellen der Sitzungs- und Feature Store-Objekte

Nachdem Sie Ihre Verbindungsparameter definiert haben, erstellen Sie die Sitzungs- und Feature Store-Objekte, die Ihre Anwendung für die Verbindung mit Snowflake verwendet.

Der folgende Code:

  1. Erstellt die Snowflake-Sitzung, d. h. den Client, über den Ihre Anwendung mit Snowflake kommuniziert.

  2. Konfiguriert einen Thread-Pool-Executor, um die Parallelität beim Feature-Abruf zu ermöglichen.

  3. Listet die Features auf, die aus dem Feature Store abgerufen werden.

  4. Initialisiert den Feature Store-Reader-Client. Dieses Objekt umschließt die Snowflake-Sitzung. Es ist die Hauptmethode, mit der Ihre Anwendung mit dem Feature Store interagiert.

  5. Initialisiert die Feature-Ansichten, die Sie definiert haben. Sie können diese durch Ihre eigenen Features ersetzen.

import os
from concurrent.futures import ThreadPoolExecutor

from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode

# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()

# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# 3. List individual features we are going to retrieve for inference. In this
#    example, we are listing Iris features described above in the
#    "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]

# 4. Initialize feature store consumer client
feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse="<warehouse>",
    creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)

# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
Copy

Abrufen der Online-Features im Bereitstellungspfad

Nachdem Sie definiert haben, wie die Anwendung initialisiert wird, können Sie einen Vorhersageendpunkt erstellen.

Es gibt verschiedene Möglichkeiten, wie Sie definieren können, wie Ihre Anwendung mit Anforderungen umgeht. Der folgende Python-Code:

  • Definiert den Vorhersageendpunkt in Ihrer Anwendung

  • Übernimmt die Schlüssel aus der JSON-Anforderung

  • Verwendet die Schlüssel, um die Feature-Werte aus den Feature-Ansichten abzurufen

  • Übergibt diese Feature-Werte an das Modell

  • Ruft die Vorhersagen aus dem Modell ab

  • Gibt die Vorhersagen in der Antwort zurück

from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask

def _retrieve_features(
    feature_view: FeatureView,
    keys: List[int],
    feature_names: List[str]):
    """Retrieve features from the given feature view"""

    return feature_store.read_feature_view(
        feature_view,
        keys=[keys],
        feature_names=feature_names,
        store_type=StoreType.ONLINE  # Query the ONLINE store
    ).collect()

@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
    if flask.request.content_type == 'application/json':
        input_data = flask.request.data.decode("utf-8")
        input_data = json.loads(data)
    else:
        return flask.Response(
            response="This predictor only supports JSON data",
            status=415,
            mimetype="text/plain"
        )

    # Expect that input data is a single key
    keys = [int(input_data["key"])]

    # Retrieve features from two feature views in parallel.
    sepal_features = executor.submit(
        _retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
    petal_features = executor.submit(
        _retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)

    sepal_features = sepal_features.result()
    petal_features = petal_features.result()

    predictions = []
    if len(sepal_features) != 0 and len(petal_features) != 0:
        # Compose the feature vector, excluding the join keys.
        feature_vector = (
            list(sepal_features[0][1:])
            + list(petal_features[0][1:])
        )

        # Using a hypothetical run_inference function.
        predictions = run_inference(feature_vector)

    result = json.dumps({"results": list(predictions)})
    return flask.Response(response=result, status=200,
        mimetype="application/json")
Copy

Der vorangehende Code ruft eine hypothetische run_inference-Funktion auf. Ihre eigene Inferenzfunktion kann Vorhersagen von Ihrem Modell abrufen, unabhängig davon, ob dieses extern oder im Anwendungsspeicher gehostet wird.

Der Vorhersageendpunkt im vorhergehenden Code akzeptiert einen Schlüssel und gibt die Vorhersage für diesen Schlüssel zurück. Ihre Daten können mehrere Schlüssel haben, die eine einzelne Probe kennzeichnen. Der vorangehende Code dient als Beispiel, das Sie an Ihren eigenen Anwendungsfall anpassen können.