Implementing Continuous Streaming Model Inference Using Dynamic Tables in Snowflake¶
Snowflake’s dynamic tables provide a continuous transformation layer over streaming data. By defining a dynamic table that applies a machine learning model’s predictions to incoming data, you can maintain an automated, continuously running model inference pipeline on your data without manual orchestration.
For example, consider a stream of login events arriving in a table (LOGINS_RAW) containing columns such as USER_ID, LOCATION, and a timestamp. The dynamic table is updated with the model's predictions about login risk for recently arrived events; with incremental refresh, only new rows are processed by the model.
Note
If you are using the Snowflake Feature Store, you can also create this inference dynamic table as a feature view using the Feature Store API. This allows you to optionally load features at inference time from other feature views.
The following sections demonstrate how to set up this continuous inference pipeline using both SQL and Snowpark Python.
SQL approach¶
Use SQL to define a dynamic table that references the model and applies it to new incoming rows in LOGINS_RAW. This example uses a WITH clause to reference the model from the registry and the !PREDICT syntax to run inference:
CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
WAREHOUSE = my_wh
TARGET_LAG = '20 minutes'
REFRESH_MODE = INCREMENTAL
INITIALIZE = on_create
COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
l.login_id,
l.user_id,
l.location,
l.event_time,
my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
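After the dynamic table is created, you can verify that its refreshes are keeping pace with the target lag. The following query is a sketch: it assumes the table was created in your current database and schema, and uses the INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY table function to inspect recent refreshes.

```sql
-- Inspect recent refreshes of the dynamic table (illustrative; adjust names to your environment)
SELECT name, state, refresh_start_time, refresh_end_time
FROM TABLE(
  INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'LOGINS_WITH_PREDICTIONS'
  )
)
ORDER BY refresh_start_time DESC
LIMIT 10;
```

If refreshes consistently exceed the 20-minute target lag, consider a larger warehouse or a longer TARGET_LAG.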
Snowpark Python approach¶
The Snowpark Python API allows you to access the model registry programmatically and run inference directly on DataFrames. This approach can be more flexible and maintainable, especially in code-driven environments.
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")
# Load the source data
df_raw = sp_session.table("LOGINS_RAW")
# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))
# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])
# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
name="LOGINS_WITH_PREDICTIONS",
warehouse="MY_WH",
lag='20 minutes',
refresh_mode='INCREMENTAL',
    initialize="ON_CREATE",
comment="Dynamic table continuously updated with model predictions"
)
The code sample above automatically runs inference with MYMODEL on new data in LOGINS_RAW every 20 minutes.
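Once the dynamic table exists, downstream consumers query it like any other table. For instance, a query such as the following (a sketch assuming the column names used above) retrieves the most recently scored logins:

```sql
-- Read the continuously refreshed predictions (illustrative)
SELECT login_id, user_id, location, event_time, prediction_result
FROM logins_with_predictions
ORDER BY event_time DESC
LIMIT 100;
```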