프로덕션에서 Snowflake 온라인 기능 스토어 사용하기¶
Snowflake ML 기능 스토어는 기능 엔지니어링 프로세스 전체에서 기능을 관리하는 데 도움이 됩니다.
짧은 대기 시간 추론이 필요한 온라인 애플리케이션의 경우 온라인 기능 스토어를 사용하여 기능을 제공합니다.
다음 섹션에서는 Python 애플리케이션 내에서 기능을 검색하는 프로세스를 프로덕션화하는 과정을 살펴봅니다. 이러한 섹션에는 다음을 수행하는 코드 예제가 있습니다.
Snowflake에 Iris 데이터 세트 로드하기
Snowflake와 연결 정의하기
기능 스토어 및 기능 뷰 만들기
특징 및 특징 값 검색하기
모델에서 예측 생성하기
코드 예제는 Python으로 작성되었습니다. 다른 언어로 작성된 애플리케이션에 대해 이 워크플로를 진행하려면 해당 언어에 고유한 Snowflake 드라이버를 사용하세요. 자세한 내용은 드라이버 섹션을 참조하십시오.
전제 조건¶
Snowflake에서 온라인 ML 기능 검색을 실행하려면 다음이 필요합니다.
Snowflake에 이미 로드한 데이터
Snowflake 기능 스토어
기능 뷰
각 기능 뷰에 대해 활성화된 온라인 기능 제공
자체 Snowflake 기능 스토어의 기능을 사용할 수 있지만, 아직 기능 스토어가 없는 경우 다음 코드를 사용하여 Iris 데이터 세트를 Snowflake에 로드할 수 있습니다.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
from snowflake.snowpark.context import get_active_session
sf_session = get_active_session()
### Download the Iris dataset.
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
'sepal length (cm)': 'SEPAL_LENGTH_CM',
'sepal width (cm)': 'SEPAL_WIDTH_CM',
'petal length (cm)': 'PETAL_LENGTH_CM',
'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target
### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
table_name="SEPAL_DATA",
auto_create_table=True,
overwrite=True
)
petal_df = sf_session.write_pandas(
X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
table_name="PETAL_DATA",
auto_create_table=True,
overwrite=True
)
해당 환경에 데이터가 있으면 기능 스토어를 생성합니다. 다음 코드는 Iris 데이터 세트의 다양한 샘플에 대한 기능 스토어 및 id_entity 엔터티를 생성합니다.
### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0
from snowflake.ml.feature_store import (
FeatureStore,
FeatureView,
Entity,
CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig
### Create Snowflake feature store
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse=sf_session.get_current_warehouse(),
creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")
id_entity = Entity(
name='SAMPLE_ID',
join_keys=["ID"],
desc='sample id'
)
feature_store.register_entity(id_entity)
참고
Snowflake ML 기능 스토어에는 엔터티의 개념이 있습니다. 엔터티는 기능 뷰 사이에서 기능을 구성하는 키입니다. 엔터티에 대한 자세한 내용은 엔터티 작업하기 섹션을 참조하세요.
기능 스토어를 생성한 후 기능 뷰를 정의합니다. 다음 코드는 Iris 데이터 세트의 sepal 및 petal 기능 뷰를 정의합니다.
### Create feature views with Online Serving.
sepal_fv = FeatureView(
name='SEPAL_FEATURES',
entities=[id_entity],
feature_df=sepal_df,
desc='Sepal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
name='PETAL_FEATURES',
entities=[id_entity],
feature_df=petal_df,
desc='Petal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
petal_fv, version="v1", overwrite=True)
기능 값 검색하기¶
기능 뷰를 등록하고 각 기능 뷰에 대해 온라인 기능 제공을 활성화한 후에는 각 기능 뷰의 기능 값을 애플리케이션에 제공할 수 있습니다.
기능 값을 검색하려면 다음을 수행합니다.
Snowflake에 연결 설정
애플리케이션이 시작될 때 초기화되는 세션 및 Snowflake 기능 스토어 오브젝트 만들기
기능 뷰에서 기능 검색하기
예측 엔드포인트를 생성하고 해당 엔드포인트에서 예측 가져오기
중요
기능 스토어 API를 사용하려면 애플리케이션 환경에 :code:`snowflake-ml-python>=1.18.0`을 설치해야 합니다.
애플리케이션에서 Snowflake에 연결하려면 인증 방법으로 프로그래밍 방식 액세스 토큰(PAT) 또는 키 페어 인증 중 하나를 설정해야 합니다.
클라이언트 구성¶
애플리케이션을 초기화할 때 Snowflake ML 기능 스토어 API에 연결하고 필수 기능 스토어 Python 오브젝트를 생성해야 합니다.
다음 섹션을 사용하여 Snowflake ML 기능 스토어 API에 대한 클라이언트의 연결을 구성합니다.
프로그래밍 방식 액세스 토큰(PAT) 생성하기¶
다음 코드에서 다음 연결 매개 변수를 지정하여 애플리케이션에서 Snowflake에 연결합니다.
schema- Snowflake 기능 스토어의 이름database- 스키마 또는 기능 스토어가 포함된 데이터베이스role- 기능 스토어에서 읽는 데 필요한 역할. 자세한 내용은 온라인 기능을 생성 및 제공할 수 있는 액세스 권한 제공 섹션을 참조하십시오.password- 사용자의 PAT.
import os
### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"password": pat,
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
다음 코드에서 다음 연결 매개 변수를 지정하여 애플리케이션에서 Snowflake에 연결합니다.
schema- Snowflake 기능 스토어의 이름database- 스키마 또는 기능 스토어가 포함된 데이터베이스role- 기능 스토어에서 읽는 데 필요한 역할. 자세한 내용은 온라인 기능 생성 및 제공하기 섹션을 참조하십시오.private_key_file- 개인 키 파일private_key_file_pwd- 개인 키 파일의 비밀번호
import os
### Define connection parameters for key-pair authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"private_key_file": "<private key file>",
"private_key_file_pwd": "<private key file pwd>",
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
세션 및 기능 스토어 오브젝트 만들기
연결 매개 변수를 정의한 후 애플리케이션이 Snowflake에 연결하는 데 사용하는 세션 및 기능 스토어 오브젝트를 생성합니다.
다음 코드는 다음을 수행합니다.
애플리케이션이 Snowflake와 통신하는 데 사용하는 클라이언트인 Snowflake Session을 생성합니다.
기능 검색 병렬 처리를 활성화하도록 스레드 풀 실행기를 구성합니다.
기능 스토어에서 검색 중인 기능을 나열합니다.
기능 스토어 리더 클라이언트를 초기화합니다. 이 오브젝트는 Snowflake 세션을 래핑합니다. 애플리케이션이 기능 스토어와 상호 작용하는 주요 방법입니다.
정의한 기능 뷰를 초기화합니다. 이러한 기능을 자체 기능으로 바꿀 수 있습니다.
import os
from concurrent.futures import ThreadPoolExecutor
from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode
# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()
# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# 3. List individual features we are going to retrieve for inference. In this
# example, we are listing Iris features described above in the
# "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]
# 4. Initialize feature store consumer client
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse="<warehouse>",
creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)
# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
검색 경로에서 온라인 기능 검색하기¶
애플리케이션이 초기화되는 방법을 정의한 후에는 예측 엔드포인트를 생성할 수 있습니다.
애플리케이션이 요청을 처리하는 방법을 정의할 수 있는 다양한 방법이 있습니다. 다음 Python 코드는 다음을 수행합니다.
애플리케이션에서 예측 엔드포인트 정의
JSON 요청에서 키 가져오기
키를 사용하여 기능 뷰에서 기능 값 검색
해당 기능 값을 모델에 전달
모델에서 예측 가져오기
응답에서 예측 반환
from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask
def _retrieve_features(
feature_view: FeatureView,
keys: List[int],
feature_names: List[str]):
"""Retrieve features from the given feature view"""
return feature_store.read_feature_view(
feature_view,
keys=[keys],
feature_names=feature_names,
store_type=StoreType.ONLINE # Query the ONLINE store
).collect()
@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
if flask.request.content_type == 'application/json':
input_data = flask.request.data.decode("utf-8")
input_data = json.loads(data)
else:
return flask.Response(
response="This predictor only supports JSON data",
status=415,
mimetype="text/plain"
)
# Expect that input data is a single key
keys = [int(input_data["key"])]
# Retrieve features from two feature views in parallel.
sepal_features = executor.submit(
_retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
petal_features = executor.submit(
_retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)
sepal_features = sepal_features.result()
petal_features = petal_features.result()
predictions = []
if len(sepal_features) != 0 and len(petal_features) != 0:
# Compose the feature vector, excluding the join keys.
feature_vector = (
list(sepal_features[0][1:])
+ list(petal_features[0][1:])
)
# Using a hypothetical run_inference function.
predictions = run_inference(feature_vector)
result = json.dumps({"results": list(predictions)})
return flask.Response(response=result, status=200,
mimetype="application/json")
이전 코드는 가상 run_inference 함수를 호출합니다. 자체 추론 함수는 원격으로 호스팅되는지 애플리케이션 메모리에 호스팅되는지 여부에 관계없이 모델에서 예측을 가져올 수 있습니다.
이전 코드의 예측 엔드포인트는 키를 허용하고 해당 키에 대한 예측을 반환합니다. 데이터에는 단일 샘플을 특성화하는 여러 키가 있을 수 있습니다. 이전 코드는 사용자의 사용 사례에 맞게 조정할 수 있는 예제입니다.