Tâches d’inférence par lots¶
Note
Fonction de prévisualisation — Publique
Prise en charge en prévisualisation publique à partir de la version 1.26.0 de snowflake-ml-python.
Utilisez l’inférence par lots Snowflake pour permettre une inférence efficace et à grande échelle de modèles sur des ensembles de données statiques ou mis à jour périodiquement. L’API d’inférence par lots utilise Snowpark Container Services (SPCS) pour fournir une couche de calcul distribuée optimisée pour un débit et une rentabilité considérables.
Quand utiliser l’inférence par lots¶
Utilisez la méthode run_batch pour les charges de travail pour :
Traiter des images, des fichiers audio ou vidéo ou utiliser des modèles multimodaux avec des données non structurées
Exécuter une inférence sur des millions ou des milliards de lignes.
Exécuter l’inférence en tant que zone de préparation discrète et asynchrone dans un pipeline
Intégrer l’inférence en tant qu’étape dans un DAG Airflow ou une tâche Snowflake.
Limitations¶
Pour les cas d’utilisation multimodaux, le chiffrement n’est pris en charge que côté serveur.
Les modèles partitionnés ne sont pas pris en charge
Prise en main¶
Se connecter au registre des modèles¶
Connectez-vous au registre des modèles Snowflake et récupérez la référence du modèle comme suit :
from snowflake.ml.registry import Registry
registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version') # returns ModelVersion
Exécuter une tâche par lots¶
Cette API utilise la tâche Snowpark Container Services (SPCS) pour lancer la charge de travail d’inférence. Après l’exécution de l’inférence, le calcul s’arrête automatiquement pour vous éviter d’encourir des frais supplémentaires. À un haut niveau, cette API ressemble à ce qui suit :
from snowflake.ml.model.batch import OutputSpec
# how to run a batch job
job = mv.run_batch(
compute_pool = "my_compute_pool",
X = session.table("my_table"),
output_spec = OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
job.wait() # Optional: Blocking until the job finishes
Gestion des tâches¶
Vous pouvez obtenir une liste de tâches, annuler une tâche, obtenir l’identifiant (handle) d’une tâche ou supprimer une tâche à l’aide des méthodes ci-dessous :
from snowflake.ml.jobs import list_jobs, delete_job, get_job
# view logs to troubleshoot
job.get_logs()
# cancel a job
job.cancel()
# list to see all jobs
list_jobs().show()
# get the handle of a job
job = get_job("my_db.my_schema.job_name")
# delete a job that you no longer wish to run
delete_job(job)
Note
La fonction result dans les APIs de tâche ML n’est pas prise en charge pour les tâches d’inférence par lots.
Spécifier des données d’inférence¶
Vous pouvez utiliser des données structurées ou des données non structurées pour l’inférence par lots. Pour utiliser des données structurées dans votre workflow, vous pouvez fournir une requête SQL ou un dataframe à la méthode run_batch.
Pour les données non structurées, vous pouvez référencer vos fichiers à partir d’une zone de préparation Snowflake. Pour référencer vos fichiers, créez un dataframe avec les chemins d’accès aux fichiers.
Vous fournissez votre dataframe à la méthode run_batch et la méthode run_batch fournit le contenu des fichiers au modèle.
Entrée structurée¶
Les exemples suivants illustrent la plage des possibilités d’entrée :
# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100"),
# reading from parquet files
X = session.read.option("pattern",".*file.*\\.parquet")
.parquet("@DB.SCHEMA.STAGE/some/path")
.select(col("id1").alias("id"), col("feature_1"), col("feature_2"))).filter(col("feature_1") > 100)
Entrée non structurée (multimodale)¶
Pour les données non structurées, la méthode run_batch peut lire les fichiers à partir des chemins de zone de préparation complets fournis dans le dataframe d’entrée. L’exemple suivant vous montre comment spécifier des données d’entrée non structurées :
# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
["@DB.SCHEMA.STAGE/dataset/files/file1"],
["@DB.SCHEMA.STAGE/dataset/files/file2"],
["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)
Pour lister automatiquement tous les fichiers d’une zone de préparation en tant que dataframe, utilisez un code comme celui-ci :
from snowflake.ml.utils.stage_file import list_stage_files
# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")
# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")
# get all files under a path ending with ".jpg" and return the datafram with a column_name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
Expression du type de données¶
Run_batch convertit automatiquement vos fichiers dans les formats compatibles avec les modèles.
Votre modèle peut accepter des données dans l’un des formats suivants :
RAW_BYTES
BASE64
Par exemple, si vous avez des images stockées au format PNG dans votre zone de préparation et si votre modèle accepte RAW_BYTES, vous pouvez utiliser l’argument input_spec pour spécifier comment Snowflake convertit vos données.
L’exemple de code suivant convertit les fichiers de votre zone de préparation en RAW_BYTES :
mv.run_batch(
X,
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file from the stage path to bytes
column_handling={
"path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
),
...
)
L’argument column_handling indique au framework que la colonne de chemin X contient un chemin complet de la zone de préparation et appelle le modèle avec des octets bruts de ce fichier.
Sortie (output_spec)¶
Spécifiez un répertoire de zone de préparation pour stocker la sortie du fichier, comme indiqué ici :
mv.run_batch(
...
output_spec = OutputSpec(stage_location="@db.schema.stage/path/"),
)
Snowflake prend actuellement en charge les modèles qui produisent du texte et les stockent sous forme de fichiers Parquet. Vous pouvez convertir les fichiers Parquet en un cadre de données Snowpark comme suit :
session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")
Transmission de paramètres¶
Si la signature du modèle comprend des paramètres définis avec ParamSpec, vous pouvez transmettre les valeurs des paramètres au moment de l’inférence en utilisant l’argument params dans InputSpec. Tout paramètre non inclus dans le dictionnaire utilise sa valeur par défaut à partir de la signature.
from snowflake.ml.model.batch import InputSpec, OutputSpec
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
input_spec=InputSpec(
params={"temperature": 0.9, "max_tokens": 512}
),
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
Spécification de la tâche¶
Pour configurer les paramètres de niveau tâche pour votre charge de travail d’inférence par lots (comme le nombre de travailleurs, l’allocation des ressources et les paramètres d’exécution, transmettez une instance JobSpec en tant qu’argument job_spec de la méthode run_batch. En voici un exemple :
from snowflake.ml.model.batch import JobSpec, OutputSpec
job_spec = JobSpec(
job_name="my_inference_job",
cpu_requests="2",
memory_requests="8GiB",
max_batch_rows=2048,
replicas=2,
)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
job_spec=job_spec,
)
Meilleures pratiques¶
Utilisation d’un fichier sentinelle¶
Une tâche peut échouer à mi-chemin pour diverses raisons. Le répertoire de sortie peut donc se retrouver avec des données partielles. Pour marquer l’achèvement de la tâche, la méthode run_batch écrit un fichier d’achèvement _SUCCESS dans le répertoire de sortie.
Pour éviter toute sortie partielle ou incorrecte :
Ne lire les données de sortie qu’une fois le fichier sentinelle trouvé.
Indiquez un répertoire vide pour commencer.
Exécutez la méthode run_batch avec le mode = SaveMode.ERROR.
Exemples¶
Utilisation d’un modèle personnalisé¶
from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core
# first we must define the schema, we'll expect audio file input as base64 string
signature = core.ModelSignature(
inputs=[
core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
],
outputs=[
core.FeatureGroupSpec(
name="outputs",
specs=[
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
core.FeatureGroupSpec(
name="chunks",
specs=[
core.FeatureSpec(
name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
),
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
],
shape=(-1,),
),
],
),
],
)
# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
def __init__(self, context: custom_model.ModelContext) -> None:
super().__init__(context)
self.model = self.context.model_ref("my_model")
@custom_model.inference_api
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
import base64
audio_b64_list = df["audio"].tolist()
audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
return pd.DataFrame({"outputs": temp_res})
# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
custom_model.ModelContext(
models={
"my_model": pipeline(
task="automatic-speech-recognition", model="openai/whisper-small"
)
}
)
)
# log the model
mv = reg.log_model(
transcriber,
model_name="custom_transcriber",
version_name="v1",
signatures={"predict": signature},
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# BASE_64: download and convert the file from the stage path to base64 string
column_handling={
"audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
}
)
)
Utilisation du modèle Hugging Face¶
from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")
# log the model
mv = reg.log_model(
classifier,
model_name="image_classifier",
version_name="v1",
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
pip_requirements=[
"pillow" # dependency for image classification
],
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/image/image1.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image2.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image3.mp3"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location=f"@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
column_handling={
"IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
)
)
Utilisation du modèle Hugging Face avec vLLM¶
Tâche : génération de texte¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")
mv = reg.log_model(
model,
model_name="qwenw_5",
version_name="v1",
options={"cuda_version": "12.4"},
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "How many breeds of cats are there?"},
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
Tâche : texte d’image vers texte¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")
mv = reg.log_model(
model,
model_name="qwen2_vl_2b",
version_name="v1",
options={"cuda_version": "12.4"},
targets=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "What breed of cat is this?"},
{
"type": "image_url",
"image_url": {
# run_batch will downlaod and convert the file to the format that vLLM can handle
"url": f"@db.schema.stage/path/cat.jpeg",
}
}
# you can also pass video and audio like below
# {
# "type": "video_url",
# "video_url": {
# "url": "@db.schema.stage/path/video.avi",
# }
# }
# {
# "type": "input_audio",
# "input_audio": {
# "data": "@db.schema.stage/path/audio.mp3",
# "format": "mp3",
# }
# }
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
Exemples de notebooks¶
Pour des exemples d’exécutions de bout en bout, consultez les ` exemples de notebooks d’inférence par lots <https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/model_serving/batch_inference>`_ sur GitHub.