実稼働環境でSnowflakeのオンライン特徴量ストアを使用する

Snowflake ML 特徴量ストアは、特徴量エンジニアリングのプロセス全体で特徴量を管理するのに役立ちます。

低レイテンシの推論を必要とするオンラインアプリケーションの場合は、オンライン特徴量ストアを使用して機能を提供します。

以下のセクションでは、Pythonアプリケーション内の特徴量を取得するプロセスを実稼働環境向けに実装する方法について説明します。これらのセクションには、次の操作を実行するコード例があります。

  1. IrisデータセットをSnowflakeにロードする

  2. Snowflakeへの接続を定義する

  3. 特徴量ストアと特徴量ビューを作成する

  4. 特徴量および特徴量値を取得する

  5. モデルから予測を生成する

コード例はPythonで記述されています。他の言語で記述されたアプリケーションに対してこのワークフローを実行するには、その言語に固有のSnowflakeドライバーを使用します。詳細については、 ドライバー をご参照ください。

前提条件

Snowflakeでオンライン ML 特徴量の取得を実行するには、以下が必要です。

  • すでにSnowflakeにロードされているデータ

  • Snowflake特徴量ストア

  • 特徴ビュー

  • 各特徴量ビューでオンライン特徴量の使用が有効になっている

自分のSnowflake特徴量ストアの特徴量を使うこともできますが、まだ特徴量ストアがない場合は、以下のコードを使ってIrisデータセットをSnowflakeにロードすることができます。

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd

from snowflake.snowpark.context import get_active_session

sf_session = get_active_session()

### Download the Iris dataset.

iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
    'sepal length (cm)': 'SEPAL_LENGTH_CM',
    'sepal width (cm)': 'SEPAL_WIDTH_CM',
    'petal length (cm)': 'PETAL_LENGTH_CM',
    'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target

### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
    X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
    table_name="SEPAL_DATA",
    auto_create_table=True,
    overwrite=True
)
petal_df = sf_session.write_pandas(
    X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
    table_name="PETAL_DATA",
    auto_create_table=True,
    overwrite=True
)
Copy

データを環境に用意した後、特徴量ストアを作成します。次のコードは、特徴量ストアと、Irisデータセットからのさまざまなサンプルの id_entity エンティティを作成します。

### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0

from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig

### Create Snowflake feature store

feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse=sf_session.get_current_warehouse(),
    creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")

id_entity = Entity(
    name='SAMPLE_ID',
    join_keys=["ID"],
    desc='sample id'
)
feature_store.register_entity(id_entity)
Copy

注釈

Snowflake ML 特徴量ストアにはエンティティの概念があります。エンティティは、特徴量ビュー間の特徴量を整理するキーです。エンティティの詳細については、 エンティティの操作 をご参照ください。

特徴量ストアを作成した後、特徴量ビューを定義します。次のコードは、Irisデータセットから花弁とがく片の特徴量ビューを定義します。

### Create feature views with Online Serving.
sepal_fv = FeatureView(
    name='SEPAL_FEATURES',
    entities=[id_entity],
    feature_df=sepal_df,
    desc='Sepal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
    name='PETAL_FEATURES',
    entities=[id_entity],
    feature_df=petal_df,
    desc='Petal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
    sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
    petal_fv, version="v1", overwrite=True)
Copy

特徴量値を取得する

特徴量ビューを登録し、各特徴量ビューのオンライン特徴量の使用を有効にすると、各特徴量ビューの特徴量値をアプリケーションに提供することができます。

特徴量値を取得するには、以下を実行します。

  1. Snowflakeへの接続を設定する

  2. アプリケーションの起動時に初期化するセッションとSnowflake特徴量ストアオブジェクトを作成する

  3. 特徴量ビューから特徴量を取得する

  4. 予測エンドポイントを作成し、そのエンドポイントから予測を取得する

重要

特徴量ストア API を使用するには、アプリケーションの環境に snowflake-ml-python>=1.18.0 をインストールする必要があります。

アプリケーションからSnowflakeに接続するには、認証方法として プログラムアクセストークン( PAT ) または キーペア認証 のいずれかを設定する必要があります。

クライアントを構成する

アプリケーションを初期化するときは、Snowflake ML 特徴量ストア API に接続して、必要な特徴量ストアPythonオブジェクトを作成する必要があります。

次のセクションを使用して、Snowflake ML 特徴量ストア API へのクライアントの接続を構成します。

プログラムアクセストークン( PAT )を構成する

アプリケーションからSnowflakeに接続するには、次のコードで次の接続パラメーターを指定します。

import os

### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
    "account": "<account_identifier>",
    "user": "<user>",
    "password": pat,
    "role": "<FS_CONSUMER_ROLE>",
    "host": "<host>",
    "warehouse": "<warehouse>",
    "database": "<database>",
    "schema": "MY_FEATURE_STORE",
}
Copy

セッションおよび特徴量ストアオブジェクトを作成する

接続パラメーターを定義したら、アプリケーションがSnowflakeに接続するために使用するセッションと特徴量ストアオブジェクトを作成します。

次のコードは、次を実行します。

  1. アプリケーションがSnowflakeと通信するために使用するクライアントであるSnowflakeセッションを作成します。

  2. 特徴量取得の並列処理を有効にするために、スレッドプールエグゼキューターを構成します。

  3. 特徴量ストアから取得する特徴量をリストします。

  4. 特徴量ストアリーダークライアントを初期化します。このオブジェクトは、Snowflakeセッションをラップします。これは、アプリケーションが特徴量ストアとやり取りする主な方法です。

  5. 定義した特徴量ビューを初期化します。これらを独自の特徴量に置き換えることができます。

import os
from concurrent.futures import ThreadPoolExecutor

from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode

# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()

# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# 3. List individual features we are going to retrieve for inference. In this
#    example, we are listing Iris features described above in the
#    "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]

# 4. Initialize feature store consumer client
feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse="<warehouse>",
    creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)

# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
Copy

使用中のパスのオンライン特徴量を取得する

アプリケーションの初期化方法を定義したら、予測エンドポイントを作成できます。

アプリケーションがリクエストを処理する方法を定義するさまざまな方法があります。次のPythonコードは、次を実行します。

  • アプリケーションの予測エンドポイントを定義する

  • JSON リクエストからキーを取得する

  • キーを使用して、特徴量ビューから特徴量値を取得する

  • これらの特徴量値をモデルに渡す

  • モデルから予測を取得する

  • 応答で予測を返す

from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask

def _retrieve_features(
    feature_view: FeatureView,
    keys: List[int],
    feature_names: List[str]):
    """Retrieve features from the given feature view"""

    return feature_store.read_feature_view(
        feature_view,
        keys=[keys],
        feature_names=feature_names,
        store_type=StoreType.ONLINE  # Query the ONLINE store
    ).collect()

@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
    if flask.request.content_type == 'application/json':
        input_data = flask.request.data.decode("utf-8")
        input_data = json.loads(data)
    else:
        return flask.Response(
            response="This predictor only supports JSON data",
            status=415,
            mimetype="text/plain"
        )

    # Expect that input data is a single key
    keys = [int(input_data["key"])]

    # Retrieve features from two feature views in parallel.
    sepal_features = executor.submit(
        _retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
    petal_features = executor.submit(
        _retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)

    sepal_features = sepal_features.result()
    petal_features = petal_features.result()

    predictions = []
    if len(sepal_features) != 0 and len(petal_features) != 0:
        # Compose the feature vector, excluding the join keys.
        feature_vector = (
            list(sepal_features[0][1:])
            + list(petal_features[0][1:])
        )

        # Using a hypothetical run_inference function.
        predictions = run_inference(feature_vector)

    result = json.dumps({"results": list(predictions)})
    return flask.Response(response=result, status=200,
        mimetype="application/json")
Copy

上記のコードは仮想の run_inference 関数を呼び出しています。独自の推論関数は、リモートでホストされているかアプリケーションメモリ内でホストされているかに関係なく、モデルから予測を取得できます。

上記のコードの予測エンドポイントはキーを受け取り、そのキーの予測を返します。データには、単一のサンプルを特徴付ける複数のキーがある場合があります。上記のコードは、独自のユースケースに適応できる例です。