Snowflake 네이티브 일괄 추론(SQL)

Snowflake 모델 레지스트리를 사용하여 모델에 대한 일괄 추론 호출을 실행합니다. 이러한 일괄 추론 호출을 Snowflake 워크플로에 통합할 수 있습니다. Snowflake 네이티브 일괄 추론을 사용하면 다음을 수행할 수 있습니다.

  • Snowflake SQL, Snowpark Python, Streaming 및 Dynamic Tables 워크플로에 통합

  • 추론 호출 결과와 함께 dbt와 같은 서드 파티 도구 사용

모델을 실행할 위치는 어디인가요?

런타임 환경 선택하기

가상 웨어하우스 또는 SPCS 컴퓨팅 풀에서 모델을 호스팅할 수 있습니다. 다음 정보를 사용하여 모델을 호스팅할 위치를 결정합니다.

런타임

적합한 경우

피해야 할 경우

가상 웨어하우스

• In-Database Batch Inference: Executing models as native SQL functions.
• Zero-Ops Experience: Leveraging existing warehouses without managing compute pools.
• Small Models: CPU-runnable models (e.g., scikit-learn, XGBoost).
• Hardware Constraints: The model requires a GPU for execution.
• Memory Limits: Model size exceeds 15GB. (this limit is lower for smaller warehouse sizes).

Snowpark Container Services(SPCS)

• Large Models: Optimized for LLMs, deep learning models requiring high memory, or models requiring GPUs.
• Custom Environments: For specific pip packages or a custom OS-level environment not found in standard warehouses.
• Organizational Policy: SPCS is not yet approved or enabled in your account.
• Sufficient warehouse compute: If your warehouse can process your batch inference requests, you don’t need the additional compute from SPCS.

웨어하우스에서 모델 실행

웨어하우스에서 모델을 호스팅하려면 target_platforms 인자에 WAREHOUSE를 지정하여 모델을 로깅합니다. 자세한 내용은 종속성 및 대상 플랫폼 관련 작업하기를 참조하세요.

기존 모델을 웨어하우스에서 실행할 수 있는지 여부에 대한 정보를 보려면 SHOW VERSIONS IN MODEL을 실행합니다. runnable_in 열에 값으로 WAREHOUSE가 있는 경우 실행할 수 있습니다.

SPCS에서 모델 실행

SPCS에서 모델을 사용하려면 모델을 서비스로 배포해야 합니다. 모델을 SPCS에 배포하는 방법에 대한 자세한 내용은 온라인 추론을 위한 모델 배포를 참조하세요. 자동 일시 중단이 활성화되어 있는지 확인합니다.

참고

서비스가 일시 중단된 경우 추론 요청이 있을 때 자동으로 재개됩니다. 그러나 지정된 기간 내에 서비스 재개에 실패하면 쿼리가 실패할 수 있습니다. 컴퓨팅 풀에 사용 가능한 노드가 부족한 경우 쿼리가 재개되지 않을 수 있습니다. 이러한 위험을 완화하기 위해 서비스를 명시적으로 재개하고 사용 가능해질 때까지 기다릴 수 있습니다.

XSMALL 또는 SMALL 웨어하우스를 사용하여 추론 요청을 SPCS 컴퓨팅 풀로 라우팅합니다. 웨어하우스는 노드당 여러 스레드를 실행하고 각 요청과 함께 많은 수의 추론 요청을 보낼 수 있습니다. 결과적으로, SPCS 내에서 작동하는 서비스는 쉽게 압도될 수 있습니다. 따라서 모델이 SPCS에 배포될 때 XSMALL 또는 SMALL 웨어하우스를 활용할 것을 권장합니다.

Python에서의 추론

Snowflake Python API를 사용하는 경우 추론을 요청하려면 snowflake-ml-python 패키지가 있어야 합니다.

모델 레지스트리에 연결

모델 레지스트리에서 추론 요청에 사용 중인 모델을 검색합니다. 다음 코드를 사용하여 모델을 검색합니다.

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion
Copy

일괄 추론 작업 실행

모델 버전 오브젝트의 run 메서드를 사용하여 일괄 추론 작업을 실행합니다. run 메서드를 사용하여 다음을 수행할 수 있습니다.

  • 웨어하우스 또는 SPCS 컴퓨팅 풀에서 추론 작업을 실행합니다.

  • 추론 데이터와 함께 Snowpark 또는 pandas 데이터프레임을 제공합니다.

run 메서드는 지정한 데이터 프레임의 유형과 일치하는 데이터 프레임을 반환합니다. 예를 들어, pandas 데이터프레임을 입력으로 지정하면 pandas 데이터프레임이 출력으로 제공됩니다.

참고

Snowpark DataFrames는 지연 평가를 받습니다. 실행은 DataFrame의 collect, show 또는 to_pandas 메서드를 통해서만 발생합니다.

다음 예제에서는 웨어하우스에서 일괄 추론 작업을 실행합니다.

# Call inference for predict function
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(input_features, function_name="predict")
remote_prediction.show()   # assuming test_features is Snowpark DataFrame
Copy

모델에서 호출할 수 있는 메서드를 보려면 mv.show_functions를 실행하세요. 이 메서드의 반환 값은 ModelFunctionInfo 오브젝트의 목록입니다. 이러한 각 오브젝트에는 다음 속성이 포함됩니다.

  • 이름: Python 또는 SQL에서 호출할 수 있는 함수의 이름입니다.

  • target_method: 원래 로깅된 모델의 Python 메서드 이름입니다.

# Get signature of the inference function in Python
# mv: snowflake.ml.model.ModelVersion
mv.show_functions()
Copy

실행 메서드 내에서 고유한 입력 기능을 지정하고 function_name을 “예측”으로 설정합니다.

다음 예제에서는 SPCS에서 일괄 추론 작업을 실행합니다.

# Call inference for predict function
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(test_features, function_name="predict", service_name="example_spcs_service")
remote_prediction.show()   # assuming test_features is Snowpark DataFrame
Copy

실행 메서드 내에서 고유한 입력 기능을 지정하고, function_name을 “예측”으로 설정하고, service_name을 SPCS 서비스의 이름으로 설정합니다.

remote_prediction.show()는 출력 데이터 프레임을 보여줍니다.

메서드는 레지스트리에 연결하는 데 사용하는 세션에 지정된 Snowflake 웨어하우스에서 실행됩니다. 웨어하우스 지정하기를 살펴보세요.

SQL에서의 추론

모델 버전의 사용 가능한 함수와 서명을 이해하려면 다음 명령을 사용합니다.

SHOW FUNCTION IN MODEL mymodel VERSION myversion;
Copy

웨어하우스에서 모델 실행

MODEL(model_name)!method_name(…) 구문을 사용하여 모델의 메서드를 호출합니다. 모델에서 사용할 수 있는 메서드는 기본 Python 모델 클래스에 따라 결정됩니다. 예를 들어, 많은 유형의 모델이 추론에 예측이라는 메서드를 사용합니다.

기본 모델의 메서드를 호출하려면 다음 구문을 사용합니다. 괄호 안에 메서드 인자를 포함시키고 FROM 절에 추론 데이터가 포함된 테이블을 지정합니다.

SELECT MODEL(<model_name>)!<method_name>(...) FROM <table_name>;
Copy

모델의 특정 버전에서 메서드를 호출하려면 모델의 특정 버전에 대한 별칭을 만들고 그 별칭을 통해 메서드를 호출합니다.

특정 버전의 모델에서 메서드를 호출하려면 다음 구문을 사용합니다.

SELECT MODEL(<model_name>,<version_or_alias_name>)!<method_name>(...) FROM <table_name>;
Copy

다음 예에서는 LAST 별칭을 사용하여 최신 버전의 모델을 호출합니다.

SELECT MODEL(my_model,LAST)!predict(...) FROM my_table;
Copy

SPCS에서 모델 실행

웨어하우스에서의 실행과 달리, 함수는 service_name!method_name(…)을 호출하여 서비스에서 호출할 수 있습니다.

SELECT <mservice_name>!<method_name>(...) FROM <table_name>;
Copy

동적 테이블을 사용한 연속 모델 추론

Snowflake의 동적 테이블은 스트리밍 데이터 위에 연속 변환 계층을 설정합니다. 머신 러닝 모델의 예측을 수신 데이터에 적용하는 동적 테이블을 정의하면 수동 오케스트레이션 요구 사항 없이 데이터에 대해 자동화되고 지속적으로 작동하는 모델 추론 파이프라인을 유지할 수 있습니다.

예를 들어, USER_ID, LOCATION 및 타임스탬프와 같은 열을 포함하는 테이블을 채우는 로그인 이벤트 스트림(LOGINS_RAW)을 고려해 보겠습니다. 이 테이블은 이후에 새로 도착한 이벤트의 로그인 위험에 대한 모델의 예측으로 업데이트됩니다. 결정적으로 새 행만 모델의 예측으로 처리됩니다.

SQL

동적 테이블은 Snowflake 사용자가 수신 데이터에 대한 증분 추론을 수행할 수 있는 강력한 기능을 제공합니다. SQL을 사용하여 모델을 참조하는 동적 테이블을 정의하고 LOGINS_RAW에서 새로 들어오는 행에 적용합니다.

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
SELECT
    login_id,
    user_id,
    location,
    event_time,
    MODEL(ml.registry.mymodel)!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw;
Copy

Snowpark Python

Snowpark Python API 를 사용하여 프로그래밍 방식으로 Model Registry에 액세스하고 DataFrames 에서 직접 추론을 실행할 수 있습니다. 이 접근 방식은 특히 코드 중심 환경에서 더 유연하고 유지 관리가 용이할 수 있습니다.

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initialize="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)
Copy

위의 코드 샘플은 MYMODEL 의 새 데이터에 대해 LOGINS_RAW 을 사용하여 추론을 20분마다 자동으로 실행합니다.

불변 및 휘발성

이러한 증분성은 필수적이며 동적 테이블 정의 내에서 호출된 모든 함수를 IMMUTABLE로 지정해야 합니다. 표준 모델 내의 함수는 일반적으로 IMMUTABLE이며, 사용자 지정 모델의 기본값은 VOLATILE입니다. 기본 모델이 불변으로 알려진 경우, 모델이 로깅될 때 해당 모델 함수가 변경 불가능한 것으로 명시적으로 표시되도록 하는 것이 중요합니다.