일괄 추론 작업

참고

미리 보기 기능 — 공개

snowflake-ml-python 버전 1.26.0부터 공개 미리 보기로 지원됩니다.

Snowflake Batch Inference를 사용하여 정적 데이터 세트 또는 주기적으로 업데이트되는 데이터 세트에 대한 효율적인 대규모 모델 추론을 지원합니다. Batch Inference API는 Snowpark Container Services(SPCS)를 사용하여 대규모 처리량과 비용 효율성에 최적화된 분산 컴퓨팅 계층을 제공합니다.

일괄 추론을 사용해야 하는 경우

워크로드에 run_batch 메서드를 사용하여 다음을 수행합니다.

  • 이미지, 오디오, 비디오 파일 처리 또는 비정형 데이터가 포함된 멀티모달 모델 사용

  • 수백만 또는 수십억 개의 행에 대한 추론을 실행합니다.

  • 파이프라인에서 추론을 불연속 비동기 스테이지로 실행합니다.

  • 추론을 Airflow DAG 또는 Snowflake Task 내의 단계로 통합합니다.

제한 사항

  • 멀티모달 사용 사례의 경우 암호화는 서버 측에서만 지원됨

  • 분할된 모델은 지원되지 않음

시작하기

모델 레지스트리에 연결

Snowflake Model Registry에 연결하고 모델 참조를 다음과 같이 검색합니다.

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion
Copy

일괄 작업 실행

이 API는 Snowpark Container Services(SPCS)를 사용하여 추론 워크로드를 시작합니다. 추론을 실행한 후에는 추가 요금이 발생하지 않도록 컴퓨팅이 자동으로 종료됩니다. 간략하게 설명하면 이 API는 다음과 같습니다.

from snowflake.ml.model.batch import OutputSpec

# how to run a batch job
job = mv.run_batch(
    compute_pool = "my_compute_pool",
    X = session.table("my_table"),
    output_spec = OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)

job.wait() # Optional: Blocking until the job finishes
Copy

작업 관리

아래 방법을 사용하여 작업 목록을 가져오거나, 작업을 취소하거나, 작업의 핸들을 가져오거나, 작업을 삭제할 수 있습니다.

from snowflake.ml.jobs import list_jobs, delete_job, get_job

# view logs to troubleshoot
job.get_logs()

# cancel a job
job.cancel()

# list to see all jobs
list_jobs().show()

# get the handle of a job
job = get_job("my_db.my_schema.job_name")

# delete a job that you no longer wish to run
delete_job(job)
Copy

참고

ML 작업 APIs의 result 함수는 일괄 추론 작업에 지원되지 않습니다.

추론 데이터 지정

일괄 추론에는 정형 데이터 또는 비정형 데이터를 사용할 수 있습니다. 워크플로에 정형 데이터를 사용하기 위해 run_batch 메서드에 SQL 쿼리 또는 데이터프레임을 제공할 수 있습니다.

비정형 데이터의 경우 Snowflake 스테이지에서 파일을 참조할 수 있습니다. 파일을 참조하려면 파일 경로를 사용하여 데이터 프레임을 생성합니다.

데이터 프레임을 run_batch 메서드에 제공합니다. run_batch는 파일의 내용을 모델에 제공합니다.

정형 입력

다음은 입력 가능성의 범위를 보여주는 예제입니다.

# providing input from a query
X = session.sql("SELECT id, feature_1, feature_2 FROM feature_table WHERE feature_1 > 100"),

# reading from parquet files
X = session.read.option("pattern",".*file.*\\.parquet")
    .parquet("@DB.SCHEMA.STAGE/some/path")
    .select(col("id1").alias("id"), col("feature_1"), col("feature_2"))).filter(col("feature_1") > 100)
Copy

비정형 입력(멀티모달)

비정형 데이터의 경우 run_batch 메서드는 입력 데이터 프레임에 제공된 정규화된 스테이지 경로에서 파일을 읽을 수 있습니다. 다음 예제에서는 비정형 입력 데이터를 지정하는 방법을 보여줍니다.

# Process a list of files
# The file paths have to be in the form of a full stage path as below
data = [
    ["@DB.SCHEMA.STAGE/dataset/files/file1"],
    ["@DB.SCHEMA.STAGE/dataset/files/file2"],
    ["@DB.SCHEMA.STAGE/dataset/files/file3"],
]
column_names = ["image"]
X = session.create_dataframe(data, schema=column_names)
Copy

스테이지의 모든 파일을 데이터 프레임으로 자동으로 나열하려면 다음과 같은 코드를 사용합니다.

from snowflake.ml.utils.stage_file import list_stage_files

# get all files under a path
X = list_stage_files(session, "@db.schema.my_stage/path")

# get all files under a path ending with ".jpg"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg")

# get all files under a path ending with ".jpg" and return the datafram with a column_name "IMAGES"
X = list_stage_files(session, "@db.schema.my_stage/path", pattern=".*\\.jpg", column_name="IMAGES")
Copy

데이터 타입 표현하기

run_batch는 파일을 모델 호환 형식으로 자동으로 변환합니다.

모델은 다음 형식 중 하나로 데이터를 허용할 수 있습니다.

  • RAW_BYTES

  • BASE64

예를 들어, 이미지가 PNG 형식으로 스테이지에 저장되고 모델이 RAW_BYTES를 허용하는 경우, input_spec 인자를 사용하여 Snowflake가 데이터를 변환하는 방법을 지정할 수 있습니다.

다음 예제 코드는 스테이지의 파일을 RAW_BYTES로 변환합니다.

mv.run_batch(
    X,
    input_spec=InputSpec(
 # we need to provide column_handling in the InputSpec to perform the necessary conversion
 # FULL_STAGE_PATH: fully qualified path (@db.schema.stage/path) to a file
 # RAW_BYTES: download and convert the file from the stage path to bytes
        column_handling={
            "path": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    ),
    ...
)
Copy

column_handling 인자는 X의 경로 열에 전체 스테이지 경로가 포함되어 있음을 프레임워크에 알리고 해당 파일의 원시 바이트로 모델을 호출합니다.

출력(output_spec)

다음과 같이 파일 출력을 저장할 스테이지 디렉터리를 지정합니다.

mv.run_batch(
    ...
    output_spec = OutputSpec(stage_location="@db.schema.stage/path/"),
)
Copy

Snowflake는 현재 텍스트를 출력하고 Parquet 파일로 저장하는 모델을 지원합니다. 다음과 같이 Parquet 파일을 Snowpark 데이터 프레임으로 변환할 수 있습니다.

session.read.option("pattern", ".*\\.parquet").parquet("@db.schema.stage/output_path/")
Copy

매개 변수 전달하기

모델의 서명에 ParamSpec </developer-guide/snowflake-ml/model-registry/model-signature>`으로 정의된 매개 변수가 포함된 경우, ``InputSpec``의 ``params` 인자를 사용하여 추론 시 매개 변수 값을 전달할 수 있습니다. 사전에 포함되지 않은 모든 매개 변수는 서명의 기본값을 사용합니다.

from snowflake.ml.model.batch import InputSpec, OutputSpec

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    input_spec=InputSpec(
        params={"temperature": 0.9, "max_tokens": 512}
    ),
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
)
Copy

작업 사양

일괄 추론 워크로드에 대한 작업 수준 설정(예: 워커 수, 리소스 할당, 실행 매개 변수)을 구성하려면 JobSpec 인스턴스를 run_batch 메서드의 job_spec 인자로 전달합니다. 예제가 아래에 나와 있습니다.

from snowflake.ml.model.batch import JobSpec, OutputSpec

job_spec = JobSpec(
    job_name="my_inference_job",
    cpu_requests="2",
    memory_requests="8GiB",
    max_batch_rows=2048,
    replicas=2,
)

job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    job_spec=job_spec,
)
Copy

모범 사례

센티넬 파일 사용하기

여러 가지 이유로 작업이 중간에 실패할 수 있습니다. 따라서 출력 디렉터리에 부분적인 데이터가 있을 수 있습니다. 작업 완료를 표시하기 위해 run_batch는 출력 디렉터리에 완료 파일 _SUCCESS를 작성합니다.

출력이 부분적이거나 잘못되지 않도록 하려면 다음을 수행합니다.

  • 센티넬 파일을 찾은 후에만 출력 데이터를 읽습니다.

  • 시작할 빈 디렉터리를 제공합니다.

  • 모드 = SaveMode.ERROR로 run_batch를 실행합니다.

사용자 지정 모델 사용하기

from transformers import pipeline
from snowflake.ml.model import custom_model
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat
from snowflake.ml.model.model_signature import core

# first we must define the schema, we'll expect audio file input as base64 string
signature = core.ModelSignature(
    inputs=[
        core.FeatureSpec(name="audio", dtype=core.DataType.STRING),
    ],
    outputs=[
        core.FeatureGroupSpec(
            name="outputs",
            specs=[
                core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                core.FeatureGroupSpec(
                    name="chunks",
                    specs=[
                        core.FeatureSpec(
                            name="timestamp", dtype=core.DataType.DOUBLE, shape=(2,)
                        ),
                        core.FeatureSpec(name="text", dtype=core.DataType.STRING),
                    ],
                    shape=(-1,),
                ),
            ],
        ),
    ],
)

# defining the custom model, we decode the input from base64 to bytes and
# use whisper to perform the transcription
class CustomTranscriber(custom_model.CustomModel):
    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        self.model = self.context.model_ref("my_model")

    @custom_model.inference_api
    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        import base64
        audio_b64_list = df["audio"].tolist()
        audio_bytes_list = [base64.b64decode(audio_b64) for audio_b64 in audio_b64_list]
        temp_res = [self.model(audio_bytes) for audio_bytes in audio_bytes_list]
        return pd.DataFrame({"outputs": temp_res})

# creating an instance of our transcriber for logging
transcriber = CustomTranscriber(
    custom_model.ModelContext(
        models={
            "my_model": pipeline(
                task="automatic-speech-recognition", model="openai/whisper-small"
            )
        }
    )
)

# log the model
mv = reg.log_model(
    transcriber,
    model_name="custom_transcriber",
    version_name="v1",
    signatures={"predict": signature},
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/audio/audio1.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio2.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/audio/audio3.mp3"],
]
column_names = ["audio"] # This column was defined in the signature above
input_df = session.create_dataframe(data, schema=column_names)


job = mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# BASE_64: download and convert the file from the stage path to base64 string
        column_handling={
            "audio": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.BASE64}
        }
    )
)
Copy

Hugging Face 모델 사용하기

from transformers import pipeline
from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat

# supported Hugging Face tasks will have their signatures auto-inferred
classifier = pipeline(task="image-classification", model="google/vit-base-patch16-224")

# log the model
mv = reg.log_model(
    classifier,
    model_name="image_classifier",
    version_name="v1",
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
    pip_requirements=[
        "pillow" # dependency for image classification
    ],
)

# input dataframe
data = [
    ["@DB.SCHEMA.STAGE/dataset/image/image1.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/image/image2.mp3"],
    ["@DB.SCHEMA.STAGE/dataset/image/image3.mp3"],
]
# this column was defined in the auto-inferred signature
# you can view the signature by calling 'mv.show_functions()'
column_names = ["images"]
input_df = session.create_dataframe(data, schema=column_names)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location=f"@my_db.my_schema.my_stage/path/"),
    input_spec=InputSpec(
# we need to provide column_handling in the InputSpec to perform the necessary conversion
# FULL_STAGE_PATH: fully qualified path (db.schema.stage/path) to a file
# RAW_BYTES: download and convert the file to bytes (matching the predefined signature)
        column_handling={
            "IMAGES": {"input_format": InputFormat.FULL_STAGE_PATH, "convert_to": FileEncoding.RAW_BYTES}
        }
    )
)
Copy

vLLM과 함께 Hugging Face 모델 사용하기

작업: 텍스트 생성

import json

from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec, FileEncoding, InputFormat

# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2.5-0.5B-Instruct", task="text-generation")

mv = reg.log_model(
    model,
    model_name="qwenw_5",
    version_name="v1",
    options={"cuda_version": "12.4"},
    target_platforms=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAi chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "How many breeds of cats are there?"},
        ]
    }
]]
schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
 # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)
Copy

작업: 이미지 텍스트를 텍스트로

import json

from snowflake.ml.model import target_platform
from snowflake.ml.model.batch import InputSpec, OutputSpec

# it's a large model so we remotely log it
model = huggingface.TransformersPipeline(model="Qwen/Qwen2-VL-2B-Instruct", task="image-text-to-text")

mv = reg.log_model(
    model,
    model_name="qwen2_vl_2b",
    version_name="v1",
    options={"cuda_version": "12.4"},
    targets=target_platform.SNOWPARK_CONTAINER_SERVICES_ONLY,
)


# constructing OpenAi chat/completions API compatible messages
messages = [[
    {"role": "system", "content": [{"type": "text", "text": "You are an expert on cats and kitchens."}]},
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "What breed of cat is this?"},
            {
                "type": "image_url",
                "image_url": {
                    # run_batch will downlaod and convert the file to the format that vLLM can handle
                    "url": f"@db.schema.stage/path/cat.jpeg",
                }
            }
     # you can also pass video and audio like below
            # {
            #     "type": "video_url",
            #     "video_url": {
            #         "url": "@db.schema.stage/path/video.avi",
            #     }
            # }
            # {
            #     "type": "input_audio",
            #     "input_audio": {
            #         "data": "@db.schema.stage/path/audio.mp3",
            #         "format": "mp3",
            #     }
            # }
        ]
    }
]]

schema = ["messages"]
data = [(json.dumps(m)) for m in messages]
input_df = session.create_dataframe(data, schema=schema)

mv.run_batch(
    X=input_df,
    compute_pool="my_compute_pool",
    output_spec=OutputSpec(stage_location="@my_db.my_schema.my_stage/path/"),
    inference_engine_options={
 # set vLLM as the inference backend
        "engine": InferenceEngine.VLLM,
    },
)
Copy

샘플 노트북

실행 가능한 엔드투엔드 예제는 GitHub의 `일괄 추론 샘플 노트북 <https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/model_serving/batch_inference>`_을 참조하세요.