Batch-Inferenzjobs¶
Bemerkung
Vorschau-Feature – Öffentlich
Wird in der öffentlichen Vorschau seit snowflake-ml-python Version 1.26.0 unterstützt.
Verwenden Sie Snowflake Batch-Inferenz, um effiziente, umfangreiche Modell-Inferenzen für statische oder regelmäßig aktualisierte Datensets zu ermöglichen. Die Batch-Inferenz-API nutzt Snowpark Container Services (SPCS), um eine verteilte Verarbeitungsschicht bereitzustellen, die für einen massiven Durchsatz und Kosteneffizienz optimiert ist.
Wann Batch-Inferenz verwendet werden sollte¶
Verwenden Sie die Methode run_batch für folgende Workloads:
Verarbeiten von Bild-, Audio- oder Videodateien oder Verwenden von multimodalen Modellen mit unstrukturierten Daten
Ausführen von Inferenz über Millionen oder Milliarden von Zeilen
Ausführen von Inferenz als diskreter, asynchroner Stagingbereich in einer Pipeline
Integrieren von Inferenz als einen Schritt in einer Airflow DAG oder Snowflake-Aufgabe.
Einschränkungen¶
Bei den multimodalen Anwendungsfällen wird die Verschlüsselung nur auf der Serverseite unterstützt
Partitionierte Modelle werden nicht unterstützt
Erste Schritte¶
Verbinden mit Model Registry¶
Stellen Sie eine Verbindung zur Snowflake Model Registry her und rufen Sie die Modellreferenz ab als:
from snowflake.ml.registry import Registry
registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version') # returns ModelVersion
Batch-Job ausführen¶
Diese API nutzt den Snowpark Container Services (SPCS)-Job, um den Inferenz-Workload zu starten. Nach der Ausführung der Inferenz wird die Berechnung automatisch heruntergefahren, damit Ihnen keine zusätzlichen Kosten entstehen. Allgemein zusammengefasst sieht diese API wie folgt aus:
from snowflake.ml.model.batch import OutputSpec
# how to run a batch job
job = mv.run_batch(
compute_pool = "my_compute_pool",
X = session.table("my_table"),
output_spec = OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
job.wait() # Optional: Blocking until the job finishes
Jobverwaltung¶
Mit den folgenden Methoden können Sie eine Liste von Jobs abrufen, einen Job abbrechen, das Handle eines Jobs abrufen oder einen Job löschen:
from snowflake.ml.jobs import list_jobs, delete_job, get_job
# view logs to troubleshoot
job.get_logs()
# cancel a job
job.cancel()
# list to see all jobs
list_jobs().show()
# get the handle of a job
job = get_job("my_db.my_schema.job_name")
# delete a job that you no longer wish to run
delete_job(job)
Bemerkung
Die result-Funktion in ML-Job-APIs wird für Batch-Inferenzjobs nicht unterstützt.
Inferenzdaten angeben¶
Sie können strukturierte Daten oder unstrukturierte Daten für die Batch-Inferenz verwenden. Um strukturierte Daten für Ihren Workflow zu verwenden, können Sie entweder eine SQL-Abfrage oder einen Datenframe für die „run_batch“-Methode bereitstellen.
Bei unstrukturierten Daten können Sie auf Ihre Dateien von einem Snowflake-Stagingbereich aus verweisen. Um auf Ihre Dateien zu verweisen, erstellen Sie einen Datenframe mit den Dateipfaden.
Sie stellen Ihren Datenframe der Methode „run_batch“ zur Verfügung. „run_batch“ stellt den Inhalt der Dateien für das Modell bereit.
Strukturierte Eingabe¶
Im Folgenden finden Sie Beispiele, die den Bereich der Eingabemöglichkeiten veranschaulichen:
# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100"),
# reading from parquet files
X = session.read.option("pattern",".*file.*\\.parquet")
.parquet("@DB.SCHEMA.STAGE/some/path")
.select(col("id1").alias("id"), col("feature_1"), col("feature_2"))).filter(col("feature_1") > 100)
Unstrukturierte Eingabe (multimodal)¶
Für unstrukturierte Daten kann die run_batch-Methode die Dateien aus den im Eingabe-Datenframe angegebenen vollqualifizierten Stagingbereichspfaden lesen. Das folgende Beispiel zeigt, wie Sie unstrukturierte Eingabedaten angeben:
# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
["@DB.SCHEMA.STAGE/dataset/files/file1"],
["@DB.SCHEMA.STAGE/dataset/files/file2"],
["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)
Um automatisch alle Dateien in einem Stagingbereich als Datenframes aufzulisten, verwenden Sie Code wie den folgenden:
from snowflake.ml.utils.stage_file import list_stage_files
# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")
# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")
# get all files under a path ending with ".jpg" and return the datafram with a column_name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
Ausdrücken des Datentyps¶
Run_batch konvertiert Ihre Dateien automatisch in die mit dem Modell kompatiblen Formate.
Ihr Modell kann Daten in einem der folgenden Formate akzeptieren:
RAW_BYTES
BASE64
Beispiel: Wenn Sie Bilder im PNG-Format in Ihrem Stagingbereich gespeichert haben und Ihr Modell RAW_BYTES akzeptiert, können Sie das Argument input_spec verwenden, um anzugeben, wie Snowflake Ihre Daten konvertiert.
Der folgende Beispielcode konvertiert Dateien in Ihrem Stagingbereich in RAW_BYTES:
mv.run_batch(
X,
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file from the stage path to bytes
column_handling={
"path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
),
...
)
Das Argument column_handling teilt dem Framework mit, dass die Pfadspalte von X einen vollständigen Stagingbereichspfad enthält, und ruft das Modell mit Raw-Bytes aus dieser Datei auf.
Ausgabe (output_spec)¶
Geben Sie ein Stagingbereichsverzeichnis zum Speichern der Ausgabe der Datei an, wie hier gezeigt:
mv.run_batch(
...
output_spec = OutputSpec(stage_location="@db.schema.stage/path/"),
)
Snowflake unterstützt derzeit Modelle, die Text ausgeben, und speichert diese als Parquet-Dateien. Sie können die Parquet-Dateien wie folgt in einen Snowpark-Datenframe konvertieren:
session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")
Übergeben von Parametern¶
Wenn die Signatur des Modells Parameter enthält, die mit ParamSpec definiert sind, können Sie Parameterwerte zur Inferenzzeit mithilfe des Arguments params in InputSpec übergeben. Jeder Parameter, der nicht im Dictionary enthalten ist, verwendet seinen Standardwert aus der Signatur.
from snowflake.ml.model.batch import InputSpec, OutputSpec
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
input_spec=InputSpec(
params={"temperature": 0.9, "max_tokens": 512}
),
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
Job-Spezifikation¶
Um Einstellungen auf Jobebene für Ihren Batch-Inferenz-Workload zu konfigurieren (z. B. die Anzahl der Worker, die Ressourcenzuweisung und die Ausführungsparameter), übergeben Sie eine JobSpec-Instanz als job_spec-Argument der run_batch-Methode. Siehe nachfolgendes Beispiel:
from snowflake.ml.model.batch import JobSpec, OutputSpec
job_spec = JobSpec(
job_name="my_inference_job",
cpu_requests="2",
memory_requests="8GiB",
max_batch_rows=2048,
replicas=2,
)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
job_spec=job_spec,
)
Best Practices¶
Verwenden einer Sentinel-Datei¶
Ein Job kann aus verschiedenen Gründen mittendrin fehlschlagen. Das Ausgabeverzeichnis kann daher Teildaten enthalten. Um den Abschluss des Jobs zu markieren, schreibt „run_batch“ eine Sentinel- oder Abschlussdatei mit _SUCCESS in das Ausgabeverzeichnis.
So vermeiden Sie eine teilweise oder falsche Ausgabe:
Die Ausgabedaten werden erst gelesen, nachdem die Sentinel-Datei gefunden wurde.
Geben Sie zunächst ein leeres Verzeichnis an, um zu beginnen.
Führen Sie „run_batch“ mit Modus = SaveMode.ERROR aus.
Beispiele¶
Verwenden eines kundenspezifischen Modells¶
from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core
# first we must define the schema, we'll expect audio file input as base64 string
signature = core.ModelSignature(
inputs=[
core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
],
outputs=[
core.FeatureGroupSpec(
name="outputs",
specs=[
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
core.FeatureGroupSpec(
name="chunks",
specs=[
core.FeatureSpec(
name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
),
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
],
shape=(-1,),
),
],
),
],
)
# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
def __init__(self, context: custom_model.ModelContext) -> None:
super().__init__(context)
self.model = self.context.model_ref("my_model")
@custom_model.inference_api
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
import base64
audio_b64_list = df["audio"].tolist()
audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
return pd.DataFrame({"outputs": temp_res})
# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
custom_model.ModelContext(
models={
"my_model": pipeline(
task="automatic-speech-recognition", model="openai/whisper-small"
)
}
)
)
# log the model
mv = reg.log_model(
transcriber,
model_name="custom_transcriber",
version_name="v1",
signatures={"predict": signature},
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# BASE_64: download and convert the file from the stage path to base64 string
column_handling={
"audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
}
)
)
Verwenden des Hugging Face-Modells¶
from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")
# log the model
mv = reg.log_model(
classifier,
model_name="image_classifier",
version_name="v1",
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
pip_requirements=[
"pillow" # dependency for image classification
],
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/image/image1.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image2.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image3.mp3"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location=f"@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
column_handling={
"IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
)
)
Verwenden des Hugging Face-Modells mit vLLM¶
Aufgabe: Textgenerierung¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")
mv = reg.log_model(
model,
model_name="qwenw_5",
version_name="v1",
options={"cuda_version": "12.4"},
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "How many breeds of cats are there?"},
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
Aufgabe: Bildtext in Text umwandeln¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")
mv = reg.log_model(
model,
model_name="qwen2_vl_2b",
version_name="v1",
options={"cuda_version": "12.4"},
targets=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "What breed of cat is this?"},
{
"type": "image_url",
"image_url": {
# run_batch will downlaod and convert the file to the format that vLLM can handle
"url": f"@db.schema.stage/path/cat.jpeg",
}
}
# you can also pass video and audio like below
# {
# "type": "video_url",
# "video_url": {
# "url": "@db.schema.stage/path/video.avi",
# }
# }
# {
# "type": "input_audio",
# "input_audio": {
# "data": "@db.schema.stage/path/audio.mp3",
# "format": "mp3",
# }
# }
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
Beispiele für Notebooks¶
Beispiele für durchgängig ausführbare finden Sie unter`Beispiel-Notebooks für Batch-Inferenz<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/model_serving/batch_inference>`_ auf GitHub.