Snowflake ML 작업¶
Snowflake ML Jobs를 사용하여 Snowflake ML Container Runtime 내에서 머신 러닝(ML) 워크플로를 실행합니다. 모든 개발 환경에서 실행할 수 있습니다. Snowflake 워크시트나 노트북에서 코드를 실행할 필요는 없습니다. 작업을 사용하여 개발 워크플로 내에서 리소스 집약적인 작업을 실행하기 위해 Snowflake의 인프라를 활용하십시오. 로컬에서 Snowflake ML 을 설정하는 방법에 대한 자세한 내용은 로컬에서 Snowflake ML 사용하기 섹션을 참조하십시오.
중요
Snowflake ML Jobs는 snowflake-ml-python
버전 1.9.2 이상에서 사용할 수 있습니다.
Snowflake ML Jobs를 통해 다음을 수행할 수 있습니다.
GPU 및 고용량 메모리 CPU 인스턴스를 포함하여 Snowflake Compute Pool에서 ML 워크로드를 실행할 수 있습니다.
VS Code 또는 Jupyter 노트북 등 선호하는 개발 환경을 사용할 수 있습니다.
런타임 환경 내에서 사용자 정의 Python 패키지를 설치하여 사용할 수 있습니다.
Snowflake의 분산형 API를 사용하여 데이터 로딩, 학습 및 하이퍼 매개 변수 튜닝을 최적화할 수 있습니다.
Apache Airflow와 같은 오케스트레이션 도구와 통합할 수 있습니다.
Snowflake의 API를 통해 작업을 모니터링하고 관리할 수 있습니다.
이러한 기능을 사용하여 다음을 수행할 수 있습니다.
GPU 가속 또는 상당한 컴퓨팅 리소스가 필요한 대규모 데이터 세트에 대해 리소스 집약적인 학습을 실행할 수 있습니다.
파이프라인을 통한 프로그래밍 실행으로 개발에서 프로덕션으로 ML 코드를 이동하여 ML 워크플로를 프로덕션화할 수 있습니다.
기존 개발 환경을 유지하면서 Snowflake의 컴퓨팅 리소스를 활용할 수 있습니다.
최소한의 코드 변경으로 OSS ML 워크플로를 해제하고 전환할 수 있습니다.
Large Snowflake 데이터 세트로 직접 작업하여 데이터 이동을 줄이고 값비싼 데이터 전송을 피할 수 있습니다.
전제 조건¶
중요
Snowflake ML Jobs는 현재 Python 3.10 클라이언트만 지원합니다. 다른 Python 버전에 대한 지원이 필요한 경우 Snowflake 계정 팀에 문의하십시오.
Python 3.10 환경에 Snowflake ML Python 패키지를 설치합니다.
pip install snowflake-ml-python>=1.9.2
기본 컴퓨팅 풀 크기는 CPU_X 64_S 인스턴스 패밀리를 사용합니다. 최소 노드 수는 1개, 최대 노드 수는 25개입니다. 다음 SQL 명령을 사용하여 사용자 지정 컴퓨팅 풀을 만들 수 있습니다.
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = <MIN_NODES> MAX_NODES = <MAX_NODES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
Snowflake ML Jobs에는 Snowpark 세션이 필요합니다. 다음 코드를 사용하여 생성합니다.
from snowflake.snowpark import Session from snowflake.ml.jobs import list_jobs ls = list_jobs() # This will fail! You must create a session first. # Requires valid ~/.snowflake/config.toml file session = Session.builder.getOrCreate() ls = list_jobs(session=session) ls = list_jobs() # Infers created session from context
세션 생성에 대한 자세한 내용은 세션 만들기 섹션을 참조하십시오.
Snowflake ML 작업 실행하기¶
다음 방법 중 하나로 Snowflake ML Job을 실행할 수 있습니다.
코드 내에서 함수 데코레이터 사용.
Python API 를 사용하여 전체 파일 또는 디렉터리 제출.
Python 함수를 Snowflake ML Job으로 실행하기¶
Function Dispatch를 사용하여 @remote
데코레이터를 통해 Snowflake의 컴퓨팅 리소스에서 개별 Python 함수를 원격으로 실행할 수 있습니다.
@remote
데코레이터를 사용하여 다음을 수행할 수 있습니다.
함수와 그 종속성을 직렬화할 수 있습니다.
지정된 Snowflake 스테이지에 업로드할 수 있습니다.
ML 에 대한 Container Runtime 내에서 실행할 수 있습니다.
다음 예제 Python 코드는 @remote
데코레이터를 사용하여 Snowflake ML Job을 제출합니다. Snowpark Session
이 필요하며, 전제 조건 섹션을 참조하십시오.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
job = train_model("my_training_data")
@remote
데코레이터 함수를 호출하면 작업 실행을 관리하고 모니터링하는 데 사용할 수 있는 Snowflake MLJob
오브젝트가 반환됩니다. 자세한 내용은 ML Jobs 관리하기 섹션을 참조하십시오.
Python 파일을 Snowflake로 ML Job으로 실행하기¶
Python 파일 또는 프로젝트 디렉터리를 Snowflake 컴퓨팅 리소스에서 실행합니다. 이 기능은 다음과 같은 경우에 유용합니다.
여러 모듈과 종속성이 있는 복잡한 ML 프로젝트가 있는 경우.
로컬 개발 코드와 제품 코드의 분리를 유지하려고 하는 경우.
명령줄 인자를 사용하는 스크립트를 실행해야 하는 경우.
기존 ML 프로젝트로 작업하고 있는데, 이 프로젝트는 특별히 Snowflake 컴퓨팅에서 실행하도록 설계되지 않았습니다.
Snowflake Job API는 파일 기반 페이로드를 제출할 수 있는 세 가지 주요 메서드를 제공합니다.
submit_file()
: 단일 Python 파일 실행용submit_directory()
: 여러 파일과 리소스에 걸쳐 있는 Python 프로젝트를 실행하는 경우submit_from_stage()
: Snowflake 스테이지에 저장된 Python 프로젝트를 실행하는 경우
두 가지 방법 모두 다음을 지원합니다.
명령줄 인자 전달
환경 변수 구성
사용자 지정 종속성 사양
Snowflake 스테이지를 통한 프로젝트 자산 관리
File Dispatch는 기존 ML 워크플로를 프로덕션화하고 개발 환경과 실행 환경을 명확하게 분리하는 데 특히 유용합니다.
다음 Python 코드는 파일을 Snowflake ML Job으로 제출합니다.
from snowflake.ml.jobs import submit_file
# Run a single file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
args=["--data-table", "my_training_data"],
session=session,
)
다음 Python 코드는 디렉터리를 Snowflake ML Job으로 제출합니다.
from snowflake.ml.jobs import submit_directory
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
)
다음 Python 코드는 Snowflake 스테이지의 디렉터리를 Snowflake ML Job으로 제출합니다.
from snowflake.ml.jobs import submit_from_stage
# Run from a directory
job3 = submit_from_stage(
"@source_stage/ml_project/"
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
)
# Entrypoint may also be a relative path
job4 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py", # Resolves to @source_stage/ml_project/train.py
stage_name="payload_stage",
session=session,
)
파일 또는 디렉터리를 제출하면 작업 실행을 관리하고 모니터링하는 데 사용할 수 있는 Snowflake MLJob
오브젝트가 반환됩니다. 자세한 내용은 ML Jobs 관리하기 섹션을 참조하십시오.
제출 시 추가 페이로드 지원하기¶
파일, 디렉터리를 제출하거나 스테이지에서 제출할 때 작업 실행 중에 사용할 수 있는 추가 페이로드가 지원됩니다. 가져오기 경로는 명시적으로 지정할 수 있습니다. 그렇지 않으면 추가 페이로드 위치에서 추론됩니다.
중요
디렉터리만 가져오기 소스로 지정할 수 있습니다. 개별 파일 가져오기는 지원되지 않습니다.
# Run from a file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/", "utils"), # the import path is utils
],
)
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/"), # the import path is utils
],
)
# Run from a stage
job3 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
],
)
ML Jobs에서 Snowpark 세션에 액세스하기¶
Snowflake에서 ML Jobs를 실행 중인 경우 Snowpark 세션은 실행 컨텍스트에서 자동으로 사용할 수 있습니다. 다음과 같은 접근 방식을 사용해 ML Jobs 페이로드 내에서 세션 오브젝트에 액세스할 수 있습니다.
from snowflake.ml.jobs import remote
from snowflake.snowpark import Session
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
# This approach works for all payload types, including file and directory payloads
session = Session.builder.getOrCreate()
print(session.sql("SELECT CURRENT_VERSION()").collect())
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
# This approach works only for function dispatch payloads
# The session is injected automatically by the Snowflake ML Job API
print(session.sql("SELECT CURRENT_VERSION()").collect())
Snowpark 세션을 사용하면 ML Job 내부의 Snowflake 테이블, 스테이지 및 기타 데이터베이스 오브젝트에 액세스할 수 있습니다.
ML Jobs에서 결과 반환하기¶
Snowflake ML Jobs는 실행 결과를 클라이언트 환경으로 다시 반환하는 기능을 지원합니다. 이 기능을 사용해 계산된 값, 학습된 모델 또는 작업 페이로드에서 생성된 기타 아티팩트를 검색할 수 있습니다.
함수 디스패치의 경우 데코레이트된 함수에서 값을 반환하기만 하면 됩니다. 반환된 값은 직렬화되어 result()
메서드를 통해 제공됩니다.
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
job1 = train_model("my_training_data")
파일 기반 작업의 경우 특수 __return__
변수를 사용하여 반환 값을 지정합니다.
# Example: /path/to/repo/my_script.py
def main():
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
if __name__ == "__main__":
__return__ = main()
from snowflake.ml.jobs import submit_file
job2 = submit_file(
"/path/to/repo/my_script.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
)
MLJob.result()
API를 사용하여 작업 실행 결과를 검색할 수 있습니다. API는 작업이 종료 상태에 도달할 때까지 호출 스레드를 차단했다가 페이로드의 반환 값을 반환하거나, 실행에 실패한 경우 예외를 발생시킵니다. 페이로드에 반환 값이 정의되어 있지 않은 경우 결과는 성공 시 :code:`None`이 됩니다.
# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
ML Jobs 관리하기¶
Snowflake ML Job을 제출하면 API 에서 MLJob
오브젝트를 생성합니다. 이를 사용하여 다음을 수행할 수 있습니다.
상태 업데이트를 통한 작업 진행 상황 추적
자세한 실행 로그를 사용하여 문제 디버그
실행 결과 검색(있는 경우)
get_job()
API 를 사용하여 MLJob
오브젝트를 ID 로 검색할 수 있습니다. 다음 Python 코드는 MLJob
오브젝트를 검색하는 방법을 보여줍니다.
from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job
# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df) # Display list in table format
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Retrieve status and logs for the retrieved job
print(job.status) # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
# Clean up the job
delete_job(job)
종속성 관리하기¶
Snowflake ML Job API 는 Container Runtime for ML 환경 내에서 페이로드를 실행합니다. 이 환경에는 머신 러닝과 데이터 과학에 가장 일반적으로 사용되는 Python 패키지가 있습니다. 대부분의 사용 사례는 추가 구성 없이 ‘즉시’ 작동합니다. 사용자 지정 종속성이 필요한 경우 pip_requirements
를 사용하여 설치할 수 있습니다.
사용자 지정 종속성을 설치하려면 외부 액세스 통합을 사용하여 외부 네트워크 액세스를 활성화해야 합니다. 다음 SQL 예제 명령을 사용하여 액세스를 제공할 수 있습니다.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
외부 액세스 통합에 대한 자세한 내용은 외부 액세스 통합 생성 및 사용하기 섹션을 참조하십시오.
외부 네트워크 액세스를 제공한 후 pip_requirements
및 external_access_integrations
매개 변수를 사용하여 사용자 지정 종속성을 구성할 수 있습니다. Container Runtime 환경에서 사용할 수 없거나 특정 버전의 패키지를 사용하는 경우 패키지를 사용할 수 있습니다.
다음 Python 코드는 remote
데코레이터에 사용자 지정 종속성을 지정하는 방법을 보여줍니다.
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=["custom-package"],
external_access_integrations=["PYPI_EAI"],
session=session,
)
def my_function():
# Your code here
다음 Python 코드는 submit_file()
메서드에 대한 사용자 지정 종속성을 지정하는 방법을 보여줍니다.
from snowflake.ml.jobs import submit_file
# Can include version specifier to specify version(s)
job = submit_file(
"/path/to/repo/my_script.py",
compute_pool,
stage_name="payload_stage",
pip_requirements=["custom-package==1.0.*"],
external_access_integrations=["pypi_eai"],
session=session,
)
비공개 패키지 피드¶
Snowflake ML Jobs는 JFrog Artifactory 및 Sonatype Nexus 리포지토리와 같은 비공개 피드에서 패키지를 로딩하는 것도 지원합니다. 이러한 피드는 일반적으로 내부 및 독점 패키지를 배포하고, 종속성 버전을 제어하며, 보안/규정 준수를 보장하는 데 사용됩니다.
비공개 피드에서 패키지를 설치하려면 다음을 수행해야 합니다.
비공개 피드의 URL 에 대한 액세스를 허용하는 네트워크 규칙을 만듭니다.
기본 인증을 사용하는 원본의 경우 간단히 네트워크 규칙을 만들면 됩니다.
CREATE OR REPLACE NETWORK RULE private_feed_nr MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('<your-repo>.jfrog.io');
비공개 연결(즉, Private Link)을 사용하여 원본에 대한 액세스를 구성하려면 비공개 연결을 사용한 네트워크 송신 의 단계를 따르십시오.
네트워크 규칙을 사용하여 외부 액세스 통합을 만듭니다. 작업을 제출할 역할에 EAI 사용 권한을 부여합니다.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR) ENABLED = true; GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
작업을 제출할 때 비공개 피드 URL, External Access Integration 및 패키지를 지정합니다.
# Option 1: Specify private feed URL in pip_requirements job = submit_file( "/path/to/script.py", compute_pool="MY_COMPUTE_POOL", stage_name="payload_stage", pip_requirements=[ "--index-url=https://your.private.feed.url", "internal-package==1.2.3" ], external_access_integrations=["PRIVATE_FEED_EAI"] )
# Option 2: Specify private feed URL by environment variable job = submit_directory( "/path/to/code/", compute_pool="MY_COMPUTE_POOL", entrypoint="script.py", stage_name="payload_stage", pip_requirements=["internal-package==1.2.3"], external_access_integrations=["PRIVATE_FEED_EAI"], env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'}, )
비공개 피드 URL에 인증 토큰 같이 민감한 정보가 포함된 경우 Snowflake 시크릿을 만들어 URL을 관리합니다. /sql-reference/sql/create-secret`을 사용하여 시크릿을 만들 수 있습니다. 작업을 제출할 때 :code:`spec_overrides 인자를 사용하여 시크릿을 구성하세요.
# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()
# Prepare service spec override for mounting secret into job execution
spec_overrides = {
"spec": {
"containers": [
{
"name": "main", # Primary container name is always "main"
"secrets": [
{
"snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
"envVarName": "PIP_INDEX_URL",
"secretKeyRef": "secret_string"
},
],
}
]
}
}
# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
"/path/to/script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=[
"internal-package==1.2.3"
],
external_access_integrations=["PRIVATE_FEED_EAI"],
spec_overrides=spec_overrides,
)
container.secrets
에 대한 자세한 내용은 containers.secrets 필드 섹션을 참조하십시오.
예¶
Snowflake ML Jobs를 사용하는 방법에 대한 예제는 `ML Jobs 코드 샘플<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs>`_을 참조하세요.
비용 고려 사항¶
Snowflake ML Jobs는 Snowpark Container Services에서 실행되며 사용량에 따라 요금이 청구됩니다. 컴퓨팅 비용에 대한 자세한 내용은 Snowpark Container Services 비용 섹션을 참조하십시오.
작업 페이로드는 stage_name
인자로 지정된 스테이지에 업로드됩니다. 추가 요금이 부과되지 않도록 하려면 반드시 정리해야 합니다. 스테이지 저장소와 관련된 비용에 대한 자세한 내용은 저장소 비용 이해하기 및 저장소 비용 살펴보기 섹션을 참조하십시오.