Mise en œuvre de l’inférence de modèles en flux continu à l’aide de tables dynamiques dans Snowflake

Les tables dynamiques de Snowflake fournissent une couche de transformation continue sur les données en flux. En définissant une table dynamique qui applique les prédictions d’un modèle de machine learning aux données entrantes, vous pouvez maintenir un pipeline d’inférence de modèle automatisé fonctionnant en continu sur vos données sans orchestration manuelle.

Prenons l’exemple d’un flux d’événements de connexion arrivant dans une table (LOGINS_RAW), contenant des colonnes telles que USER_ID, LOCATION, et un horodatage. La table est mise à jour avec les prédictions du modèle concernant le risque de connexion pour les événements récemment arrivés. Seules les nouvelles lignes sont traitées avec les prédictions du modèle.

Note

Si vous utilisez le Snowflake Feature Store, vous pouvez également créer cette table dynamique d’inférence en tant que vue des fonctions à l’aide de l’API du Feature Store. Cela vous permet de charger éventuellement des fonctions au moment de l’inférence à partir d’autres vues de fonctions.

Les sections suivantes montrent comment établir ce pipeline d’inférence continue en utilisant à la fois SQL et Snowpark Python.

Approche SQL

Utilisez SQL pour définir une table dynamique qui fait référence au modèle et l’applique aux nouvelles lignes entrantes dans LOGINS_RAW. Cet exemple utilise une clause WITH pour référencer le modèle à partir du registre, et la syntaxe !PREDICT pour exécuter l’inférence :

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
    l.login_id,
    l.user_id,
    l.location,
    l.event_time,
    my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Copy

Approche de Snowpark Python

L’API de Snowpark Python vous permet d’accéder au registre des modèles par programmation et d’exécuter l’inférence directement sur le site DataFrames. Cette approche peut être plus souple et plus facile à maintenir, en particulier dans les environnements axés sur le code.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initiliaze="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)
Copy

L’échantillon de code ci-dessus exécutera automatiquement l’inférence à l’aide de MYMODEL sur les nouvelles données contenues dans LOGINS_RAW toutes les 20 minutes.