実稼働環境でSnowflakeのオンライン特徴量ストアを使用する¶
Snowflake ML 特徴量ストアは、特徴量エンジニアリングのプロセス全体で特徴量を管理するのに役立ちます。
低レイテンシの推論を必要とするオンラインアプリケーションの場合は、オンライン特徴量ストアを使用して機能を提供します。
以下のセクションでは、Pythonアプリケーション内の特徴量を取得するプロセスを実稼働環境向けに実装する方法について説明します。これらのセクションには、次の操作を実行するコード例があります。
IrisデータセットをSnowflakeにロードする
Snowflakeへの接続を定義する
特徴量ストアと特徴量ビューを作成する
特徴量および特徴量値を取得する
モデルから予測を生成する
コード例はPythonで記述されています。他の言語で記述されたアプリケーションに対してこのワークフローを実行するには、その言語に固有のSnowflakeドライバーを使用します。詳細については、 ドライバー をご参照ください。
前提条件¶
Snowflakeでオンライン ML 特徴量の取得を実行するには、以下が必要です。
すでにSnowflakeにロードされているデータ
Snowflake特徴量ストア
特徴ビュー
各特徴量ビューでオンライン特徴量の使用が有効になっている
自分のSnowflake特徴量ストアの特徴量を使うこともできますが、まだ特徴量ストアがない場合は、以下のコードを使ってIrisデータセットをSnowflakeにロードすることができます。
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
from snowflake.snowpark.context import get_active_session
sf_session = get_active_session()
### Download the Iris dataset.
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
'sepal length (cm)': 'SEPAL_LENGTH_CM',
'sepal width (cm)': 'SEPAL_WIDTH_CM',
'petal length (cm)': 'PETAL_LENGTH_CM',
'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target
### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
table_name="SEPAL_DATA",
auto_create_table=True,
overwrite=True
)
petal_df = sf_session.write_pandas(
X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
table_name="PETAL_DATA",
auto_create_table=True,
overwrite=True
)
データを環境に用意した後、特徴量ストアを作成します。次のコードは、特徴量ストアと、Irisデータセットからのさまざまなサンプルの id_entity エンティティを作成します。
### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0
from snowflake.ml.feature_store import (
FeatureStore,
FeatureView,
Entity,
CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig
### Create Snowflake feature store
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse=sf_session.get_current_warehouse(),
creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")
id_entity = Entity(
name='SAMPLE_ID',
join_keys=["ID"],
desc='sample id'
)
feature_store.register_entity(id_entity)
注釈
Snowflake ML 特徴量ストアにはエンティティの概念があります。エンティティは、特徴量ビュー間の特徴量を整理するキーです。エンティティの詳細については、 エンティティの操作 をご参照ください。
特徴量ストアを作成した後、特徴量ビューを定義します。次のコードは、Irisデータセットから花弁とがく片の特徴量ビューを定義します。
### Create feature views with Online Serving.
sepal_fv = FeatureView(
name='SEPAL_FEATURES',
entities=[id_entity],
feature_df=sepal_df,
desc='Sepal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
name='PETAL_FEATURES',
entities=[id_entity],
feature_df=petal_df,
desc='Petal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
petal_fv, version="v1", overwrite=True)
特徴量値を取得する¶
特徴量ビューを登録し、各特徴量ビューのオンライン特徴量の使用を有効にすると、各特徴量ビューの特徴量値をアプリケーションに提供することができます。
特徴量値を取得するには、以下を実行します。
Snowflakeへの接続を設定する
アプリケーションの起動時に初期化するセッションとSnowflake特徴量ストアオブジェクトを作成する
特徴量ビューから特徴量を取得する
予測エンドポイントを作成し、そのエンドポイントから予測を取得する
重要
特徴量ストア API を使用するには、アプリケーションの環境に snowflake-ml-python>=1.18.0 をインストールする必要があります。
アプリケーションからSnowflakeに接続するには、認証方法として プログラムアクセストークン( PAT ) または キーペア認証 のいずれかを設定する必要があります。
クライアントを構成する¶
アプリケーションを初期化するときは、Snowflake ML 特徴量ストア API に接続して、必要な特徴量ストアPythonオブジェクトを作成する必要があります。
次のセクションを使用して、Snowflake ML 特徴量ストア API へのクライアントの接続を構成します。
プログラムアクセストークン( PAT )を構成する¶
アプリケーションからSnowflakeに接続するには、次のコードで次の接続パラメーターを指定します。
schema- Snowflake特徴量ストアの名前database- スキーマまたは特徴量ストアを含むデータベースrole- 特徴量ストアからの読み込みに必要なロール。詳細については、 オンライン特徴量の作成とサービングへのアクセスを提供する をご参照ください。password- 自分の PAT 。
import os
### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"password": pat,
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
アプリケーションからSnowflakeに接続するには、次のコードで次の接続パラメーターを指定します。
schema- Snowflake特徴量ストアの名前database- スキーマまたは特徴量ストアを含むデータベースrole- 特徴量ストアからの読み込みに必要なロール。詳細については、 オンライン特徴量の作成とサービング をご参照ください。private_key_file- 秘密キーファイルprivate_key_file_pwd- 秘密キーファイルへのパスワード
import os
### Define connection parameters for key-pair authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"private_key_file": "<private key file>",
"private_key_file_pwd": "<private key file pwd>",
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
セッションおよび特徴量ストアオブジェクトを作成する
接続パラメーターを定義したら、アプリケーションがSnowflakeに接続するために使用するセッションと特徴量ストアオブジェクトを作成します。
次のコードは、次を実行します。
アプリケーションがSnowflakeと通信するために使用するクライアントであるSnowflakeセッションを作成します。
特徴量取得の並列処理を有効にするために、スレッドプールエグゼキューターを構成します。
特徴量ストアから取得する特徴量をリストします。
特徴量ストアリーダークライアントを初期化します。このオブジェクトは、Snowflakeセッションをラップします。これは、アプリケーションが特徴量ストアとやり取りする主な方法です。
定義した特徴量ビューを初期化します。これらを独自の特徴量に置き換えることができます。
import os
from concurrent.futures import ThreadPoolExecutor
from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode
# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()
# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# 3. List individual features we are going to retrieve for inference. In this
# example, we are listing Iris features described above in the
# "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]
# 4. Initialize feature store consumer client
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse="<warehouse>",
creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)
# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
使用中のパスのオンライン特徴量を取得する¶
アプリケーションの初期化方法を定義したら、予測エンドポイントを作成できます。
アプリケーションがリクエストを処理する方法を定義するさまざまな方法があります。次のPythonコードは、次を実行します。
アプリケーションの予測エンドポイントを定義する
JSON リクエストからキーを取得する
キーを使用して、特徴量ビューから特徴量値を取得する
これらの特徴量値をモデルに渡す
モデルから予測を取得する
応答で予測を返す
from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask
def _retrieve_features(
feature_view: FeatureView,
keys: List[int],
feature_names: List[str]):
"""Retrieve features from the given feature view"""
return feature_store.read_feature_view(
feature_view,
keys=[keys],
feature_names=feature_names,
store_type=StoreType.ONLINE # Query the ONLINE store
).collect()
@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
if flask.request.content_type == 'application/json':
input_data = flask.request.data.decode("utf-8")
input_data = json.loads(data)
else:
return flask.Response(
response="This predictor only supports JSON data",
status=415,
mimetype="text/plain"
)
# Expect that input data is a single key
keys = [int(input_data["key"])]
# Retrieve features from two feature views in parallel.
sepal_features = executor.submit(
_retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
petal_features = executor.submit(
_retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)
sepal_features = sepal_features.result()
petal_features = petal_features.result()
predictions = []
if len(sepal_features) != 0 and len(petal_features) != 0:
# Compose the feature vector, excluding the join keys.
feature_vector = (
list(sepal_features[0][1:])
+ list(petal_features[0][1:])
)
# Using a hypothetical run_inference function.
predictions = run_inference(feature_vector)
result = json.dumps({"results": list(predictions)})
return flask.Response(response=result, status=200,
mimetype="application/json")
上記のコードは仮想の run_inference 関数を呼び出しています。独自の推論関数は、リモートでホストされているかアプリケーションメモリ内でホストされているかに関係なく、モデルから予測を取得できます。
上記のコードの予測エンドポイントはキーを受け取り、そのキーの予測を返します。データには、単一のサンプルを特徴付ける複数のキーがある場合があります。上記のコードは、独自のユースケースに適応できる例です。