バッチ推論ジョブ

注釈

プレビュー機能 --- パブリック

snowflake-ml-pythonバージョン1.26.0以降、パブリックプレビューでサポートされています。

Snowflakeバッチ推論を使用して、静的または定期的に更新されるデータセットに対して、効率的で大規模なモデル推論を可能にします。バッチ推論 API はSnowpark Container Services(SPCS)を使用して、大規模なスループットとコスト効率に最適化された分散コンピューティングレイヤーを提供します。

バッチ推論を使用する場合

ワークロードに run_batch メソッドを使用すると、次のことが可能になります。

  • 画像、音声、またはビデオファイルの処理、または非構造化データのマルチモーダルモデルの使用

  • 数百万または数十億の行に対して推論を実行します。

  • パイプラインの個別の非同期ステージとして推論を実行します。

  • Airflow DAG 内またはSnowflakeタスクのステップとして推論を統合する。

制限事項

  • マルチモーダルのユースケースでは、暗号化はサーバー側でのみサポートされます

  • パーティションモデルはサポートされていません

始めましょう

モデルレジストリに接続

Snowflakeモデルレジストリに接続し、次のようにモデル参照を取得します。

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion
Copy

バッチジョブを実行する

この API はSnowpark Container Services(SPCS)ジョブを使用して、推論ワークロードを起動します。推論を実行すると、コンピューティングは自動的にウィンドウダウンし、追加料金は発生しません。高レベルでは、この API は次のようになります。

from snowflake.ml.model.batch import OutputSpec

# how to run a batch job
job = mv.run_batch(
    compute_pool = "my_compute_pool",
    X = session.table("my_table"),
    output_spec = OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)

job.wait() # Optional: Blocking until the job finishes
Copy

ジョブ管理

以下のメソッドを使用して、ジョブのリストの取得、ジョブのキャンセル、ジョブのハンドルの取得、またはジョブの削除を行うことができます。

from snowflake.ml.jobs import list_jobs, delete_job, get_job

# view logs to troubleshoot
job.get_logs()

# cancel a job
job.cancel()

# list to see all jobs
list_jobs().show()

# get the handle of a job
job = get_job("my_db.my_schema.job_name")

# delete a job that you no longer wish to run
delete_job(job)
Copy

注釈

ML ジョブ APIs の``result`` 関数はバッチ推論ジョブはサポートされていません。

推論データを指定する

バッチ推論には構造化データまたは非構造化データを使用できます。ワークフローに構造化データを使用するには、run_batchメソッドに SQL クエリまたはデータフレームを提供します。

非構造化データの場合は、Snowflakeステージからファイルを参照できます。ファイルを参照するには、ファイルパスでデータフレームを作成します。

データフレームをrun_batchメソッドに提供します。run_batchは、モデルにファイルのコンテンツを提供します。

構造化入力

以下は、入力の可能性の範囲を示す例です。

# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100"),

# reading from parquet files
X = session.read.option("pattern",".*file.*\\.parquet")
    .parquet("@DB.SCHEMA.STAGE/some/path")
    .select(col("id1").alias("id"), col("feature_1"), col("feature_2"))).filter(col("feature_1") > 100)
Copy

非構造化入力(マルチモーダル)

非構造化データの場合、 run_batch メソッドは、入力データフレームで提供された完全修飾ステージパスからファイルを読み取ることができます。次の例は、構造化されていない入力データを指定する方法を示しています。

# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
    ["@DB.SCHEMA.STAGE/dataset/files/file1"],
    ["@DB.SCHEMA.STAGE/dataset/files/file2"],
    ["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)
Copy

ステージ内のすべてのファイルをデータフレームとして自動的に一覧表示するには、次のようなコードを使用します。

from snowflake.ml.utils.stage_file import list_stage_files

# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")

# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")

# get all files under a path ending with ".jpg" and return the datafram with a column_name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
Copy

データの型の表現

Run_batchは、ファイルをモデル互換の形式に自動的に変換します。

モデルは以下のいずれかの形式のデータを受け入れることができます。

  • RAW_BYTES

  • BASE64

たとえば、ステージに PNG 形式で保存された画像があり、モデルが RAW_BYTES を受け入れる場合、 input_spec 引数で、Snowflakeがデータを変換する方法を指定できます。

次のコード例は、ステージ内のファイルを RAW_BYTES に変換します。

mv.run_batch(
    X,
    input_spec=InputSpec(
 # we need to provide column_handling in the InputSpec to perform the necessary conversion
 # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
 # RAW_BYTES: download and convert the file from the stage path to bytes
        column_handling={
            "path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    ),
    ...
)
Copy

column_handling 引数は、Xのパス列に完全なステージパスが含まれることをフレームワークに伝え、そのファイルの生バイトでモデルを呼び出します。

出力(output_spec

ここに示すように、ファイル出力を格納するステージディレクトリを指定します。

mv.run_batch(
    ...
    output_spec = OutputSpec(stage_location="@db.schema.stage/path/"),
)
Copy

Snowflakeは現在、テキストを出力しParquetファイルとして保存するモデルをサポートしています。次のようにして、ParquetファイルをSnowparkデータフレームに変換できます。

session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")
Copy

パラメーターの引き渡し

モデルのシグネチャに、 ParamSpec で定義されたパラメーターが含まれている場合、 InputSpecparams 引数を使って推論時にパラメーター値を渡すことができます。ディクショナリに含まれていないパラメーターは、署名のデフォルト値を使用します。

from snowflake.ml.model.batch import InputSpec, OutputSpec

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    input_spec=InputSpec(
        params={"temperature": 0.9, "max_tokens": 512}
    ),
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
Copy

ジョブの仕様

バッチ推論ワークロードのジョブレベルの設定(ワーカー数、リソース割り当て、実行パラメーターなど)を構成するには、 run_batch メソッドの job_spec 引数として JobSpec インスタンスを渡します。以下に例を示します。

from snowflake.ml.model.batch import JobSpec, OutputSpec

job_spec = JobSpec(
    job_name="my_inference_job",
    cpu_requests="2",
    memory_requests="8GiB",
    max_batch_rows=2048,
    replicas=2,
)

job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    job_spec=job_spec,
)
Copy

ベストプラクティス

センチネルファイルの使用

様々な理由で、ジョブが途中で失敗することもあります。したがって、出力ディレクトリは部分的なデータになる可能性があります。ジョブの完了をマークするために、run_batchは出力ディレクトリ内に完了ファイル _SUCCESS を書き込みます。

部分的または誤った出力を避けるには:

  • センチネルファイルが見つかった後にのみ、出力データを読み取ります。

  • 始めるには、空のディレクトリを指定します。

  • モード = SaveMode.ERROR でrun_batchを実行します。

カスタムモデルの使用

from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core

# first we must define the schema, we'll expect audio file input as base64 string
signature = core.ModelSignature(
    inputs=[
        core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
    ],
    outputs=[
        core.FeatureGroupSpec(
            name="outputs",
            specs=[
                core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                core.FeatureGroupSpec(
                    name="chunks",
                    specs=[
                        core.FeatureSpec(
                            name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
                        ),
                        core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                    ],
                    shape=(-1,),
                ),
            ],
        ),
    ],
)

# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        self.model = self.context.model_ref("my_model")

    @custom_model.inference_api
    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        import base64
        audio_b64_list = df["audio"].tolist()
        audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
        temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
        return pd.DataFrame({"outputs": temp_res})

# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
    custom_model.ModelContext(
        models={
            "my_model": pipeline(
                task="automatic-speech-recognition", model="openai/whisper-small"
            )
        }
    )
)

# log the model
mv = reg.log_model(
    transcriber,
    model_name="custom_transcriber",
    version_name="v1",
    signatures={"predict": signature},
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)


job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# BASE_64: download and convert the file from the stage path to base64 string
        column_handling={
            "audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
        }
    )
)
Copy

Hugging Faceモデルの使用

from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat

# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")

# log the model
mv = reg.log_model(
    classifier,
    model_name="image_classifier",
    version_name="v1",
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
    pip_requirements=[
        "pillow" # dependency for image classification
    ],
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/image/image1.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/image/image2.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/image/image3.mp3"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location=f"@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
        column_handling={
            "IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    )
)
Copy

vLLM でのHugging Faceモデルの使用

タスク:テキスト生成

import json

from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat

# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")

mv = reg.log_model(
    model,
    model_name="qwenw_5",
    version_name="v1",
    options={"cuda_version": "12.4"},
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAi chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "How many breeds of cats are there?"},
        ]
    }
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
 # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)
Copy

タスク:画像テキストをテキストへ

import json

from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec

# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")

mv = reg.log_model(
    model,
    model_name="qwen2_vl_2b",
    version_name="v1",
    options={"cuda_version": "12.4"},
    targets=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAi chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "What breed of cat is this?"},
            {
                "type": "image_url",
                "image_url": {
                    # run_batch will downlaod and convert the file to the format that vLLM can handle
                    "url": f"@db.schema.stage/path/cat.jpeg",
                }
            }
     # you can also pass video and audio like below
            # {
            #     "type": "video_url",
            #     "video_url": {
            #         "url": "@db.schema.stage/path/video.avi",
            #     }
            # }
            # {
            #     "type": "input_audio",
            #     "input_audio": {
            #         "data": "@db.schema.stage/path/audio.mp3",
            #         "format": "mp3",
            #     }
            # }
        ]
    }
]]

schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
 # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)
Copy

サンプルノートブック

エンドツーエンドの実行可能な例については、GitHubの`バッチ推論サンプルノートブック<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/model_serving/batch_inference>`_をご参照ください。