Snowflakeにおける動的テーブルを用いた連続ストリーミングモデル推論の実装¶
Snowflakeの動的テーブルは、ストリームデータ上に継続的な変換レイヤーを提供します。機械学習モデルの予測を入力データに適用する動的テーブルを定義することで、手動でオーケストレーションを行うことなく、データに対して自動化され、継続的に実行されるモデル推論パイプラインを維持することができます。
例えば、 USER_ID
、 LOCATION
、タイムスタンプを含む列を持つテーブル(LOGINS_RAW
)に到着したログインイベントのストリームを考えてみましょう。表は、最近到着したイベントのログインリスクに関するモデルの予測で更新されます。新しい行だけがモデルの予測で処理されます。
注釈
Snowflake Feature Store を使用している場合は、Feature Store API を使用して、この推論動的テーブルを 特徴ビュー として作成することもできます。これにより、推論時に他のフィーチャー表示からフィーチャーを読み込むことができます。
以下のセクションでは、 SQL とSnowpark Pythonの両方を使用して、この連続推論パイプラインをセットアップする方法を示します。
SQL アプローチ¶
SQL を使用して、モデルをリファレンスし、 LOGINS_RAW
の新しい入力行にそれを適用する動的テーブルを定義します。この例では、 WITH
句を使ってレジストリからモデルをリファレンスし、 !PREDICT
構文を使って推論を実行しています。
CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
WAREHOUSE = my_wh
TARGET_LAG = '20 minutes'
REFRESH_MODE = INCREMENTAL
INITIALIZE = on_create
COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
l.login_id,
l.user_id,
l.location,
l.event_time,
my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Snowpark Pythonのアプローチ¶
Snowpark Python API では、プログラムでモデルレジストリにアクセスし、 DataFrames 上で直接推論を実行することができます。このアプローチは、特にコード駆動型の環境では、より柔軟で保守性の高いものとなります。
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")
# Load the source data
df_raw = sp_session.table("LOGINS_RAW")
# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))
# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])
# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
name="LOGINS_WITH_PREDICTIONS",
warehouse="MY_WH",
lag='20 minutes',
refresh_mode='INCREMENTAL',
initiliaze="ON_CREATE",
comment="Dynamic table continuously updated with model predictions"
)
上記のコードサンプルは、 LOGINS_RAW
の新しいデータに対して MYMODEL
を使った推論を20分ごとに自動的に実行します。