프로덕션에서 Snowflake 온라인 기능 스토어 사용하기

Snowflake ML 기능 스토어는 기능 엔지니어링 프로세스 전체에서 기능을 관리하는 데 도움이 됩니다.

짧은 대기 시간 추론이 필요한 온라인 애플리케이션의 경우 온라인 기능 스토어를 사용하여 기능을 제공합니다.

다음 섹션에서는 Python 애플리케이션 내에서 기능을 검색하는 프로세스를 프로덕션화하는 과정을 살펴봅니다. 이러한 섹션에는 다음을 수행하는 코드 예제가 있습니다.

  1. Snowflake에 Iris 데이터 세트 로드하기

  2. Snowflake와 연결 정의하기

  3. 기능 스토어 및 기능 뷰 만들기

  4. 특징 및 특징 값 검색하기

  5. 모델에서 예측 생성하기

코드 예제는 Python으로 작성되었습니다. 다른 언어로 작성된 애플리케이션에 대해 이 워크플로를 진행하려면 해당 언어에 고유한 Snowflake 드라이버를 사용하세요. 자세한 내용은 드라이버 섹션을 참조하십시오.

전제 조건

Snowflake에서 온라인 ML 기능 검색을 실행하려면 다음이 필요합니다.

  • Snowflake에 이미 로드한 데이터

  • Snowflake 기능 스토어

  • 기능 뷰

  • 각 기능 뷰에 대해 활성화된 온라인 기능 제공

자체 Snowflake 기능 스토어의 기능을 사용할 수 있지만, 아직 기능 스토어가 없는 경우 다음 코드를 사용하여 Iris 데이터 세트를 Snowflake에 로드할 수 있습니다.

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd

from snowflake.snowpark.context import get_active_session

sf_session = get_active_session()

### Download the Iris dataset.

iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
    'sepal length (cm)': 'SEPAL_LENGTH_CM',
    'sepal width (cm)': 'SEPAL_WIDTH_CM',
    'petal length (cm)': 'PETAL_LENGTH_CM',
    'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target

### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
    X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
    table_name="SEPAL_DATA",
    auto_create_table=True,
    overwrite=True
)
petal_df = sf_session.write_pandas(
    X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
    table_name="PETAL_DATA",
    auto_create_table=True,
    overwrite=True
)
Copy

해당 환경에 데이터가 있으면 기능 스토어를 생성합니다. 다음 코드는 Iris 데이터 세트의 다양한 샘플에 대한 기능 스토어 및 id_entity 엔터티를 생성합니다.

### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0

from snowflake.ml.feature_store import (
    FeatureStore,
    FeatureView,
    Entity,
    CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig

### Create Snowflake feature store

feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse=sf_session.get_current_warehouse(),
    creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")

id_entity = Entity(
    name='SAMPLE_ID',
    join_keys=["ID"],
    desc='sample id'
)
feature_store.register_entity(id_entity)
Copy

참고

Snowflake ML 기능 스토어에는 엔터티의 개념이 있습니다. 엔터티는 기능 뷰 사이에서 기능을 구성하는 키입니다. 엔터티에 대한 자세한 내용은 엔터티 작업하기 섹션을 참조하세요.

기능 스토어를 생성한 후 기능 뷰를 정의합니다. 다음 코드는 Iris 데이터 세트의 sepal 및 petal 기능 뷰를 정의합니다.

### Create feature views with Online Serving.
sepal_fv = FeatureView(
    name='SEPAL_FEATURES',
    entities=[id_entity],
    feature_df=sepal_df,
    desc='Sepal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
    name='PETAL_FEATURES',
    entities=[id_entity],
    feature_df=petal_df,
    desc='Petal features',
    refresh_freq='10 minutes',
    online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
    sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
    petal_fv, version="v1", overwrite=True)
Copy

기능 값 검색하기

기능 뷰를 등록하고 각 기능 뷰에 대해 온라인 기능 제공을 활성화한 후에는 각 기능 뷰의 기능 값을 애플리케이션에 제공할 수 있습니다.

기능 값을 검색하려면 다음을 수행합니다.

  1. Snowflake에 연결 설정

  2. 애플리케이션이 시작될 때 초기화되는 세션 및 Snowflake 기능 스토어 오브젝트 만들기

  3. 기능 뷰에서 기능 검색하기

  4. 예측 엔드포인트를 생성하고 해당 엔드포인트에서 예측 가져오기

중요

기능 스토어 API를 사용하려면 애플리케이션 환경에 :code:`snowflake-ml-python>=1.18.0`을 설치해야 합니다.

애플리케이션에서 Snowflake에 연결하려면 인증 방법으로 프로그래밍 방식 액세스 토큰(PAT) 또는 키 페어 인증 중 하나를 설정해야 합니다.

클라이언트 구성

애플리케이션을 초기화할 때 Snowflake ML 기능 스토어 API에 연결하고 필수 기능 스토어 Python 오브젝트를 생성해야 합니다.

다음 섹션을 사용하여 Snowflake ML 기능 스토어 API에 대한 클라이언트의 연결을 구성합니다.

프로그래밍 방식 액세스 토큰(PAT) 생성하기

다음 코드에서 다음 연결 매개 변수를 지정하여 애플리케이션에서 Snowflake에 연결합니다.

import os

### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
    "account": "<account_identifier>",
    "user": "<user>",
    "password": pat,
    "role": "<FS_CONSUMER_ROLE>",
    "host": "<host>",
    "warehouse": "<warehouse>",
    "database": "<database>",
    "schema": "MY_FEATURE_STORE",
}
Copy

세션 및 기능 스토어 오브젝트 만들기

연결 매개 변수를 정의한 후 애플리케이션이 Snowflake에 연결하는 데 사용하는 세션 및 기능 스토어 오브젝트를 생성합니다.

다음 코드는 다음을 수행합니다.

  1. 애플리케이션이 Snowflake와 통신하는 데 사용하는 클라이언트인 Snowflake Session을 생성합니다.

  2. 기능 검색 병렬 처리를 활성화하도록 스레드 풀 실행기를 구성합니다.

  3. 기능 스토어에서 검색 중인 기능을 나열합니다.

  4. 기능 스토어 리더 클라이언트를 초기화합니다. 이 오브젝트는 Snowflake 세션을 래핑합니다. 애플리케이션이 기능 스토어와 상호 작용하는 주요 방법입니다.

  5. 정의한 기능 뷰를 초기화합니다. 이러한 기능을 자체 기능으로 바꿀 수 있습니다.

import os
from concurrent.futures import ThreadPoolExecutor

from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode

# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()

# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# 3. List individual features we are going to retrieve for inference. In this
#    example, we are listing Iris features described above in the
#    "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]

# 4. Initialize feature store consumer client
feature_store = FeatureStore(
    session=sf_session,
    database=sf_session.get_current_database(),
    name="MY_FEATURE_STORE",
    default_warehouse="<warehouse>",
    creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)

# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
Copy

검색 경로에서 온라인 기능 검색하기

애플리케이션이 초기화되는 방법을 정의한 후에는 예측 엔드포인트를 생성할 수 있습니다.

애플리케이션이 요청을 처리하는 방법을 정의할 수 있는 다양한 방법이 있습니다. 다음 Python 코드는 다음을 수행합니다.

  • 애플리케이션에서 예측 엔드포인트 정의

  • JSON 요청에서 키 가져오기

  • 키를 사용하여 기능 뷰에서 기능 값 검색

  • 해당 기능 값을 모델에 전달

  • 모델에서 예측 가져오기

  • 응답에서 예측 반환

from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask

def _retrieve_features(
    feature_view: FeatureView,
    keys: List[int],
    feature_names: List[str]):
    """Retrieve features from the given feature view"""

    return feature_store.read_feature_view(
        feature_view,
        keys=[keys],
        feature_names=feature_names,
        store_type=StoreType.ONLINE  # Query the ONLINE store
    ).collect()

@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
    if flask.request.content_type == 'application/json':
        input_data = flask.request.data.decode("utf-8")
        input_data = json.loads(data)
    else:
        return flask.Response(
            response="This predictor only supports JSON data",
            status=415,
            mimetype="text/plain"
        )

    # Expect that input data is a single key
    keys = [int(input_data["key"])]

    # Retrieve features from two feature views in parallel.
    sepal_features = executor.submit(
        _retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
    petal_features = executor.submit(
        _retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)

    sepal_features = sepal_features.result()
    petal_features = petal_features.result()

    predictions = []
    if len(sepal_features) != 0 and len(petal_features) != 0:
        # Compose the feature vector, excluding the join keys.
        feature_vector = (
            list(sepal_features[0][1:])
            + list(petal_features[0][1:])
        )

        # Using a hypothetical run_inference function.
        predictions = run_inference(feature_vector)

    result = json.dumps({"results": list(predictions)})
    return flask.Response(response=result, status=200,
        mimetype="application/json")
Copy

이전 코드는 가상 run_inference 함수를 호출합니다. 자체 추론 함수는 원격으로 호스팅되는지 애플리케이션 메모리에 호스팅되는지 여부에 관계없이 모델에서 예측을 가져올 수 있습니다.

이전 코드의 예측 엔드포인트는 키를 허용하고 해당 키에 대한 예측을 반환합니다. 데이터에는 단일 샘플을 특성화하는 여러 키가 있을 수 있습니다. 이전 코드는 사용자의 사용 사례에 맞게 조정할 수 있는 예제입니다.