Snowflake MLのジョブ¶
Snowflake ML Jobsを使用して、Snowflake ML コンテナーランタイム内で機械学習(ML)ワークフローを実行します。どの開発環境からでも実行できます。Snowflakeワークシートやノートブックでコードを実行する必要はありません。ジョブを使用してSnowflakeのインフラストラクチャを活用し、開発ワークフローの中でリソース集約型のタスクを実行します。Snowflake ML のローカルでのセットアップ情報については、 Snowflake ML をローカルで使用する を参照してください。
重要
snowflake-ml-python
バージョン1.9.2以降でSnowflake ML ジョブを利用できます。
Snowflake ML Jobsでは以下のことが可能です。
GPU や高いメモリの CPU インスタンスなど、SnowflakeのCompute Poolで ML ワークロードを実行します。
VS CodeやJupyterノートブックなど、お好みの開発環境をご利用ください。
ランタイム環境内にカスタムのPythonパッケージをインストールして使用します。
Snowflakeの分散 APIs を使用して、データのロード、トレーニング、ハイパーパラメーターのチューニングを最適化します。
Apache Airflowなどのオーケストレーションツールと統合できます。
Snowflakeの APIs を通してジョブの監視と管理を行います。
これらの機能を使用して、次のことができます。
GPU アクセラレーションや大規模なコンピューティングリソースを必要とする大容量データセットに対して、リソース集約的なトレーニングを実行します。
パイプラインを介したプログラム実行により、 ML コードを開発環境から本番環境に移動することで、 ML ワークフローを本稼働へ移行します。
既存の開発環境を維持しながら、Snowflakeのコンピューティングリソースを活用できます。
最小限のコード変更で OSS ML ワークフローをリフトアンドシフトします。
大容量のSnowflakeデータセットを直接扱うことで、データ移動を減らし、高価なデータ転送を回避します。
前提条件¶
重要
Snowflake ML Jobsは現在Python 3.10クライアントのみをサポートしています。その他のPythonバージョンのサポートが必要な場合は、Snowflakeアカウントチームまでご連絡ください。
Python 3.10環境にSnowflake ML Pythonパッケージをインストールします。
pip install snowflake-ml-python>=1.9.2
デフォルトのコンピューティングプールサイズは、 CPU_X64_Sインスタンスファミリーを使用します。ノード数の最小値は1、最大値は25です。次の SQL コマンドを使用して、カスタムコンピューティングプールを作成できます。
CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL MIN_NODES = <MIN_NODES> MAX_NODES = <MAX_NODES> INSTANCE_FAMILY = <INSTANCE_FAMILY>;
Snowflake ML JobsにはSnowparkセッションが必要です。次のコードで作成します。
from snowflake.snowpark import Session from snowflake.ml.jobs import list_jobs ls = list_jobs() # This will fail! You must create a session first. # Requires valid ~/.snowflake/config.toml file session = Session.builder.getOrCreate() ls = list_jobs(session=session) ls = list_jobs() # Infers created session from context
セッションの作成に関する情報は、 セッションの作成 を参照してください。
Snowflake ML ジョブを実行する¶
Snowflake ML ジョブは、以下のいずれかの方法で実行できます。
コード内での関数デコレーターの使用。
Python API を使用して、ファイルまたはディレクトリ全体を送信します。
Snowflake ML ジョブとしてPython関数を実行する¶
Function Dispatchを使用して、 @remote
デコレーターを使用してSnowflakeのコンピューティングリソース上で個々のPython関数をリモートで実行します。
@remote
デコレーターを使うと、次のようなことができます。
関数とその依存関係をシリアライズします。
指定したSnowflakeステージにアップロードします。
ML のContainer Runtime内で実行します。
次のPythonコード例では、 @remote
デコレーターを使用してSnowflake ML ジョブを送信します。なお、Snowpark Session
が必要です。前提条件 をご覧ください。
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
# Provide your ML code here, including imports and function calls
...
job = train_model("my_training_data")
@remote
デコレート関数を呼び出すと、ジョブの実行管理と監視に使用できるSnowflake MLJob
オブジェクトが返されます。詳細については、 管理 ML ジョブ をご参照ください。
PythonファイルをSnowflake ML ジョブとして実行する¶
Snowflakeコンピューティングリソース上でPythonファイルまたはプロジェクトディレクトリを実行します。こんな時に便利です。
複数のモジュールと依存関係を持つ複雑な ML プロジェクトを持っている場合。
ローカルの開発コードと本番コードの分離を維持したい場合。
コマンドライン引数を使用するスクリプトを実行する必要がある場合。
Snowflakeコンピュートで実行するように特別に設計されていない既存の ML プロジェクトを操作している場合。
Snowflakeジョブ API は、ファイルベースのペイロードを提出する3つの主なメソッドを提供します。
submit_file()
: 単一のPythonファイルを実行する場合submit_directory()
:複数のファイルとリソースにまたがるPythonプロジェクトを実行する場合submit_from_stage()
:Snowflakeステージに保存されたPythonプロジェクトを実行する場合
どちらのメソッドでも以下がサポートされます。
コマンドライン引数パッシング
環境変数の構成
カスタム依存関係仕様
Snowflakeステージによるプロジェクト資産管理
File Dispatchは、既存の ML ワークフローを本稼働へ移行し、開発環境と実行環境の明確な分離を維持するために特に役立ちます。
次のPythonコードは、Snowflake ML ジョブとしてファイルを送信します。
from snowflake.ml.jobs import submit_file
# Run a single file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
args=["--data-table", "my_training_data"],
session=session,
)
次のPythonコードは、ディレクトリをSnowflake ML ジョブとして送信します。
from snowflake.ml.jobs import submit_directory
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
)
次のPythonコードは、SnowflakeステージからSnowflake ML ジョブとしてディレクトリを送信します。
from snowflake.ml.jobs import submit_from_stage
# Run from a directory
job3 = submit_from_stage(
"@source_stage/ml_project/"
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
)
# Entrypoint may also be a relative path
job4 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py", # Resolves to @source_stage/ml_project/train.py
stage_name="payload_stage",
session=session,
)
ファイルまたはディレクトリを送信すると、ジョブの実行を管理および監視するために使用できるSnowflake MLJob
オブジェクトが返されます。詳細については、 管理 ML ジョブ をご参照ください。
送信での追加ペイロードのサポート¶
ファイル、ディレクトリ、またはステージからファイルを送信する場合は、ジョブの実行中に使用する追加のペイロードがサポートされます。インポートパスは明示的に指定できます。そうでない場合は、追加ペイロードの場所から推測されます。
重要
インポートソースとして指定できるのは、ディレクトリのみです。個別のファイルのインポートはサポートされていません。
# Run from a file
job1 = submit_file(
"train.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/", "utils"), # the import path is utils
],
)
# Run from a directory
job2 = submit_directory(
"./ml_project/",
"MY_COMPUTE_POOL",
entrypoint="train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("src/utils/"), # the import path is utils
],
)
# Run from a stage
job3 = submit_from_stage(
"@source_stage/ml_project/",
"MY_COMPUTE_POOL",
entrypoint="@source_stage/ml_project/train.py",
stage_name="payload_stage",
session=session,
additional_payloads=[
("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
],
)
ML ジョブでのSnowparkセッションへのアクセス¶
Snowflake上で ML ジョブを実行すると、実行コンテキストでSnowparkセッションが自動的に利用可能です。次のアプローチを使用して、 ML ジョブのペイロード内からセッションオブジェクトにアクセスできます。
from snowflake.ml.jobs import remote
from snowflake.snowpark import Session
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
# This approach works for all payload types, including file and directory payloads
session = Session.builder.getOrCreate()
print(session.sql("SELECT CURRENT_VERSION()").collect())
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
# This approach works only for function dispatch payloads
# The session is injected automatically by the Snowflake ML Job API
print(session.sql("SELECT CURRENT_VERSION()").collect())
Snowparkセッションを使用すると、 ML ジョブ内のSnowflakeテーブル、ステージ、およびその他のデータベースオブジェクトにアクセスできます。
ML ジョブから結果を返す¶
Snowflake ML ジョブは、実行結果をクライアント環境に返すことをサポートします。これにより、計算値、トレーニング済みモデル、またはジョブペイロードによって生成されたその他のアーティファクトを取得できます。
関数のディスパッチの場合は、装飾された関数から値を単純に返します。戻り値はシリアル化され、 result()
メソッドによって利用可能になります。
from snowflake.ml.jobs import remote
@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
job1 = train_model("my_training_data")
ファイルベースのジョブの場合は、特別な __return__
変数を使用して戻り値を指定します。
# Example: /path/to/repo/my_script.py
def main():
# Your ML code here
model = XGBClassifier()
model.fit(data_table)
return model
if __name__ == "__main__":
__return__ = main()
from snowflake.ml.jobs import submit_file
job2 = submit_file(
"/path/to/repo/my_script.py",
"MY_COMPUTE_POOL",
stage_name="payload_stage",
session=session,
)
MLJob.result()`API を使用してジョブの実行結果を取得できます。API は、ジョブが終了状態に達するまで呼び出しスレッドをブロックし、ペイロードの戻り値を返すか、実行に失敗した場合は例外を発生させます。ペイロードが戻り値を定義しない場合、成功した場合の結果は :code:`None
になります。
# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
管理 ML ジョブ¶
Snowflake ML ジョブを送信すると、 API は MLJob
オブジェクトを作成します。これを使用して次のことを実行できます。
ステータス更新によるジョブの進捗管理
詳細な実行ログによる問題のデバッグ
実行結果の取得(もしあれば)
get_job()
API を使って、 MLJob
オブジェクトを ID で取得することができます。次のPythonコードは、 MLJob
オブジェクトを取得する方法を示しています。
from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job
# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df) # Display list in table format
# Retrieve an existing job based on ID
job = get_job("<job_id>") # job is an MLJob instance
# Retrieve status and logs for the retrieved job
print(job.status) # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
# Clean up the job
delete_job(job)
依存関係の管理¶
Snowflake ML Job API は ML のContainer Runtime 環境内でペイロードを実行します。この環境には、機械学習やデータサイエンスのために最も一般的に使用されているPythonパッケージが用意されています。ほとんどのユースケースは、追加の構成なしで「そのまま」使用できるはずです。カスタム依存関係が必要な場合は、 pip_requirements
を使ってインストールできます。
カスタム依存関係をインストールするには、外部アクセス統合を使用して外部ネットワークアクセスを有効にする必要があります。以下の SQL 例コマンドを使用してアクセスを提供することができます。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
ENABLED = true;
外部アクセス統合の詳細については、 外部アクセス統合の作成と使用 をご参照ください。
外部ネットワークアクセスを提供した後、 pip_requirements
と external_access_integrations
パラメーターを使用してカスタム依存関係を構成できます。コンテナーランタイム環境で利用できないパッケージや、特定のバージョンのパッケージを使用することができます。
次のPythonコードは、 remote
デコレーターにカスタム依存関係を指定する方法を示します。
@remote(
"MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=["custom-package"],
external_access_integrations=["PYPI_EAI"],
session=session,
)
def my_function():
# Your code here
次のPythonコードは、 submit_file()
メソッドにカスタム依存関係を指定する方法を示します。
from snowflake.ml.jobs import submit_file
# Can include version specifier to specify version(s)
job = submit_file(
"/path/to/repo/my_script.py",
compute_pool,
stage_name="payload_stage",
pip_requirements=["custom-package==1.0.*"],
external_access_integrations=["pypi_eai"],
session=session,
)
プライベートパッケージフィード¶
Snowflake ML Jobsは、 JFrog ArtifactoryやSonatype Nexus Repositoryなどのプライベートフィードからのパッケージのロードもサポートしています。これらのフィードは、内部パッケージや独自パッケージの配布、依存関係のバージョン管理、セキュリティ/コンプライアンス確保のためによく使われます。
プライベートフィードからパッケージをインストールするには、以下を実行する必要があります。
プライベートフィードの URL へのアクセスを許可するネットワークルールを作成します。
基本認証を使用するソースの場合は、ネットワークルールを作成するだけです。
CREATE OR REPLACE NETWORK RULE private_feed_nr MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('<your-repo>.jfrog.io');
プライベート接続(つまり、Private Link)を使用してソースへのアクセスを構成するには、 プライベート接続を使用したネットワークエグレス の手順に従ってください。
ネットワークルールを使用して、外部アクセス統合を作成します。ジョブを送信するロールに、 EAI の使用許可を付与します。
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR) ENABLED = true; GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
ジョブ送信時に、プライベートフィード URL、外部アクセス統合、パッケージを指定します
# Option 1: Specify private feed URL in pip_requirements job = submit_file( "/path/to/script.py", compute_pool="MY_COMPUTE_POOL", stage_name="payload_stage", pip_requirements=[ "--index-url=https://your.private.feed.url", "internal-package==1.2.3" ], external_access_integrations=["PRIVATE_FEED_EAI"] )
# Option 2: Specify private feed URL by environment variable job = submit_directory( "/path/to/code/", compute_pool="MY_COMPUTE_POOL", entrypoint="script.py", stage_name="payload_stage", pip_requirements=["internal-package==1.2.3"], external_access_integrations=["PRIVATE_FEED_EAI"], env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'}, )
プライベートフィード URL に認証トークンなどの機密情報が含まれている場合、Snowflakeシークレットを作成して URL を管理します。CREATE SECRET を使用してシークレットを作成します。spec_overrides
引数を使用して、ジョブ送信中にシークレットを構成します。
# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
TYPE = GENERIC_STRING
SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()
# Prepare service spec override for mounting secret into job execution
spec_overrides = {
"spec": {
"containers": [
{
"name": "main", # Primary container name is always "main"
"secrets": [
{
"snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
"envVarName": "PIP_INDEX_URL",
"secretKeyRef": "secret_string"
},
],
}
]
}
}
# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
"/path/to/script.py",
compute_pool="MY_COMPUTE_POOL",
stage_name="payload_stage",
pip_requirements=[
"internal-package==1.2.3"
],
external_access_integrations=["PRIVATE_FEED_EAI"],
spec_overrides=spec_overrides,
)
container.secrets
の情報については、 containers.secrets フィールド をご参照ください。
例¶
Snowflake ML ジョブの使用方法例については、`MLジョブコードサンプル<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs>`_ をご参照ください。
コストの考慮事項¶
Snowflake ML JobsはSnowpark Container Services上で実行され、使用量に応じて請求されます。コンピューティングコストに関する情報は、 Snowpark Container Servicesコスト をご覧ください。
ジョブペイロードは stage_name
引数で指定されたステージにアップロードされます。追加料金を請求されないようにするには、クリーンアップが必要です。ステージストレージに関するコストについては、 ストレージコストについて および ストレージコストの調査 をご参照ください。