バッチ推論ジョブ¶
注釈
プレビュー機能 --- パブリック
snowflake-ml-pythonバージョン1.26.0以降、パブリックプレビューでサポートされています。
Snowflakeバッチ推論を使用して、静的または定期的に更新されるデータセットに対して、効率的で大規模なモデル推論を可能にします。バッチ推論 API はSnowpark Container Services(SPCS)を使用して、大規模なスループットとコスト効率に最適化された分散コンピューティングレイヤーを提供します。
バッチ推論を使用する場合¶
ワークロードに run_batch メソッドを使用すると、次のことが可能になります。
画像、音声、またはビデオファイルの処理、または非構造化データのマルチモーダルモデルの使用
数百万または数十億の行に対して推論を実行します。
パイプラインの個別の非同期ステージとして推論を実行します。
Airflow DAG 内またはSnowflakeタスクのステップとして推論を統合する。
制限事項¶
マルチモーダルのユースケースでは、暗号化はサーバー側でのみサポートされます
パーティションモデルはサポートされていません
始めましょう¶
モデルレジストリに接続¶
Snowflakeモデルレジストリに接続し、次のようにモデル参照を取得します。
from snowflake.ml.registry import Registry
registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version') # returns ModelVersion
バッチジョブを実行する¶
この API はSnowpark Container Services(SPCS)ジョブを使用して、推論ワークロードを起動します。推論を実行すると、コンピューティングは自動的にウィンドウダウンし、追加料金は発生しません。高レベルでは、この API は次のようになります。
from snowflake.ml.model.batch import OutputSpec
# how to run a batch job
job = mv.run_batch(
compute_pool = "my_compute_pool",
X = session.table("my_table"),
output_spec = OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
job.wait() # Optional: Blocking until the job finishes
ジョブ管理¶
以下のメソッドを使用して、ジョブのリストの取得、ジョブのキャンセル、ジョブのハンドルの取得、またはジョブの削除を行うことができます。
from snowflake.ml.jobs import list_jobs, delete_job, get_job
# view logs to troubleshoot
job.get_logs()
# cancel a job
job.cancel()
# list to see all jobs
list_jobs().show()
# get the handle of a job
job = get_job("my_db.my_schema.job_name")
# delete a job that you no longer wish to run
delete_job(job)
注釈
ML ジョブ APIs の``result`` 関数はバッチ推論ジョブはサポートされていません。
推論データを指定する¶
バッチ推論には構造化データまたは非構造化データを使用できます。ワークフローに構造化データを使用するには、run_batchメソッドに SQL クエリまたはデータフレームを提供します。
非構造化データの場合は、Snowflakeステージからファイルを参照できます。ファイルを参照するには、ファイルパスでデータフレームを作成します。
データフレームをrun_batchメソッドに提供します。run_batchは、モデルにファイルのコンテンツを提供します。
構造化入力¶
以下は、入力の可能性の範囲を示す例です。
# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100"),
# reading from parquet files
X = session.read.option("pattern",".*file.*\\.parquet")
.parquet("@DB.SCHEMA.STAGE/some/path")
.select(col("id1").alias("id"), col("feature_1"), col("feature_2"))).filter(col("feature_1") > 100)
非構造化入力(マルチモーダル)¶
非構造化データの場合、 run_batch メソッドは、入力データフレームで提供された完全修飾ステージパスからファイルを読み取ることができます。次の例は、構造化されていない入力データを指定する方法を示しています。
# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
["@DB.SCHEMA.STAGE/dataset/files/file1"],
["@DB.SCHEMA.STAGE/dataset/files/file2"],
["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)
ステージ内のすべてのファイルをデータフレームとして自動的に一覧表示するには、次のようなコードを使用します。
from snowflake.ml.utils.stage_file import list_stage_files
# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")
# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")
# get all files under a path ending with ".jpg" and return the datafram with a column_name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
データの型の表現¶
Run_batchは、ファイルをモデル互換の形式に自動的に変換します。
モデルは以下のいずれかの形式のデータを受け入れることができます。
RAW_BYTES
BASE64
たとえば、ステージに PNG 形式で保存された画像があり、モデルが RAW_BYTES を受け入れる場合、 input_spec 引数で、Snowflakeがデータを変換する方法を指定できます。
次のコード例は、ステージ内のファイルを RAW_BYTES に変換します。
mv.run_batch(
X,
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file from the stage path to bytes
column_handling={
"path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
),
...
)
column_handling 引数は、Xのパス列に完全なステージパスが含まれることをフレームワークに伝え、そのファイルの生バイトでモデルを呼び出します。
出力(output_spec)¶
ここに示すように、ファイル出力を格納するステージディレクトリを指定します。
mv.run_batch(
...
output_spec = OutputSpec(stage_location="@db.schema.stage/path/"),
)
Snowflakeは現在、テキストを出力しParquetファイルとして保存するモデルをサポートしています。次のようにして、ParquetファイルをSnowparkデータフレームに変換できます。
session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")
パラメーターの引き渡し¶
モデルのシグネチャに、 ParamSpec で定義されたパラメーターが含まれている場合、 InputSpec の params 引数を使って推論時にパラメーター値を渡すことができます。ディクショナリに含まれていないパラメーターは、署名のデフォルト値を使用します。
from snowflake.ml.model.batch import InputSpec, OutputSpec
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
input_spec=InputSpec(
params={"temperature": 0.9, "max_tokens": 512}
),
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
ジョブの仕様¶
バッチ推論ワークロードのジョブレベルの設定(ワーカー数、リソース割り当て、実行パラメーターなど)を構成するには、 run_batch メソッドの job_spec 引数として JobSpec インスタンスを渡します。以下に例を示します。
from snowflake.ml.model.batch import JobSpec, OutputSpec
job_spec = JobSpec(
job_name="my_inference_job",
cpu_requests="2",
memory_requests="8GiB",
max_batch_rows=2048,
replicas=2,
)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
job_spec=job_spec,
)
ベストプラクティス¶
センチネルファイルの使用¶
様々な理由で、ジョブが途中で失敗することもあります。したがって、出力ディレクトリは部分的なデータになる可能性があります。ジョブの完了をマークするために、run_batchは出力ディレクトリ内に完了ファイル _SUCCESS を書き込みます。
部分的または誤った出力を避けるには:
センチネルファイルが見つかった後にのみ、出力データを読み取ります。
始めるには、空のディレクトリを指定します。
モード = SaveMode.ERROR でrun_batchを実行します。
例¶
カスタムモデルの使用¶
from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core
# first we must define the schema, we'll expect audio file input as base64 string
signature = core.ModelSignature(
inputs=[
core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
],
outputs=[
core.FeatureGroupSpec(
name="outputs",
specs=[
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
core.FeatureGroupSpec(
name="chunks",
specs=[
core.FeatureSpec(
name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
),
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
],
shape=(-1,),
),
],
),
],
)
# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
def __init__(self, context: custom_model.ModelContext) -> None:
super().__init__(context)
self.model = self.context.model_ref("my_model")
@custom_model.inference_api
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
import base64
audio_b64_list = df["audio"].tolist()
audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
return pd.DataFrame({"outputs": temp_res})
# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
custom_model.ModelContext(
models={
"my_model": pipeline(
task="automatic-speech-recognition", model="openai/whisper-small"
)
}
)
)
# log the model
mv = reg.log_model(
transcriber,
model_name="custom_transcriber",
version_name="v1",
signatures={"predict": signature},
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# BASE_64: download and convert the file from the stage path to base64 string
column_handling={
"audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
}
)
)
Hugging Faceモデルの使用¶
from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")
# log the model
mv = reg.log_model(
classifier,
model_name="image_classifier",
version_name="v1",
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
pip_requirements=[
"pillow" # dependency for image classification
],
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/image/image1.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image2.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image3.mp3"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location=f"@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
column_handling={
"IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
)
)
vLLM でのHugging Faceモデルの使用¶
タスク:テキスト生成¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")
mv = reg.log_model(
model,
model_name="qwenw_5",
version_name="v1",
options={"cuda_version": "12.4"},
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "How many breeds of cats are there?"},
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
タスク:画像テキストをテキストへ¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")
mv = reg.log_model(
model,
model_name="qwen2_vl_2b",
version_name="v1",
options={"cuda_version": "12.4"},
targets=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "What breed of cat is this?"},
{
"type": "image_url",
"image_url": {
# run_batch will downlaod and convert the file to the format that vLLM can handle
"url": f"@db.schema.stage/path/cat.jpeg",
}
}
# you can also pass video and audio like below
# {
# "type": "video_url",
# "video_url": {
# "url": "@db.schema.stage/path/video.avi",
# }
# }
# {
# "type": "input_audio",
# "input_audio": {
# "data": "@db.schema.stage/path/audio.mp3",
# "format": "mp3",
# }
# }
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
サンプルノートブック¶
エンドツーエンドの実行可能な例については、GitHubの`バッチ推論サンプルノートブック<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/model_serving/batch_inference>`_をご参照ください。