Implementação de inferência de modelo de fluxo contínuo usando tabelas dinâmicas no Snowflake¶
As tabelas dinâmicas do Snowflake fornecem uma camada de transformação contínua sobre os dados de streaming. Ao definir uma tabela dinâmica que aplica as previsões de um modelo de aprendizado de máquina aos dados recebidos, você pode manter um pipeline de inferência de modelo automatizado e em execução contínua em seus dados sem orquestração manual.
Por exemplo, considere um fluxo de eventos de login que chegam em uma tabela (LOGINS_RAW
), contendo colunas como USER_ID
, LOCATION
e um carimbo de data/hora. A tabela é atualizada com as previsões do modelo sobre o risco de login para eventos recém-chegados. Somente as novas linhas são processadas com as previsões do modelo.
Nota
Se você estiver usando o Snowflake Feature Store, também poderá criar essa tabela dinâmica de inferência como uma visualização de recurso usando a Feature Store API. Isso permite que você carregue opcionalmente recursos no momento da inferência a partir de outras exibições de recursos.
As seções a seguir demonstram como configurar esse pipeline de inferência contínua usando o SQL e o Snowpark Python.
Abordagem de SQL¶
Use SQL para definir uma tabela dinâmica que faça referência ao modelo e o aplique às novas linhas recebidas em LOGINS_RAW
. Este exemplo usa uma cláusula WITH
para fazer referência ao modelo do registro e a sintaxe !PREDICT
para executar a inferência:
CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
WAREHOUSE = my_wh
TARGET_LAG = '20 minutes'
REFRESH_MODE = INCREMENTAL
INITIALIZE = on_create
COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
l.login_id,
l.user_id,
l.location,
l.event_time,
my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Abordagem do Snowpark Python¶
A Snowpark Python API permite acessar o registro do modelo de forma programática e executar a inferência diretamente em DataFrames. Essa abordagem pode ser mais flexível e passível de manutenção, especialmente em ambientes orientados por código.
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")
# Load the source data
df_raw = sp_session.table("LOGINS_RAW")
# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))
# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])
# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
name="LOGINS_WITH_PREDICTIONS",
warehouse="MY_WH",
lag='20 minutes',
refresh_mode='INCREMENTAL',
initiliaze="ON_CREATE",
comment="Dynamic table continuously updated with model predictions"
)
O exemplo de código acima executará a inferência usando MYMODEL
em novos dados em LOGINS_RAW
a cada 20 minutos automaticamente.