Batch Inference Jobs¶

Note

Preview Feature — Public

Supported in public preview since snowflake-ml-python version 1.26.0.

Use Snowflake Batch Inference for efficient, large-scale model inference on static or periodically updated datasets. The Batch Inference API uses Snowpark Container Services (SPCS) to provide a distributed compute layer optimized for high throughput and cost efficiency.

When to Use Batch Inference¶

Use the run_batch API for workloads that:

  • Process images, audio, or video files, or apply multimodal models to unstructured data.

  • Execute inference over millions or billions of rows.

  • Run inference as a discrete, asynchronous stage in a pipeline.

  • Integrate inference as a step within an Airflow DAG or Snowflake Task.

Limitations¶

  • For multi-modal use cases, only server-side encryption is supported.

  • Partitioned models aren't supported.

Get Started¶

Connect to Model Registry¶

Connect to the Snowflake Model Registry and retrieve a reference to your model:

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion

Execute Batch Job¶

This API uses a Snowpark Container Services (SPCS) job to launch the inference workload. After inference completes, the compute automatically winds down so that you don't incur additional charges. At a high level, the API looks like the following:

from snowflake.ml.model import OutputSpec

# run a batch job
job = mv.run_batch(
    compute_pool="my_compute_pool",
    X=session.table("my_table"),
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)

job.wait()  # optional: block until the job finishes

Job Management¶

from snowflake.ml.jobs import list_jobs, delete_job, get_job

# view logs to troubleshoot
job.get_logs()

# cancel a job
job.cancel()

# list all jobs
list_jobs().show()

# get the handle of a job
job = get_job("my_db.my_schema.job_name")

# delete a job that you no longer wish to run
delete_job(job)

Note

The result() function in the ML Job APIs is not supported for Batch Inference Jobs.
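
Because result() is unavailable, orchestration code can poll the job's status instead. The following is a minimal sketch, assuming the job handle exposes a status property as in the ML Jobs API; the terminal-state names and the 30-second interval are illustrative assumptions:

import time

# assumed terminal states; consult the ML Jobs API reference for the authoritative list
TERMINAL_STATES = {"DONE", "FAILED", "CANCELLED"}

while job.status not in TERMINAL_STATES:
    time.sleep(30)  # illustrative polling interval

if job.status != "DONE":
    print(job.get_logs())  # inspect the logs to troubleshoot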

Specify inference data¶

You can use structured or unstructured data for batch inference. For structured data, provide either a SQL query or a dataframe to the run_batch method.

For unstructured data, reference your files from a Snowflake stage. To reference the files, create a dataframe containing their paths.

Pass the dataframe to the run_batch method; run_batch provides the content of the files to the model.

Structured input¶

The following are examples illustrating the range of input possibilities:

from snowflake.snowpark.functions import col

# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100")

# reading from parquet files on a stage
X = (
    session.read.option("pattern", ".*file.*\\.parquet")
    .parquet("@DB.SCHEMA.STAGE/some/path")
    .select(col("id1").alias("id"), col("feature_1"), col("feature_2"))
    .filter(col("feature_1") > 100)
)

Unstructured input (multi-modal)¶

For unstructured data, the run_batch() method reads the files from the fully qualified stage paths provided in the input dataframe. The following example shows how to specify unstructured input data.

# Process a list of files
# Each file path must be a fully qualified stage path, as shown below
data = [
    ["@DB.SCHEMA.STAGE/dataset/files/file1"],
    ["@DB.SCHEMA.STAGE/dataset/files/file2"],
    ["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)

To automatically list all files in a stage as a dataframe, you can use the following code.

from snowflake.ml.utils.stage_file import list_stage_files

# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")

# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")

# get all files under a path ending with ".jpg" and return the dataframe with the column name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")

Expressing the type of data¶

run_batch automatically converts your files to a format your model can accept.

Your model can accept data in one of the following formats:

  • RAW_BYTES

  • BASE64

For example, if you have images stored in PNG format in your stage and your model accepts RAW_BYTES, you can use the input_spec argument to specify how Snowflake converts your data.

You can use the following example code to convert the files in your stage to RAW_BYTES.

from snowflake.ml.model import InputSpec, InputFormat, FileEncoding

mv.run_batch(
    X,
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # RAW_BYTES: download and convert the file from the stage path to bytes
        column_handling={
            "path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    ),
    ...
)

Here, column_handling tells the framework that the path column of X contains full stage paths, so the model is called with the raw bytes of each file.

Output (output_spec)¶

The API requires a stage directory in which to store the output files.

mv.run_batch(
    ...
    output_spec=OutputSpec(stage_location="@db.schema.stage/path/"),
)

Snowflake currently supports models that output text, and stores the results as Parquet files. You can read the Parquet files into a Snowpark dataframe:

session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")

Best Practices¶

Using a sentinel file¶

A job can fail midway for various reasons, so the output directory can end up containing partial data. To mark the completion of a job, run_batch writes a completion file named _SUCCESS to the output directory.

To avoid reading partial or incorrect output:

  • Read output data only after the sentinel file is found, as shown in the sketch after this list.

  • Provide an empty directory to begin with.

  • Run run_batch with mode=SaveMode.ERROR.
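
The following minimal sketch applies the first recommendation: it checks for the _SUCCESS sentinel before reading the output (the output stage path is illustrative):

# check for the _SUCCESS sentinel before reading the output
output_path = "@db.schema.stage/output_path/"
success = session.sql(f"LIST {output_path} PATTERN = '.*_SUCCESS'").collect()

if success:
    results_df = session.read.option("pattern", ".*\\.parquet").parquet(output_path)
else:
    raise RuntimeError("Batch output incomplete: _SUCCESS sentinel not found")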

Examples¶

Using a Custom Model¶

import pandas as pd

from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core

# first, define the signature; we expect audio file input as a base64-encoded string
signature = core.ModelSignature(
    inputs=[
        core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
    ],
    outputs=[
        core.FeatureGroupSpec(
            name="outputs",
            specs=[
                core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                core.FeatureGroupSpec(
                    name="chunks",
                    specs=[
                        core.FeatureSpec(
                            name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
                        ),
                        core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                    ],
                    shape=(-1,),
                ),
            ],
        ),
    ],
)

# defining the custom model, we decode the input from base64 to bytes and
# use Whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        self.model = self.context.model_ref("my_model")

    @custom_model.inference_api
    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        import base64
        audio_b64_list = df["audio"].tolist()
        audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
        temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
        return pd.DataFrame({"outputs": temp_res})

# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
    custom_model.ModelContext(
        models={
            "my_model": pipeline(
                task="automatic-speech-recognition", model="openai/whisper-small"
            )
        }
    )
)

# log the model
mv = registry.log_model(
    transcriber,
    model_name="custom_transcriber",
    version_name="v1",
    signatures={"predict": signature},
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)


job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # BASE64: download and convert the file from the stage path to a base64 string
        column_handling={
            "audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
        }
    )
)

Using a Hugging Face Model¶

from transformers import pipeline
from snowflake.ml.model import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model import target_platform

# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")

# log the model
mv = registry.log_model(
    classifier,
    model_name="image_classifier",
    version_name="v1",
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
    pip_requirements=[
        "pillow" # dependency for image classification
    ],
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/image/image1.jpg"],
    ["@DB.SCHEMA.STAGE/dataset/image/image2.jpg"],
    ["@DB.SCHEMA.STAGE/dataset/image/image3.jpg"],
]
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
        # we need to provide column_handling in the InputSpec to perform the necessary conversion
        # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
        # RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
        column_handling={
            "IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    )
)

Using a Hugging Face Model with vLLM¶

Task: Text-Generation¶

import json

from snowflake.ml.model import OutputSpec
from snowflake.ml.model import target_platform
# assumption: InferenceEngine is importable from snowflake.ml.model like the other specs
from snowflake.ml.model import InferenceEngine

# the model is large, so we log it by reference instead of downloading it locally
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")

mv = registry.log_model(
    model,
    model_name="qwenw_5",
    version_name="v1",
    options={"cuda_version": "12.4"},
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAI chat completions API-compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "How many breeds of cats are there?"},
        ]
    }
]]
schema = ["messages"]
data = [[json.dumps(m)] for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
        # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)

Task: Image-Text-to-Text¶

import json

from snowflake.ml.model import OutputSpec
from snowflake.ml.model import target_platform
# assumption: InferenceEngine is importable from snowflake.ml.model like the other specs
from snowflake.ml.model import InferenceEngine

# the model is large, so we log it by reference instead of downloading it locally
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")

mv = registry.log_model(
    model,
    model_name="qwen2_vl_2b",
    version_name="v1",
    options={"cuda_version": "12.4"},
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAI chat completions API-compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "What breed of cat is this?"},
            {
                "type": "image_url",
                "image_url": {
                    # run_batch will download and convert the file to the format that vLLM can handle
                    "url": "@db.schema.stage/path/cat.jpeg",
                }
            },
            # you can also pass video and audio like below
            # {
            #     "type": "video_url",
            #     "video_url": {
            #         "url": "@db.schema.stage/path/video.avi",
            #     }
            # },
            # {
            #     "type": "input_audio",
            #     "input_audio": {
            #         "data": "@db.schema.stage/path/audio.mp3",
            #         "format": "mp3",
            #     }
            # },
        ]
    }
]]

schema = ["messages"]
data = [[json.dumps(m)] for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
        # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)