Snowflake에서 동적 테이블을 사용하여 연속 스트리밍 모델 추론 구현하기¶
Snowflake의 동적 테이블은 스트리밍 데이터에 대한 지속적인 변환 계층을 제공합니다. 들어오는 데이터에 머신 러닝 모델의 예측을 적용하는 동적 테이블을 정의하면 수동 오케스트레이션 없이도 데이터에서 자동화된 모델 추론 파이프라인을 지속적으로 실행할 수 있습니다.
예를 들어 USER_ID
, LOCATION
, 타임스탬프 등의 열이 포함된 테이블(LOGINS_RAW
)에 도착하는 로그인 이벤트 스트림이 있다고 가정해 보겠습니다. 테이블은 최근에 도착한 이벤트의 로그인 위험에 대한 모델의 예측으로 업데이트됩니다. 새 행만 모델의 예측으로 처리됩니다.
참고
Snowflake Feature Store 를 사용하는 경우 Feature Store API 를 사용하여 이 추론 동적 테이블을 특성 뷰 로 만들 수도 있습니다. 이를 통해 추론 시점에 다른 특성 뷰에서 선택적으로 특성을 로딩할 수 있습니다.
다음 섹션에서는 SQL 및 Snowpark Python을 모두 사용하여 이 연속 추론 파이프라인을 설정하는 방법을 설명합니다.
SQL 접근 방식¶
SQL 을 사용하여 모델을 참조하는 동적 테이블을 정의하고 LOGINS_RAW
에서 새로 들어오는 행에 적용합니다. 이 예에서는 WITH
절을 사용하여 레지스트리에서 모델을 참조하고 !PREDICT
구문을 사용하여 추론을 실행합니다.
CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
WAREHOUSE = my_wh
TARGET_LAG = '20 minutes'
REFRESH_MODE = INCREMENTAL
INITIALIZE = on_create
COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
l.login_id,
l.user_id,
l.location,
l.event_time,
my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Snowpark Python 접근 방식¶
Snowpark Python API 를 사용하여 프로그래밍 방식으로 Model Registry에 액세스하고 DataFrames 에서 직접 추론을 실행할 수 있습니다. 이 접근 방식은 특히 코드 중심 환경에서 더 유연하고 유지 관리가 용이할 수 있습니다.
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")
# Load the source data
df_raw = sp_session.table("LOGINS_RAW")
# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))
# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])
# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
name="LOGINS_WITH_PREDICTIONS",
warehouse="MY_WH",
lag='20 minutes',
refresh_mode='INCREMENTAL',
initiliaze="ON_CREATE",
comment="Dynamic table continuously updated with model predictions"
)
위의 코드 샘플은 LOGINS_RAW
의 새 데이터에 대해 MYMODEL
을 사용하여 추론을 20분마다 자동으로 실행합니다.