Snowflakeネイティブバッチ推論( SQL )

Snowflakeモデルレジストリを使用して、モデルへのバッチ推論呼び出しを実行します。これらのバッチ推論呼び出しをSnowflakeワークフローに統合できます。Snowflakeネイティブバッチ推論を使用すると、次を実行できます。

  • Snowflake SQL 、Snowpark Python、ストリーミング、動的テーブルのワークフローに統合します。

  • 推論呼び出しの結果でdbtのようなサードパーティのツールを使用する

モデルを実行する場所

ランタイム環境の選択

ウェアハウスまたは SPCS コンピューティングプール上でモデルをホストすることができます。以下の情報を使って、モデルをホストしている場所を判別します。

ランタイム

最適な用途

次の場合は回避

仮想ウェアハウス

•データベース内のバッチ推論:ネイティブ SQL 関数としてのモデルの実行。
•ゼロ操作のエクスペリエンス:コンピューティングプールを管理することなく、既存のウェアハウスを活用します。
•小規模モデル: CPU 実行可能モデル(例:scikit-learn、 XGBoost )。
•ハードウェアの制約:モデルには実行用の GPU が必要です。
•メモリ制限:モデルサイズが 15GB を超えています(この制限は、ウェアハウスサイズが小さい場合に低くなります)。

Snowpark Container Services(SPCS)

•大規模モデル:LLMs 向けに最適化された、高メモリを必要とするディープラーニングモデル、または GPUs を必要とするモデル。
•カスタム環境:特定のpipパッケージまたは標準のウェアハウスにはないカスタム OS レベルの環境の場合。
•組織ポリシー: SPCS はまだ承認されていないか、アカウントで有効化されていません。
•十分なウェアハウスコンピューティング:ウェアハウスがバッチ推論リクエストを処理できるのであれば、 SPCS からの追加コンピューティングは必要ありません。

ウェアハウスでモデルを実行するには

ウェアハウスでモデルをホストするには、target_platforms引数内の WAREHOUSE を指定して、モデルをログに記録します。詳しくは、依存関係およびターゲットプラットフォームの操作を参照してください。

既存のモデルがウェアハウスで実行可能かどうかについては、 SHOW VERSIONS IN MODEL を実行してください。runnable_in列に値として WAREHOUSE がある場合は実行できます。

SPCS でモデルを実行するには

SPCS でモデルを使用するには、モデルをサービスとしてデプロイする必要があります。SPCS へのモデルのデプロイについて詳しくは、オンライン推論のためのモデルのデプロイを参照してください。自動一時停止がアクティブであることを確認してください。

注釈

サービスが中断されている場合、推論リクエストがあると自動的に再開されます。ただし、指定された時間内にサービスが再開に失敗すると、クエリが失敗する可能性があります。コンピューティングプールに利用可能なノードがない場合は、クエリの再開に失敗することがあります。このリスクを軽減するには、明示的にサービスを再開して、利用可能になるまで待つことができます。

XSMALL または SMALL ウェアハウスを使用して、推論リクエストを SPCS コンピューティングプールにルーティングします。ウェアハウスはノードごとに複数のスレッドを実行し、リクエストごとに多数の推論リクエストを送信できます。その結果、 SPCS 内で利用されるサービスは簡単に中止することができます。したがって、モデルが SPCS にデプロイされたときの XSMALL または SMALL ウェアハウスを利用することが推奨されます。

Pythonからの推論

Snowflake Python API を使用して推論リクエストを行う場合には、snowflake-ml-pythonパッケージが必要です。

モデルレジストリに接続

モデルレジストリから、推論リクエストに使用しているモデルを取得します。次のコードを使用して、モデルを取得します。

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion
Copy

バッチ推論ジョブを実行

モデルバージョンオブジェクトのrunメソッドを使用して、バッチ推論ジョブを実行します。runメソッドを使用すると、次を実行できます。

  • ウェアハウスまたは SPCS コンピューティングプールで推論ジョブを実行します。

  • 推論データをSnowparkまたはpandasデータフレームに提供します。

run メソッドは、指定したデータフレームの型と一致するデータフレームを返します。たとえば、入力としてpandasデータフレームを指定すると、出力としてpandasデータフレームを取得します。

注釈

Snowpark DataFrames が遅延評価を受けます。実行は DataFrameのcollect、show、またはto_pandas メソッドでのみ行われます。

以下の例では、ウェアハウスでバッチ推論ジョブを実行します。

# Run inference on a warehouse
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(input_features, function_name="predict")
remote_prediction.show()
Copy

ウェアハウスの代わりにSPCSで推論を実行するには、``run``呼び出しに``service_name``引数を追加します。

# Run inference on SPCS
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(input_features, function_name="predict", service_name="example_spcs_service")
remote_prediction.show()
Copy

モデルから呼び出すことができるメソッドを見るには、mv.show_functionsを実行します。このメソッドの返り値は、 ModelFunctionInfo オブジェクトのリストです。これらの各オブジェクトには以下の属性が含まれます。

  • name:Pythonまたは SQL から呼び出せる関数名。

  • target_method:元のログモデルのPythonメソッド名。

# Get signature of the inference function in Python
# mv: snowflake.ml.model.ModelVersion
mv.show_functions()
Copy

推論中のパラメーターの引き渡し

モデルのシグネチャに、`ParamSpec <https://docs.snowflake.com/developer-guide/snowpark-ml/reference/latest/api/model/snowflake.ml.model.model_signature.ParamSpec>`_オブジェクトで定義されたパラメーターが含まれている場合、推論時に``params``引数を使ってパラメーター値を渡すことができます。ディクショナリに含まれていないパラメーターは、署名のデフォルト値を使用します。``params``引数は、ウェアハウスで推論を実行する場合でも、SPCSで実行する場合でも同じように機能します。

# Pass parameters to override default values
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(
    input_features,
    function_name="predict",
    params={"temperature": 0.9, "max_tokens": 512}
)
Copy

SQL からの推論

次のコマンドを使用して、モデルバージョンの利用可能な関数と署名を理解します。

SHOW FUNCTION IN MODEL mymodel VERSION myversion;
Copy

ウェアハウスでモデルを実行するには

モデルのメソッドを呼び出すには、 MODEL(model_name)!method_name(...) 構文を使います。モデルで利用可能なメソッドは、Pythonのモデルクラスによって決定されます。例えば、多くのタイプのモデルでは、推論にpredictというメソッドを使用します。

デフォルトモデルのメソッドを呼び出すには、以下の構文を使います。括弧内にメソッドの引数を含め、 FROM 句で推論データを含むテーブルを指定します。

SELECT MODEL(<model_name>)!<method_name>(...) FROM <table_name>;
Copy

モデルの特定のバージョンからメソッドを呼び出すには、モデルの特定のバージョンへのエイリアスを作成し、エイリアスを介してメソッドを呼び出します。

モデルの特定のバージョンからメソッドを呼び出すには、以下の構文を使います。

SELECT MODEL(<model_name>,<version_or_alias_name>)!<method_name>(...) FROM <table_name>;
Copy

次の例では、 LAST のエイリアスを使用して、モデルの最新バージョンを呼び出しています。

SELECT MODEL(my_model,LAST)!predict(...) FROM my_table;
Copy

SQLでのパラメーターの引き渡し

モデルのシグネチャに、:doc:`ParamSpec </developer-guide/snowflake-ml/model-registry/model-signature>`で定義されたパラメーターが含まれている場合、入力列の後に追加の引数としてパラメーター値を渡すことができます。パラメーターは、位置または名前で指定できます。

位置引数を使用する場合、右側のパラメーターを省略して、署名のデフォルトを使用できます。

-- Pass all parameters positionally (temperature, then max_tokens)
SELECT MODEL(my_model, v1)!predict(input_text, 0.9, 512) FROM my_table;

-- Omit max_tokens from the right; its default value from the signature is used
SELECT MODEL(my_model, v1)!predict(input_text, 0.9) FROM my_table;
Copy

名前付き引数を使用する場合、すべての引数(入力列を含む)を名前で指定する必要があります。これにより、位置に関係なく特定のパラメーターのみを渡すことができます。

SELECT MODEL(my_model, v1)!predict(
    input_text => input_text,
    max_tokens => 512
) FROM my_table;
Copy

注釈

すべての引数は名前で指定するか、すべて位置で指定する必要があります。同じ呼び出しで、位置引数と名前付き引数を混在させることはできません。

SPCS でモデルを実行するには

ウェアハウスで実行するのとは異なり、``service_name!method_name(...)``を呼び出すことで、サービスから関数を呼び出すことができます。

SELECT <mservice_name>!<method_name>(...) FROM <table_name>;
Copy

パラメーターは、ウェアハウス関数と同じ方法で、すべて位置またはすべて名前で渡されます。

-- Positional
SELECT my_service!predict(input_text, 0.9, 512) FROM my_table;

-- Named arguments (all arguments must be named)
SELECT my_service!predict(input_text => input_text, max_tokens => 512) FROM my_table;
Copy

動的テーブルによる連続モデル推論

Snowflakeの動的テーブルは、ストリームデータの上に継続的な変換レイヤーを確立します。機械学習モデルの予測を入力データに適用する動的テーブルを定義することで、手動でオーケストレーションを行うことなく、データに対して自動化され、継続的に動作するモデル推論パイプラインを維持することができます。

たとえば、テーブルに入力するログインイベントのストリーム(LOGINS_RAW)を考えます。USER_ID 、 LOCATION 、およびタイムスタンプなどの列が含まれます。このテーブルは、その後、新しく到着したイベントのログインリスクに関するモデルの予測で更新されます。重要なことに、新しい行だけがモデルの予測で処理されます。

SQL

動的テーブルは、Snowflakeユーザーが受信データに対してインクリメンタル推論を実行する堅牢な機能を提供します。SQL を使用して、モデルをリファレンスし LOGINS_RAW の新しい入力行にそれを適用する動的テーブルを定義します。

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
SELECT
    login_id,
    user_id,
    location,
    event_time,
    MODEL(ml.registry.mymodel)!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw;
Copy

Snowpark Python

Snowpark Python API では、プログラムでモデルレジストリにアクセスし、 DataFrames 上で直接推論を実行することができます。このアプローチは、特にコード駆動型の環境では、より柔軟で保守性の高いものとなります。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initialize="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)
Copy

上記のコードサンプルは、 MYMODEL の新しいデータに対して LOGINS_RAW を使った推論を20分ごとに自動的に実行します。

不変と揮発性

このインクリメンタリティは必須であり、動的テーブルの定義内で呼び出されるすべての関数を IMMUTABLE として指定する必要があります。標準モデル内の関数は通常 IMMUTABLE 、カスタムモデルのデフォルトは VOLATILE です。基礎となるモデルが不変であることがわかっている場合は、モデルがログに記録されるときに、対応するモデル関数が明示的に不変としてマークされていることを確認することが重要です。