Snowflake MLのジョブ

Snowflake ML Jobsを使用して、Snowflake ML コンテナーランタイム内で機械学習(ML)ワークフローを実行します。どの開発環境からでも実行できます。Snowflakeワークシートやノートブックでコードを実行する必要はありません。ジョブを使用してSnowflakeのインフラストラクチャを活用し、開発ワークフローの中でリソース集約型のタスクを実行します。Snowflake ML のローカルでのセットアップ情報については、 Snowflake ML をローカルで使用する を参照してください。

重要

snowflake-ml-python バージョン1.9.2以降でSnowflake ML ジョブを利用できます。

Snowflake ML Jobsでは以下のことが可能です。

  • GPU や高いメモリの CPU インスタンスなど、SnowflakeのCompute Poolで ML ワークロードを実行します。

  • VS CodeやJupyterノートブックなど、お好みの開発環境をご利用ください。

  • ランタイム環境内にカスタムのPythonパッケージをインストールして使用します。

  • Snowflakeの分散 APIs を使用して、データのロード、トレーニング、ハイパーパラメーターのチューニングを最適化します。

  • Apache Airflowなどのオーケストレーションツールと統合できます。

  • Snowflakeの APIs を通してジョブの監視と管理を行います。

これらの機能を使用して、次のことができます。

  • GPU アクセラレーションや大規模なコンピューティングリソースを必要とする大容量データセットに対して、リソース集約的なトレーニングを実行します。

  • パイプラインを介したプログラム実行により、 ML コードを開発環境から本番環境に移動することで、 ML ワークフローを本稼働へ移行します。

  • 既存の開発環境を維持しながら、Snowflakeのコンピューティングリソースを活用できます。

  • 最小限のコード変更で OSS ML ワークフローをリフトアンドシフトします。

  • 大容量のSnowflakeデータセットを直接扱うことで、データ移動を減らし、高価なデータ転送を回避します。

前提条件

重要

Snowflake ML Jobsは現在Python 3.10クライアントのみをサポートしています。その他のPythonバージョンのサポートが必要な場合は、Snowflakeアカウントチームまでご連絡ください。

  1. Python 3.10環境にSnowflake ML Pythonパッケージをインストールします。

    pip install snowflake-ml-python>=1.9.2
    
    Copy
  2. デフォルトのコンピューティングプールサイズは、 CPU_X64_Sインスタンスファミリーを使用します。ノード数の最小値は1、最大値は25です。次の SQL コマンドを使用して、カスタムコンピューティングプールを作成できます。

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = <MIN_NODES>
      MAX_NODES = <MAX_NODES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy
  3. Snowflake ML JobsにはSnowparkセッションが必要です。次のコードで作成します。

    from snowflake.snowpark import Session
    from snowflake.ml.jobs import list_jobs
    
    ls = list_jobs() # This will fail! You must create a session first.
    
    # Requires valid ~/.snowflake/config.toml file
    session = Session.builder.getOrCreate()
    
    ls = list_jobs(session=session)
    ls = list_jobs() # Infers created session from context
    
    Copy

    セッションの作成に関する情報は、 セッションの作成 を参照してください。

Snowflake ML ジョブを実行する

Snowflake ML ジョブは、以下のいずれかの方法で実行できます。

  • コード内での関数デコレーターの使用。

  • Python API を使用して、ファイルまたはディレクトリ全体を送信します。

Snowflake ML ジョブとしてPython関数を実行する

Function Dispatchを使用して、 @remote デコレーターを使用してSnowflakeのコンピューティングリソース上で個々のPython関数をリモートで実行します。

@remote デコレーターを使うと、次のようなことができます。

  • 関数とその依存関係をシリアライズします。

  • 指定したSnowflakeステージにアップロードします。

  • Execute it within a Container Runtime.

次のPythonコード例では、 @remote デコレーターを使用してSnowflake ML ジョブを送信します。なお、Snowpark Session が必要です。前提条件 をご覧ください。

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")
Copy

@remote デコレート関数を呼び出すと、ジョブの実行管理と監視に使用できるSnowflake MLJob オブジェクトが返されます。詳細については、 管理 ML ジョブ をご参照ください。

PythonファイルをSnowflake ML ジョブとして実行する

Snowflakeコンピューティングリソース上でPythonファイルまたはプロジェクトディレクトリを実行します。こんな時に便利です。

  • 複数のモジュールと依存関係を持つ複雑な ML プロジェクトを持っている場合。

  • ローカルの開発コードと本番コードの分離を維持したい場合。

  • コマンドライン引数を使用するスクリプトを実行する必要がある場合。

  • Snowflakeコンピュートで実行するように特別に設計されていない既存の ML プロジェクトを操作している場合。

Snowflakeジョブ API は、ファイルベースのペイロードを提出する3つの主なメソッドを提供します。

  • submit_file(): 単一のPythonファイルを実行する場合

  • submit_directory():複数のファイルとリソースにまたがるPythonプロジェクトを実行する場合

  • submit_from_stage():Snowflakeステージに保存されたPythonプロジェクトを実行する場合

どちらのメソッドでも以下がサポートされます。

  • コマンドライン引数パッシング

  • 環境変数の構成

  • カスタム依存関係仕様

  • Snowflakeステージによるプロジェクト資産管理

File Dispatchは、既存の ML ワークフローを本稼働へ移行し、開発環境と実行環境の明確な分離を維持するために特に役立ちます。

次のPythonコードは、Snowflake ML ジョブとしてファイルを送信します。

from snowflake.ml.jobs import submit_file

# Run a single file
job1 = submit_file(
  "train.py",
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  args=["--data-table", "my_training_data"],
  session=session,
)
Copy

次のPythonコードは、ディレクトリをSnowflake ML ジョブとして送信します。

from snowflake.ml.jobs import submit_directory

# Run from a directory
job2 = submit_directory(
  "./ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",
  stage_name="payload_stage",
  session=session,
)
Copy

次のPythonコードは、SnowflakeステージからSnowflake ML ジョブとしてディレクトリを送信します。

from snowflake.ml.jobs import submit_from_stage

# Run from a directory
job3 = submit_from_stage(
  "@source_stage/ml_project/"
  "MY_COMPUTE_POOL",
  entrypoint="@source_stage/ml_project/train.py",
  stage_name="payload_stage",
  session=session,
)

# Entrypoint may also be a relative path
job4 = submit_from_stage(
  "@source_stage/ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",  # Resolves to @source_stage/ml_project/train.py
  stage_name="payload_stage",
  session=session,
)
Copy

ファイルまたはディレクトリを送信すると、ジョブの実行を管理および監視するために使用できるSnowflake MLJob オブジェクトが返されます。詳細については、 管理 ML ジョブ をご参照ください。

送信での追加ペイロードのサポート

ファイル、ディレクトリ、またはステージからファイルを送信する場合は、ジョブの実行中に使用する追加のペイロードがサポートされます。インポートパスは明示的に指定できます。そうでない場合は、追加ペイロードの場所から推測されます。

重要

インポートソースとして指定できるのは、ディレクトリのみです。個別のファイルのインポートはサポートされていません。

# Run from a file
 job1 = submit_file(
   "train.py",
   "MY_COMPUTE_POOL",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("src/utils/", "utils"), # the import path is utils
   ],
 )

 # Run from a directory
 job2 = submit_directory(
   "./ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="train.py",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("src/utils/"), # the import path is utils
   ],
 )

 # Run from a stage
 job3 = submit_from_stage(
   "@source_stage/ml_project/",
   "MY_COMPUTE_POOL",
   entrypoint="@source_stage/ml_project/train.py",
   stage_name="payload_stage",
   session=session,
   additional_payloads=[
     ("@source_stage/src/utils/sub_utils/", "utils.sub_utils"),
   ],
 )
Copy

ML ジョブでのSnowparkセッションへのアクセス

Snowflake上で ML ジョブを実行すると、実行コンテキストでSnowparkセッションが自動的に利用可能です。次のアプローチを使用して、 ML ジョブのペイロード内からセッションオブジェクトにアクセスできます。

from snowflake.ml.jobs import remote
from snowflake.snowpark import Session

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function():
  # This approach works for all payload types, including file and directory payloads
  session = Session.builder.getOrCreate()
  print(session.sql("SELECT CURRENT_VERSION()").collect())

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def my_function_with_injected_session(session: Session):
  # This approach works only for function dispatch payloads
  # The session is injected automatically by the Snowflake ML Job API
  print(session.sql("SELECT CURRENT_VERSION()").collect())
Copy

Snowparkセッションを使用すると、 ML ジョブ内のSnowflakeテーブル、ステージ、およびその他のデータベースオブジェクトにアクセスできます。

ML ジョブから結果を返す

Snowflake ML ジョブは、実行結果をクライアント環境に返すことをサポートします。これにより、計算値、トレーニング済みモデル、またはジョブペイロードによって生成されたその他のアーティファクトを取得できます。

関数のディスパッチの場合は、装飾された関数から値を単純に返します。戻り値はシリアル化され、 result() メソッドによって利用可能になります。

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage")
def train_model(data_table: str):
  # Your ML code here
  model = XGBClassifier()
  model.fit(data_table)
  return model

job1 = train_model("my_training_data")
Copy

ファイルベースのジョブの場合は、特別な __return__ 変数を使用して戻り値を指定します。

# Example: /path/to/repo/my_script.py
def main():
    # Your ML code here
    model = XGBClassifier()
    model.fit(data_table)
    return model

if __name__ == "__main__":
    __return__ = main()
Copy
from snowflake.ml.jobs import submit_file

job2 = submit_file(
    "/path/to/repo/my_script.py",
    "MY_COMPUTE_POOL",
    stage_name="payload_stage",
    session=session,
)
Copy

MLJob.result()`API を使用してジョブの実行結果を取得できます。API は、ジョブが終了状態に達するまで呼び出しスレッドをブロックし、ペイロードの戻り値を返すか、実行に失敗した場合は例外を発生させます。ペイロードが戻り値を定義しない場合、成功した場合の結果は :code:`None になります。

# These will block until the respective job is done and return the trained model
model1 = job1.result()
model2 = job2.result()
Copy

管理 ML ジョブ

Snowflake ML ジョブを送信すると、 API は MLJob オブジェクトを作成します。これを使用して次のことを実行できます。

  • ステータス更新によるジョブの進捗管理

  • 詳細な実行ログによる問題のデバッグ

  • 実行結果の取得(もしあれば)

get_job() API を使って、 MLJob オブジェクトを ID で取得することができます。次のPythonコードは、 MLJob オブジェクトを取得する方法を示しています。

from snowflake.ml.jobs import MLJob, get_job, list_jobs, delete_job

# Get a list of the 10 most recent jobs as a Pandas DataFrame
jobs_df = list_jobs(limit=10)
print(jobs_df)  # Display list in table format

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Retrieve status and logs for the retrieved job
print(job.status)  # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())

# Clean up the job
delete_job(job)
Copy

依存関係の管理

The Snowflake ML Job API runs payloads inside the Container Runtime environment. The environment has the most commonly used Python packages for machine learning and data science. Most use cases should work "out of the box" without additional configuration. If you need custom dependencies, you can use pip_requirements to install them.

カスタム依存関係をインストールするには、外部アクセス統合を使用して外部ネットワークアクセスを有効にする必要があります。以下の SQL 例コマンドを使用してアクセスを提供することができます。

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
  ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
  ENABLED = true;
Copy

外部アクセス統合の詳細については、 外部アクセス統合の作成と使用 をご参照ください。

外部ネットワークアクセスを提供した後、 pip_requirementsexternal_access_integrations パラメーターを使用してカスタム依存関係を構成できます。コンテナーランタイム環境で利用できないパッケージや、特定のバージョンのパッケージを使用することができます。

次のPythonコードは、 remote デコレーターにカスタム依存関係を指定する方法を示します。

@remote(
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=["custom-package"],
  external_access_integrations=["PYPI_EAI"],
  session=session,
)
def my_function():
  # Your code here
Copy

次のPythonコードは、 submit_file() メソッドにカスタム依存関係を指定する方法を示します。

from snowflake.ml.jobs import submit_file

# Can include version specifier to specify version(s)
job = submit_file(
  "/path/to/repo/my_script.py",
  compute_pool,
  stage_name="payload_stage",
  pip_requirements=["custom-package==1.0.*"],
  external_access_integrations=["pypi_eai"],
  session=session,
)
Copy

プライベートパッケージフィード

Snowflake ML Jobsは、 JFrog ArtifactoryやSonatype Nexus Repositoryなどのプライベートフィードからのパッケージのロードもサポートしています。これらのフィードは、内部パッケージや独自パッケージの配布、依存関係のバージョン管理、セキュリティ/コンプライアンス確保のためによく使われます。

プライベートフィードからパッケージをインストールするには、以下を実行する必要があります。

  1. プライベートフィードの URL へのアクセスを許可するネットワークルールを作成します。

    1. 基本認証を使用するソースの場合は、ネットワークルールを作成するだけです。

      CREATE OR REPLACE NETWORK RULE private_feed_nr
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('<your-repo>.jfrog.io');
      
      Copy
    2. プライベート接続(つまり、Private Link)を使用してソースへのアクセスを構成するには、 プライベート接続を使用したネットワークエグレス の手順に従ってください。

  2. ネットワークルールを使用して、外部アクセス統合を作成します。ジョブを送信するロールに、 EAI の使用許可を付与します。

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai
    ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR)
    ENABLED = true;
    
    GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
    
    Copy
  3. ジョブ送信時に、プライベートフィード URL、外部アクセス統合、パッケージを指定します

    # Option 1: Specify private feed URL in pip_requirements
    job = submit_file(
      "/path/to/script.py",
      compute_pool="MY_COMPUTE_POOL",
      stage_name="payload_stage",
      pip_requirements=[
        "--index-url=https://your.private.feed.url",
        "internal-package==1.2.3"
      ],
      external_access_integrations=["PRIVATE_FEED_EAI"]
    )
    
    Copy
    # Option 2: Specify private feed URL by environment variable
    job = submit_directory(
      "/path/to/code/",
      compute_pool="MY_COMPUTE_POOL",
      entrypoint="script.py",
      stage_name="payload_stage",
      pip_requirements=["internal-package==1.2.3"],
      external_access_integrations=["PRIVATE_FEED_EAI"],
      env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'},
    )
    
    Copy

プライベートフィード URL に認証トークンなどの機密情報が含まれている場合、Snowflakeシークレットを作成して URL を管理します。CREATE SECRET を使用してシークレットを作成します。spec_overrides 引数を使用して、ジョブ送信中にシークレットを構成します。

# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
 TYPE = GENERIC_STRING
 SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()

# Prepare service spec override for mounting secret into job execution
spec_overrides = {
 "spec": {
  "containers": [
    {
     "name": "main",  # Primary container name is always "main"
     "secrets": [
      {
        "snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
        "envVarName": "PIP_INDEX_URL",
        "secretKeyRef": "secret_string"
      },
     ],
    }
  ]
 }
}

# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
  "/path/to/script.py",
  compute_pool="MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=[
    "internal-package==1.2.3"
  ],
  external_access_integrations=["PRIVATE_FEED_EAI"],
  spec_overrides=spec_overrides,
)
Copy

container.secrets の情報については、 containers.secrets フィールド をご参照ください。

Snowflake ML ジョブの使用方法例については、`MLジョブコードサンプル<https://github.com/Snowflake-Labs/sf-samples/tree/main/samples/ml/ml_jobs>`_ をご参照ください。

コストの考慮事項

Snowflake ML JobsはSnowpark Container Services上で実行され、使用量に応じて請求されます。コンピューティングコストに関する情報は、 Snowpark Container Servicesコスト をご覧ください。

ジョブペイロードは stage_name 引数で指定されたステージにアップロードされます。追加料金を請求されないようにするには、クリーンアップが必要です。ステージストレージに関するコストについては、 ストレージコストについて および ストレージコストの調査 をご参照ください。