Implementing Continuous Streaming Model Inference Using Dynamic Tables in Snowflake

Snowflake’s dynamic tables provide a continuous transformation layer over streaming data. By defining a dynamic table that applies a machine learning model’s predictions to incoming data, you can maintain an automated, continuously running model inference pipeline on your data without manual orchestration.

For example, consider a stream of login events arriving in a table (LOGINS_RAW) with columns including LOGIN_ID, USER_ID, LOCATION, and an event timestamp. A dynamic table defined over this source is kept up to date with the model’s login-risk predictions for recently arrived events; because the refresh is incremental, only new rows are run through the model.
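
For reference, a minimal sketch of such a source table follows. The column names are taken from the examples below, but the types are illustrative assumptions.

-- Hypothetical source table; column types are assumptions for illustration.
CREATE OR REPLACE TABLE logins_raw (
    login_id   NUMBER,          -- unique identifier for the login event
    user_id    NUMBER,          -- user attempting the login
    location   VARCHAR,         -- origin of the login attempt
    event_time TIMESTAMP_NTZ    -- when the event arrived
);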

Note

If you are using the Snowflake Feature Store, you can also create this inference dynamic table as a feature view using the Feature Store API. This allows you to optionally load features at inference time from other feature views.
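
As a sketch, registering the inference output as a feature view might look like the following. The feature store name, entity, and version here are hypothetical, and joined_df refers to the predictions DataFrame built in the Snowpark example later in this topic.

from snowflake.ml.feature_store import CreationMode, Entity, FeatureStore, FeatureView

# Open (or create) a feature store; names here are hypothetical
fs = FeatureStore(
    session=sp_session,
    database="ML",
    name="FEATURE_STORE",
    default_warehouse="MY_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

# Entity keyed on the user; join_keys must match a column in the source data
user = Entity(name="USER", join_keys=["USER_ID"])
fs.register_entity(user)

# Wrap the inference DataFrame in a feature view; setting refresh_freq
# materializes it as a dynamic table, like the examples below
fv = FeatureView(
    name="LOGIN_PREDICTIONS",
    entities=[user],
    feature_df=joined_df,
    refresh_freq="20 minutes",
)
fs.register_feature_view(feature_view=fv, version="1")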

The following sections demonstrate how to set up this continuous inference pipeline using both SQL and Snowpark Python.

SQL approach

Use SQL to define a dynamic table that applies the model to new rows arriving in LOGINS_RAW. This example uses a WITH clause to reference the model from the registry and the !PREDICT syntax to run inference:

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
    l.login_id,
    l.user_id,
    l.location,
    l.event_time,
    my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
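Once created, the dynamic table refreshes on its own schedule. To verify that refreshes are occurring, you can query Snowflake’s refresh history table function, as sketched here:

SELECT *
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'LOGINS_WITH_PREDICTIONS'));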

Snowpark Python approach

The Snowpark Python API allows you to access the model registry programmatically and run inference directly on DataFrames. This approach can be more flexible and maintainable, especially in code-driven environments.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default version of the model from the registry
model = reg.get_model("MYMODEL").default

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag="20 minutes",
    refresh_mode="INCREMENTAL",
    initialize="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)

With this configuration, Snowflake automatically refreshes the dynamic table so that its contents lag LOGINS_RAW by no more than 20 minutes, running inference with MYMODEL only on newly arrived rows.
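
As a quick check (column names follow the earlier examples), you can read back the most recently scored logins:

# Show the ten most recent predictions; EVENT_TIME comes from the source table
recent = (
    sp_session.table("LOGINS_WITH_PREDICTIONS")
    .sort(col("EVENT_TIME").desc())
    .limit(10)
)
recent.show()

The sort uses the col function imported at the top of the Snowpark example.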