Uso do armazenamento de recursos online do Snowflake na produção¶
O Snowflake ML Feature Store ajuda a gerenciar seus recursos durante todo o processo de engenharia de recursos.
Para aplicativos online que exigem inferência de baixa latência, use o armazenamento de recursos online para fornecer seus recursos.
As seções a seguir abordam a colocação em produção do processo de recuperação de recursos no aplicativo Python. As seções apresentam exemplos de código para as seguintes ações:
Carregamento do conjunto de dados Iris no Snowflake
Definição da conexão com o Snowflake
Criação do armazenamento de recursos e das exibições de recursos
Recuperação dos recursos e dos valores de recursos
Geração de previsões com base em seu modelo
Os exemplos de código estão escritos em Python. Para passar por este fluxo de trabalho com aplicativos escritos em outras linguagens, use um driver do Snowflake específico para a linguagem. Para obter mais informações, consulte Drivers.
Pré-requisitos¶
Para executar a recuperação de recursos ML online no Snowflake, você precisa do seguinte:
Dados que você já carregou no Snowflake
Armazenamento de recursos do Snowflake
Exibições de recursos
Fornecimento de recursos online habilitado para cada exibição de recurso
Você pode usar os recursos do seu próprio armazenamento de recursos do Snowflake, mas é possível usar o código a seguir para carregar o conjunto de dados Iris no Snowflake, se ainda não tiver um armazenamento de recursos.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
from snowflake.snowpark.context import get_active_session
sf_session = get_active_session()
### Download the Iris dataset.
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
# rename the columns to fit the Snowflake feature naming requirements
X.rename(columns={
'sepal length (cm)': 'SEPAL_LENGTH_CM',
'sepal width (cm)': 'SEPAL_WIDTH_CM',
'petal length (cm)': 'PETAL_LENGTH_CM',
'petal width (cm)': 'PETAL_WIDTH_CM'
}, inplace=True)
y = iris.target
### Load the data into Snowflake.
X = X.reset_index().rename(columns={"index": "ID"})
sepal_df = sf_session.write_pandas(
X[['ID', 'SEPAL_LENGTH_CM', 'SEPAL_WIDTH_CM']],
table_name="SEPAL_DATA",
auto_create_table=True,
overwrite=True
)
petal_df = sf_session.write_pandas(
X[['ID', 'PETAL_LENGTH_CM', 'PETAL_WIDTH_CM']],
table_name="PETAL_DATA",
auto_create_table=True,
overwrite=True
)
Depois que tiver os dados em seu ambiente, crie o armazenamento de recursos. O código a seguir cria um armazenamento de recursos e a entidade id_entity para as diferentes amostras do conjunto de dados Iris.
### Install Snowflake ML
%pip install snowflake-ml-python==1.18.0
from snowflake.ml.feature_store import (
FeatureStore,
FeatureView,
Entity,
CreationMode,
)
from snowflake.ml.feature_store.feature_view import OnlineConfig
### Create Snowflake feature store
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse=sf_session.get_current_warehouse(),
creation_mode=CreationMode.OR_REPLACE
)
sf_session.use_schema("MY_FEATURE_STORE")
id_entity = Entity(
name='SAMPLE_ID',
join_keys=["ID"],
desc='sample id'
)
feature_store.register_entity(id_entity)
Nota
O Snowflake ML Feature Store tem o conceito de entidades. As entidades são chaves que organizam os recursos entre as exibições. Para obter mais informações sobre entidades, consulte Como trabalhar com entidades.
Depois de criar o armazenamento de recursos, defina as exibições. O código a seguir define as exibições dos recursos sepal e petal do conjunto de dados Iris.
### Create feature views with Online Serving.
sepal_fv = FeatureView(
name='SEPAL_FEATURES',
entities=[id_entity],
feature_df=sepal_df,
desc='Sepal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
petal_fv = FeatureView(
name='PETAL_FEATURES',
entities=[id_entity],
feature_df=petal_df,
desc='Petal features',
refresh_freq='10 minutes',
online_config=OnlineConfig(enable=True)
)
sepal_fv = feature_store.register_feature_view(
sepal_fv, version="v1", overwrite=True)
petal_fv = feature_store.register_feature_view(
petal_fv, version="v1", overwrite=True)
Recuperação dos valores de recursos¶
Depois que você registrar as exibições de recursos e habilitar o fornecimento de recursos online para cada exibição, os valores de recursos de cada exibição serão fornecidos ao seu aplicativo.
Para recuperar os valores de recursos, faça o seguinte:
Configure uma conexão com o Snowflake
Crie a sessão e os objetos do Snowflake Feature Store que são inicializados quando o aplicativo é iniciado
Recupere os recursos das exibições
Crie um ponto de extremidade de previsão e faça as previsões desse ponto de extremidade
Importante
Você deve instalar o snowflake-ml-python>=1.18.0 no ambiente do seu aplicativo para usar a Feature Store API.
Para se conectar de seu aplicativo ao Snowflake, você deve configurar um token de acesso programático (Programmatic Access Token, PAT) ou uma autenticação de par de chaves como método de autenticação.
Configuração do cliente¶
Ao inicializar seu aplicativo, ele deve se conectar à Snowflake ML Feature Store API e criar os objetos Python do Feature Store necessários.
Siga as seções abaixo para configurar a conexão do seu cliente com a Snowflake ML Feature Store API.
Configuração do token de acesso programático (PAT)¶
Especifique os seguintes parâmetros de conexão no código abaixo para conectar o seu aplicativo ao Snowflake:
schema: nome do armazenamento de recursos do Snowflakedatabase: banco de dados que contém o esquema ou o armazenamento de recursosrole: função necessária para ler o armazenamento de recursos. Para obter mais informações, consulte Acesso para criar e fornecer recursos online.password: o PAT
import os
### Define connection parameters using PAT authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"password": pat,
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
Especifique os seguintes parâmetros de conexão no código abaixo para conectar o seu aplicativo ao Snowflake:
schema: nome do armazenamento de recursos do Snowflakedatabase: banco de dados que contém o esquema ou o armazenamento de recursosrole: função necessária para ler o armazenamento de recursos. Para obter mais informações, consulte Criação e fornecimento de recursos online.private_key_file: arquivo de chave privadaprivate_key_file_pwd: senha para o arquivo de chave privada
import os
### Define connection parameters for key-pair authentication.
snowflake_connection_parameters = {
"account": "<account_identifier>",
"user": "<user>",
"private_key_file": "<private key file>",
"private_key_file_pwd": "<private key file pwd>",
"role": "<FS_CONSUMER_ROLE>",
"host": "<host>",
"warehouse": "<warehouse>",
"database": "<database>",
"schema": "MY_FEATURE_STORE",
}
Criação da sessão e dos objetos do Feature Store
Depois de definir os parâmetros de conexão, crie a sessão e os objetos do Feature Store que seu aplicativo usará para se conectar ao Snowflake.
O seguinte código:
Cria a sessão do Snowflake, o cliente que seu aplicativo usa para se comunicar com o Snowflake.
Configura um executor de pool de threads para habilitar o paralelismo de recuperação de recursos.
Lista os recursos que estamos recuperando do armazenamento.
Inicializa o cliente do leitor de armazenamento de recursos. Esse objeto encerra a sessão do Snowflake. Trata-se da principal maneira que seu aplicativo interage com o armazenamento de recursos.
Inicializa as exibições de recursos que você definiu. Você pode substituí-las por seus próprios recursos.
import os
from concurrent.futures import ThreadPoolExecutor
from snowflake.snowpark.session import Session
from snowflake.ml.feature_store import FeatureStore, CreationMode
# 1.Start a Snowflake session
sf_session = Session.builder.configs(snowflake_connection_parameters).create()
# 2. Create a thread pool executor for feature store requests
MAX_WORKERS=os.cpu_count() * 2
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# 3. List individual features we are going to retrieve for inference. In this
# example, we are listing Iris features described above in the
# "Prerequisites" section.
PETAL_FEATURE_LIST = ["PETAL_WIDTH_CM", "PETAL_LENGTH_CM"]
SEPAL_FEATURE_LIST = ["SEPAL_WIDTH_CM", "SEPAL_LENGTH_CM"]
# 4. Initialize feature store consumer client
feature_store = FeatureStore(
session=sf_session,
database=sf_session.get_current_database(),
name="MY_FEATURE_STORE",
default_warehouse="<warehouse>",
creation_mode=CreationMode.FAIL_IF_NOT_EXIST
)
# 5. Initialize the feature views
sepal_fv = feature_store.get_feature_view("SEPAL_FEATURES", version="v1")
petal_fv = feature_store.get_feature_view("PETAL_FEATURES", version="v1")
Recuperação dos recursos online no caminho de fornecimento¶
Depois de definir como o aplicativo é inicializado, crie um ponto de extremidade de previsão.
Há diferentes maneiras de definir como seu aplicativo processa as solicitações. O seguinte código Python:
Define o ponto de extremidade de previsão em seu aplicativo
Obtém as chaves da solicitação JSON
Usa as chaves para recuperar os valores de recursos das exibições de recursos
Passa esses valores de recursos para o modelo
Faz as previsões com base no modelo
Retorna as previsões na resposta
from snowflake.ml.feature_store.feature_view import StoreType
import json
import flask
def _retrieve_features(
feature_view: FeatureView,
keys: List[int],
feature_names: List[str]):
"""Retrieve features from the given feature view"""
return feature_store.read_feature_view(
feature_view,
keys=[keys],
feature_names=feature_names,
store_type=StoreType.ONLINE # Query the ONLINE store
).collect()
@app.route("/prediction-endpoint", methods=["POST"])
def prediction():
if flask.request.content_type == 'application/json':
input_data = flask.request.data.decode("utf-8")
input_data = json.loads(data)
else:
return flask.Response(
response="This predictor only supports JSON data",
status=415,
mimetype="text/plain"
)
# Expect that input data is a single key
keys = [int(input_data["key"])]
# Retrieve features from two feature views in parallel.
sepal_features = executor.submit(
_retrieve_features, sepal_fv, keys, SEPAL_FEATURE_LIST)
petal_features = executor.submit(
_retrieve_features, petal_fv, keys, PETAL_FEATURE_LIST)
sepal_features = sepal_features.result()
petal_features = petal_features.result()
predictions = []
if len(sepal_features) != 0 and len(petal_features) != 0:
# Compose the feature vector, excluding the join keys.
feature_vector = (
list(sepal_features[0][1:])
+ list(petal_features[0][1:])
)
# Using a hypothetical run_inference function.
predictions = run_inference(feature_vector)
result = json.dumps({"results": list(predictions)})
return flask.Response(response=result, status=200,
mimetype="application/json")
O código anterior chama uma run_inference hipotética. Sua própria função de inferência pode fazer previsões com base em seu modelo, sem considerar se ele está hospedado remotamente ou na memória do aplicativo.
O ponto de extremidade de previsão no código anterior aceita uma chave e retorna a previsão para essa chave. Seus dados podem ter várias chaves que caracterizam uma única amostra. O código anterior deve ser um exemplo que você pode adaptar ao seu caso de uso.