Mise en œuvre de l’inférence de modèles en flux continu à l’aide de tables dynamiques dans Snowflake¶
Les tables dynamiques de Snowflake fournissent une couche de transformation continue sur les données en flux. En définissant une table dynamique qui applique les prédictions d’un modèle de machine learning aux données entrantes, vous pouvez maintenir un pipeline d’inférence de modèle automatisé fonctionnant en continu sur vos données sans orchestration manuelle.
Prenons l’exemple d’un flux d’événements de connexion arrivant dans une table (LOGINS_RAW
), contenant des colonnes telles que USER_ID
, LOCATION
, et un horodatage. La table est mise à jour avec les prédictions du modèle concernant le risque de connexion pour les événements récemment arrivés. Seules les nouvelles lignes sont traitées avec les prédictions du modèle.
Note
Si vous utilisez le Snowflake Feature Store, vous pouvez également créer cette table dynamique d’inférence en tant que vue des fonctions à l’aide de l’API du Feature Store. Cela vous permet de charger éventuellement des fonctions au moment de l’inférence à partir d’autres vues de fonctions.
Les sections suivantes montrent comment établir ce pipeline d’inférence continue en utilisant à la fois SQL et Snowpark Python.
Approche SQL¶
Utilisez SQL pour définir une table dynamique qui fait référence au modèle et l’applique aux nouvelles lignes entrantes dans LOGINS_RAW
. Cet exemple utilise une clause WITH
pour référencer le modèle à partir du registre, et la syntaxe !PREDICT
pour exécuter l’inférence :
CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
WAREHOUSE = my_wh
TARGET_LAG = '20 minutes'
REFRESH_MODE = INCREMENTAL
INITIALIZE = on_create
COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
l.login_id,
l.user_id,
l.location,
l.event_time,
my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Approche de Snowpark Python¶
L’API de Snowpark Python vous permet d’accéder au registre des modèles par programmation et d’exécuter l’inférence directement sur le site DataFrames. Cette approche peut être plus souple et plus facile à maintenir, en particulier dans les environnements axés sur le code.
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")
# Load the source data
df_raw = sp_session.table("LOGINS_RAW")
# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))
# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])
# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
name="LOGINS_WITH_PREDICTIONS",
warehouse="MY_WH",
lag='20 minutes',
refresh_mode='INCREMENTAL',
initiliaze="ON_CREATE",
comment="Dynamic table continuously updated with model predictions"
)
L’échantillon de code ci-dessus exécutera automatiquement l’inférence à l’aide de MYMODEL
sur les nouvelles données contenues dans LOGINS_RAW
toutes les 20 minutes.