Trabalhos de inferência em lote¶
Nota
Recurso em versão preliminar — Público
Suportado na versão preliminar pública desde as versões 1.26.0 do snowflake-ml-python.
Use a inferência em lote do Snowflake para permitir inferência de modelo eficiente e em grande escala em conjuntos de dados estáticos ou atualizados periodicamente. A API Batch Inference usa o Snowpark Container Services (SPCS) para fornecer uma camada de computação distribuída otimizada para altíssimo rendimento e eficiência de custos.
Quando usar a inferência em lote¶
Use o método run_batch em cargas de trabalho para:
Processar arquivos de imagens, áudio ou vídeo, ou usar modelos multimodais com dados não estruturados.
Executar a inferência sobre milhões ou bilhões de linhas.
Executar a inferência como um estágio discreto e assíncrono em um pipeline.
Integrar a inferência como uma etapa dentro de um Airflow DAG ou uma tarefa do Snowflake.
Limitações¶
Para os casos de uso multimodal, a criptografia só é suportada no servidor
Modelos particionados não são compatíveis
Introdução¶
Conectar-se ao Model Registry¶
Conecte-se ao Snowflake Model Registry e recupere a referência do modelo como:
from snowflake.ml.registry import Registry
registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version') # returns ModelVersion
Executar trabalhos em lote¶
Esta API usa o Snowpark Container Services (SPCS) para iniciar a carga de trabalho de inferência. Depois de executar a inferência, a computação é automaticamente interrompida para impedir que haja cobranças adicionais. De modo geral, a API é similar a isto:
from snowflake.ml.model.batch import OutputSpec
# how to run a batch job
job = mv.run_batch(
compute_pool = "my_compute_pool",
X = session.table("my_table"),
output_spec = OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
job.wait() # Optional: Blocking until the job finishes
Gerenciamento de trabalhos¶
Você pode obter uma lista de trabalhos, cancelar um trabalho, encontrar o identificador de um trabalho ou excluir um trabalho usando os métodos abaixo:
from snowflake.ml.jobs import list_jobs, delete_job, get_job
# view logs to troubleshoot
job.get_logs()
# cancel a job
job.cancel()
# list to see all jobs
list_jobs().show()
# get the handle of a job
job = get_job("my_db.my_schema.job_name")
# delete a job that you no longer wish to run
delete_job(job)
Nota
A função result nas APIs ML Jobs não é compatível com trabalhos de inferência em lote.
Especificação de dados de inferência¶
Você pode usar dados estruturados ou não estruturados para inferência em lote. Para usar dados estruturados em seu fluxo de trabalho, você pode fornecer uma consulta SQL ou um dataframe para o método run_batch.
Para dados não estruturados, você pode referenciar seus arquivos de um estágio do Snowflake. Para referenciar seus arquivos, crie um dataframe com os caminhos de arquivo.
Você fornece seu dataframe para o método run_batch. O run_batch fornece o conteúdo dos arquivos para o modelo.
Entrada estruturada¶
Os exemplos a seguir mostram a gama de possibilidades de entrada:
# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100"),
# reading from parquet files
X = session.read.option("pattern",".*file.*\\.parquet")
.parquet("@DB.SCHEMA.STAGE/some/path")
.select(col("id1").alias("id"), col("feature_1"), col("feature_2"))).filter(col("feature_1") > 100)
Entrada não estruturada (multimodal)¶
Para dados não estruturados, o método run_batch pode ler os arquivos dos caminhos de estágio totalmente qualificados fornecidos no dataframe de entrada. O exemplo a seguir mostra como especificar dados de entrada não estruturados:
# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
["@DB.SCHEMA.STAGE/dataset/files/file1"],
["@DB.SCHEMA.STAGE/dataset/files/file2"],
["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)
Para listar automaticamente todos os arquivos em um estágio como dataframe, use um código como o seguinte:
from snowflake.ml.utils.stage_file import list_stage_files
# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")
# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")
# get all files under a path ending with ".jpg" and return the datafram with a column_name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
Expressão do tipo de dados¶
O Run_batch converte automaticamente seus arquivos para os formatos compatíveis com o modelo.
Seu modelo pode aceitar dados em um dos seguintes formatos:
RAW_BYTES
BASE64
Por exemplo, se você tiver imagens armazenadas no formato PNG em seu estágio e seu modelo aceita RAW_BYTES, você pode usar o argumento input_spec para especificar como o Snowflake converte seus dados.
O código de exemplo a seguir converte arquivos no seu estágio em RAW_BYTES:
mv.run_batch(
X,
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file from the stage path to bytes
column_handling={
"path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
),
...
)
O argumento column_handling informa ao framework que a coluna do caminho de X contém um caminho de estágio completo e chama o modelo com bytes brutos desse arquivo.
Saída (output_spec)¶
Especifique um diretório de estágio para armazenar a saída do arquivo, conforme mostrado aqui:
mv.run_batch(
...
output_spec = OutputSpec(stage_location="@db.schema.stage/path/"),
)
O Snowflake atualmente oferece suporte a modelos que geram textos e os armazenam como arquivos parquet. Você pode converter os arquivos parquet em um dataframe do Snowpark desta forma:
session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")
Parâmetros de passagem¶
Se a assinatura do modelo incluir parâmetros definidos com ParamSpec, é possível passar valores de parâmetro no momento da inferência usando o argumento params em InputSpec. Qualquer parâmetro não incluído no dicionário usa o valor padrão da assinatura.
from snowflake.ml.model.batch import InputSpec, OutputSpec
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
input_spec=InputSpec(
params={"temperature": 0.9, "max_tokens": 512}
),
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
Especificação do trabalho¶
Para definir as configurações de nível de trabalho para sua carga de trabalho de inferência em lote (como o número de trabalhadores, alocação de recursos e parâmetros de execução, passe uma instância JobSpec como o argumento job_spec do método run_batch. Um exemplo é mostrado abaixo:
from snowflake.ml.model.batch import JobSpec, OutputSpec
job_spec = JobSpec(
job_name="my_inference_job",
cpu_requests="2",
memory_requests="8GiB",
max_batch_rows=2048,
replicas=2,
)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
job_spec=job_spec,
)
Práticas recomendadas¶
Como usar um arquivo sentinela¶
Um trabalho pode falhar no meio por vários motivos. Portanto, o diretório de saída pode passar a ter dados parciais. Para marcar a conclusão do trabalho, run_batch grava um arquivo de conclusão _SUCCESS no diretório de saída.
Para evitar uma saída parcial ou incorreta:
Leia os dados de saída somente depois que o arquivo sentinela for encontrado.
Forneça um diretório vazio para começar.
Execute run_batch com o modo = SaveMode.ERROR.
Exemplos¶
Uso de um modelo personalizado¶
from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core
# first we must define the schema, we'll expect audio file input as base64 string
signature = core.ModelSignature(
inputs=[
core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
],
outputs=[
core.FeatureGroupSpec(
name="outputs",
specs=[
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
core.FeatureGroupSpec(
name="chunks",
specs=[
core.FeatureSpec(
name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
),
core.FeatureSpec(name="text", dtype=core.DataType.STRING),
],
shape=(-1,),
),
],
),
],
)
# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
def __init__(self, context: custom_model.ModelContext) -> None:
super().__init__(context)
self.model = self.context.model_ref("my_model")
@custom_model.inference_api
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
import base64
audio_b64_list = df["audio"].tolist()
audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
return pd.DataFrame({"outputs": temp_res})
# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
custom_model.ModelContext(
models={
"my_model": pipeline(
task="automatic-speech-recognition", model="openai/whisper-small"
)
}
)
)
# log the model
mv = reg.log_model(
transcriber,
model_name="custom_transcriber",
version_name="v1",
signatures={"predict": signature},
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)
job = mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# BASE_64: download and convert the file from the stage path to base64 string
column_handling={
"audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
}
)
)
Uso do modelo Hugging Face¶
from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")
# log the model
mv = reg.log_model(
classifier,
model_name="image_classifier",
version_name="v1",
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
pip_requirements=[
"pillow" # dependency for image classification
],
)
# input dataframe
data = [
["@DB.SCHEMA.STAGE/dataset/image/image1.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image2.mp3"],
["@DB.SCHEMA.STAGE/dataset/image/image3.mp3"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location=f"@my_db.my_schema.my_stage/path/"),
input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
column_handling={
"IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
}
)
)
Uso do modelo Hugging Face com vLLM¶
Tarefa: geração de texto¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")
mv = reg.log_model(
model,
model_name="qwenw_5",
version_name="v1",
options={"cuda_version": "12.4"},
target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "How many breeds of cats are there?"},
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
Tarefa: texto de imagem para texto¶
import json
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec
# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")
mv = reg.log_model(
model,
model_name="qwen2_vl_2b",
version_name="v1",
options={"cuda_version": "12.4"},
targets=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)
# constructing OpenAi chat/completions API compatible messages
messages = [[
{"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
{
"role": "user",
"content": [
{"type": "text", "text": "What breed of cat is this?"},
{
"type": "image_url",
"image_url": {
# run_batch will downlaod and convert the file to the format that vLLM can handle
"url": f"@db.schema.stage/path/cat.jpeg",
}
}
# you can also pass video and audio like below
# {
# "type": "video_url",
# "video_url": {
# "url": "@db.schema.stage/path/video.avi",
# }
# }
# {
# "type": "input_audio",
# "input_audio": {
# "data": "@db.schema.stage/path/audio.mp3",
# "format": "mp3",
# }
# }
]
}
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)
mv.run_batch(
X=input_df,
compute_pool="my_compute_pool",
output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
inference_engine_options={
# set vLLM as the inference backend
"engine": InferenceEngine.VLLM,
},
)
Notebooks de amostra¶
Para obter exemplos executáveis completos, consulte os notebooks de amostra de inferência em lote no GitHub.