Snowflakeにおける動的テーブルを用いた連続ストリーミングモデル推論の実装

Snowflakeの動的テーブルは、ストリームデータ上に継続的な変換レイヤーを提供します。機械学習モデルの予測を入力データに適用する動的テーブルを定義することで、手動でオーケストレーションを行うことなく、データに対して自動化され、継続的に実行されるモデル推論パイプラインを維持することができます。

例えば、 USER_IDLOCATION 、タイムスタンプを含む列を持つテーブル(LOGINS_RAW)に到着したログインイベントのストリームを考えてみましょう。表は、最近到着したイベントのログインリスクに関するモデルの予測で更新されます。新しい行だけがモデルの予測で処理されます。

注釈

Snowflake Feature Store を使用している場合は、Feature Store API を使用して、この推論動的テーブルを 特徴ビュー として作成することもできます。これにより、推論時に他のフィーチャー表示からフィーチャーを読み込むことができます。

以下のセクションでは、 SQL とSnowpark Pythonの両方を使用して、この連続推論パイプラインをセットアップする方法を示します。

SQL アプローチ

SQL を使用して、モデルをリファレンスし、 LOGINS_RAW の新しい入力行にそれを適用する動的テーブルを定義します。この例では、 WITH 句を使ってレジストリからモデルをリファレンスし、 !PREDICT 構文を使って推論を実行しています。

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
    l.login_id,
    l.user_id,
    l.location,
    l.event_time,
    my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Copy

Snowpark Pythonのアプローチ

Snowpark Python API では、プログラムでモデルレジストリにアクセスし、 DataFrames 上で直接推論を実行することができます。このアプローチは、特にコード駆動型の環境では、より柔軟で保守性の高いものとなります。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initiliaze="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)
Copy

上記のコードサンプルは、 LOGINS_RAW の新しいデータに対して MYMODEL を使った推論を20分ごとに自動的に実行します。