Implementierung der kontinuierlichen Streaming-Modellinferenz mit dynamischen Tabellen in Snowflake

Die dynamischen Tabellen von Snowflake bieten eine kontinuierliche Transformationsschicht über Streaming-Daten. Durch die Definition einer dynamischen Tabelle, die die Vorhersagen eines Machine Learning-Modells auf die eingehenden Daten anwendet, können Sie eine automatisierte, kontinuierlich laufende Modellinferenz-Pipeline für Ihre Daten ohne manuelle Orchestrierung pflegen.

Nehmen wir zum Beispiel einen Strom von Anmeldeereignissen, die in einer Tabelle (LOGINS_RAW) eintreffen, die Spalten mit USER_ID, LOCATION und einem Zeitstempel enthält. Die Tabelle wird mit den Vorhersagen des Modells zum Anmeldungsrisiko für kürzlich eingetretene Ereignisse aktualisiert. Nur neue Zeilen werden mit den Vorhersagen des Modells verarbeitet.

Bemerkung

Wenn Sie den Snowflake Feature Store verwenden, können Sie diese dynamische Inferenztabelle auch als Feature-Ansicht unter Verwendung des Feature Store-API erstellen. Damit können Sie optional Features zur Inferenzzeit aus anderen Feature-Ansichten laden.

In den folgenden Abschnitten wird gezeigt, wie Sie diese kontinuierliche Inferenzpipeline mit SQL und Snowpark Python einrichten können.

SQL-Ansatz

Verwenden Sie SQL, um eine dynamische Tabelle zu definieren, die das Modell referenziert und es auf neu eingehende Zeilen in LOGINS_RAW anwendet. Dieses Beispiel verwendet eine WITH-Klausel, um das Modell aus der Registry zu referenzieren, und die !PREDICT-Syntax, um die Inferenz auszuführen:

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
    l.login_id,
    l.user_id,
    l.location,
    l.event_time,
    my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Copy

Snowpark Python-Ansatz

Mit der Snowpark Python-API können Sie programmgesteuert auf die Modell-Registry zugreifen und Inferenzen direkt auf DataFrames ausführen. Dieser Ansatz kann flexibler und wartungsfreundlicher sein, insbesondere in codegesteuerten Umgebungen.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initiliaze="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)
Copy

Das obige Codebeispiel führt automatisch alle 20 Minuten eine Inferenz mit MYMODEL auf neue Daten in LOGINS_RAW durch.