Snowflake MLのジョブ

Snowflake ML Jobsを使用して、Snowflake ML コンテナーランタイム内で機械学習(ML)ワークフローを実行します。どの開発環境からでも実行できます。Snowflakeワークシートやノートブックでコードを実行する必要はありません。ジョブを使用してSnowflakeのインフラストラクチャを活用し、開発ワークフローの中でリソース集約型のタスクを実行します。Snowflake ML のローカルでのセットアップ情報については、 Snowflake ML をローカルで使用する を参照してください。

重要

Snowflake ML Jobsは、Snowpark ML Pythonパッケージ(snowflake-ml-python)バージョン1.8.2以降で公開プレビューとして利用可能です。

Snowflake ML Jobsでは以下のことが可能です。

  • GPU や高いメモリの CPU インスタンスなど、SnowflakeのCompute Poolで ML ワークロードを実行します。

  • VS CodeやJupyterノートブックなど、お好みの開発環境をご利用ください。

  • ランタイム環境内にカスタムのPythonパッケージをインストールして使用します。

  • Snowflakeの分散 APIs を使用して、データのロード、トレーニング、ハイパーパラメーターのチューニングを最適化します。

  • Apache Airflowなどのオーケストレーションツールと統合できます。

  • Snowflakeの APIs を通してジョブの監視と管理を行います。

これらの機能を使用して、次のことができます。

  • GPU アクセラレーションや大規模なコンピューティングリソースを必要とする大容量データセットに対して、リソース集約的なトレーニングを実行します。

  • パイプラインを介したプログラム実行により、 ML コードを開発環境から本番環境に移動することで、 ML ワークフローを本稼働へ移行します。

  • 既存の開発環境を維持しながら、Snowflakeのコンピューティングリソースを活用できます。

  • 最小限のコード変更で OSS ML ワークフローをリフトアンドシフトします。

  • 大容量のSnowflakeデータセットを直接扱うことで、データ移動を減らし、高価なデータ転送を回避します。

前提条件

重要

Snowflake ML Jobsは現在Python 3.10クライアントのみをサポートしています。その他のPythonバージョンのサポートが必要な場合は、Snowflakeアカウントチームまでご連絡ください。

  1. Python 3.10環境にSnowflake ML Pythonパッケージをインストールします。

    pip install snowflake-ml-python>=1.8.2
    
    Copy
  2. デフォルトのコンピューティングプールサイズは、 CPU_X64_Sインスタンスファミリーを使用します。ノード数の最小値は1、最大値は25です。次の SQL コマンドを使用して、カスタムコンピューティングプールを作成できます。

    CREATE COMPUTE POOL IF NOT EXISTS MY_COMPUTE_POOL
      MIN_NODES = <MIN_NODES>
      MAX_NODES = <MAX_NODES>
      INSTANCE_FAMILY = <INSTANCE_FAMILY>;
    
    Copy
  3. Snowflake ML JobsにはSnowparkセッションが必要です。次のコードで作成します。

    from snowflake.snowpark import Session
    from snowflake.ml.jobs import list_jobs
    
    ls = list_jobs() # This will fail! You must create a session first.
    
    # Requires valid ~/.snowflake/config.toml file
    session = Session.builder.getOrCreate()
    
    ls = list_jobs(session=session)
    ls = list_jobs() # Infers created session from context
    
    Copy

    セッションの作成に関する情報は、 セッションの作成 を参照してください。

Snowflake ML ジョブを実行する

Snowflake ML ジョブは、以下のいずれかの方法で実行できます。

  • コード内でリモートデコレーターを使用します。

  • Python API を使用して、ファイルまたはディレクトリ全体を送信します。

Python関数をSnowflake ML ジョブとして実行する

Function Dispatchを使用して、 @remote デコレーターを使用してSnowflakeのコンピューティングリソース上で個々のPython関数をリモートで実行します。

@remote デコレーターを使うと、次のようなことができます。

  • 関数とその依存関係をシリアライズします。

  • 指定したSnowflakeステージにアップロードします。

  • ML のContainer Runtime内で実行します。

次のPythonコード例では、 @remote デコレーターを使用してSnowflake ML ジョブを送信します。なお、Snowpark Session が必要です。 前提条件 をご覧ください。

from snowflake.ml.jobs import remote

@remote("MY_COMPUTE_POOL", stage_name="payload_stage", session=session)
def train_model(data_table: str):
  # Provide your ML code here, including imports and function calls
  ...

job = train_model("my_training_data")
Copy

@remote デコレート関数を呼び出すと、ジョブの実行管理と監視に使用できるSnowflake MLJob オブジェクトが返されます。詳細については、 Snowflake ML ジョブ管理 をご参照ください。

PythonファイルをSnowflake ML ジョブとして実行する

Snowflakeコンピューティングリソース上でPythonファイルまたはプロジェクトディレクトリを実行します。こんな時に便利です。

  • 複数のモジュールと依存関係を持つ複雑な ML プロジェクトを持っている場合。

  • ローカルの開発コードと本番コードの分離を維持したい場合。

  • コマンドライン引数を使用するスクリプトを実行する必要がある場合。

  • Snowflakeコンピュートで実行するように特別に設計されていない既存の ML プロジェクトを操作している場合。

Snowflake Job API には、主に2つのメソッドがあります。

  • submit_file(): 単一のPythonファイルを実行する場合

  • submit_directory(): 複数のファイルやリソースを含むプロジェクトディレクトリ全体を実行する場合

どちらのメソッドでも以下がサポートされます。

  • コマンドライン引数パッシング

  • 環境変数の構成

  • カスタム依存関係仕様

  • Snowflakeステージによるプロジェクト資産管理

File Dispatchは、既存の ML ワークフローを本稼働へ移行し、開発環境と実行環境の明確な分離を維持するために特に役立ちます。

次のPythonコードは、Snowflake ML ジョブにファイルを送信します。

from snowflake.ml.jobs import submit_file

# Run a single file
job1 = submit_file(
  "train.py",
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  args=["--data-table", "my_training_data"],
  session=session,
)
Copy

次のPythonコードは、Snowflake ML ジョブにディレクトリを送信します。

from snowflake.ml.jobs import submit_directory

# Run from a directory
job2 = submit_directory(
  "./ml_project/",
  "MY_COMPUTE_POOL",
  entrypoint="train.py",
  stage_name="payload_stage",
  session=session,
)
Copy

ファイルまたはディレクトリを送信すると、ジョブの実行を管理および監視するために使用できるSnowflake MLJob オブジェクトが返されます。詳細については、 Snowflake ML ジョブ管理 をご参照ください。

Snowflake ML ジョブ管理

Snowflake ML ジョブを送信すると、 API は MLJob オブジェクトを作成します。これを使用して次のことを実行できます。

  • ステータス更新によるジョブの進捗管理

  • 詳細な実行ログによる問題のデバッグ

  • 実行結果の取得(もしあれば)

get_job() API を使って、 MLJob オブジェクトを ID で取得することができます。次のPythonコードは、 MLJob オブジェクトを取得する方法を示しています。

from snowflake.ml.jobs import MLJob, get_job, list_jobs

# List all jobs
jobs = list_jobs()

# Retrieve an existing job based on ID
job = get_job("<job_id>")  # job is an MLJob instance

# Retrieve status and logs for the retrieved job
print(job.status)  # PENDING, RUNNING, FAILED, DONE
print(job.get_logs())
Copy

ジョブの result メソッドを呼び出し、実行の完了を待ち、実行結果を取得します。実行に失敗した場合、 result は例外を発生させます。

result = job.result()
Copy

依存関係の管理

Snowflake ML Job API は ML のContainer Runtime 環境内でペイロードを実行します。この環境には、機械学習やデータサイエンスのために最も一般的に使用されているPythonパッケージが用意されています。ほとんどのユースケースは、追加の構成なしで「そのまま」使用できるはずです。カスタム依存関係が必要な場合は、 pip_requirements を使ってインストールできます。

カスタム依存関係をインストールするには、外部アクセス統合を使用して外部ネットワークアクセスを有効にする必要があります。以下の SQL 例コマンドを使用してアクセスを提供することができます。

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION PYPI_EAI
  ALLOWED_NETWORK_RULES = (snowflake.external_access.pypi_rule)
  ENABLED = true;
Copy

外部アクセス統合の詳細については、 外部アクセス統合の作成と使用 をご参照ください。

外部ネットワークアクセスを提供した後、 pip_requirementsexternal_access_integrations パラメーターを使用してカスタム依存関係を構成できます。コンテナーランタイム環境で利用できないパッケージや、特定のバージョンのパッケージを使用することができます。

次のPythonコードは、 remote デコレーターにカスタム依存関係を指定する方法を示します。

@remote(
  "MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=["custom-package"],
  external_access_integrations=["PYPI_EAI"],
  session=session,
)
def my_function():
  # Your code here
Copy

次のPythonコードは、 submit_file() メソッドにカスタム依存関係を指定する方法を示します。

from snowflake.ml.jobs import submit_file

# Can include version specifier to specify version(s)
job = submit_file(
  "/path/to/repo/my_script.py",
  compute_pool,
  stage_name="payload_stage",
  pip_requirements=["custom-package==1.0.*"],
  external_access_integrations=["pypi_eai"],
  session=session,
)
Copy

プライベートパッケージフィード

Snowflake ML Jobsは、 JFrog ArtifactoryやSonatype Nexus Repositoryなどのプライベートフィードからのパッケージのロードもサポートしています。これらのフィードは、内部パッケージや独自パッケージの配布、依存関係のバージョン管理、セキュリティ/コンプライアンス確保のためによく使われます。

プライベートフィードからパッケージをインストールするには、以下を実行する必要があります。

  1. プライベートフィードの URL へのアクセスを許可するネットワークルールを作成します。

    1. 基本認証を使用するソースの場合は、ネットワークルールを作成するだけです。

      CREATE OR REPLACE NETWORK RULE private_feed_nr
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('<your-repo>.jfrog.io');
      
      Copy
    2. プライベート接続(つまり、Private Link)を使用してソースへのアクセスを構成するには、 プライベート接続を使用したネットワークエグレス の手順に従ってください。

  2. ネットワークルールを使用して、外部アクセス統合を作成します。ジョブを送信するロールに、 EAI の使用許可を付与します。

    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION private_feed_eai
    ALLOWED_NETWORK_RULES = (PRIVATE_FEED_NR)
    ENABLED = true;
    
    GRANT USAGE ON INTEGRATION private_feed_eai TO ROLE <role_name>;
    
    Copy
  3. ジョブ送信時に、プライベートフィード URL、外部アクセス統合、パッケージを指定します

    # Option 1: Specify private feed URL in pip_requirements
    job = submit_file(
      "/path/to/script.py",
      compute_pool="MY_COMPUTE_POOL",
      stage_name="payload_stage",
      pip_requirements=[
        "--index-url=https://your.private.feed.url",
        "internal-package==1.2.3"
      ],
      external_access_integrations=["PRIVATE_FEED_EAI"]
    )
    
    Copy
    # Option 2: Specify private feed URL by environment variable
    job = submit_directory(
      "/path/to/code/",
      compute_pool="MY_COMPUTE_POOL",
      entrypoint="script.py",
      stage_name="payload_stage",
      pip_requirements=["internal-package==1.2.3"],
      external_access_integrations=["PRIVATE_FEED_EAI"],
      env_vars={'PIP_INDEX_URL': 'https://your.private.feed.url'},
    )
    
    Copy

プライベートフィード URL に認証トークンなどの機密情報が含まれている場合は、Snowflake Secretを作成して URL を管理します。 CREATE SECRET を使ってシークレットを作成します。 spec_overrides 引数を使用して、ジョブ送信時にシークレットを構成します。

# Create secret for private feed URL with embedded auth token
feed_url = "<your-repo>.jfrog.io/artifactory/api/pypi/test-pypi/simple"
user = "<auth_user>"
token = "<auth_token>"
session.sql(f"""
CREATE SECRET IF NOT EXISTS PRIVATE_FEED_URL_SECRET
 TYPE = GENERIC_STRING
 SECRET_STRING = 'https://{auth_user}:{auth_token}@{feed_url}'
""").collect()

# Prepare service spec override for mounting secret into job execution
spec_overrides = {
 "spec": {
  "containers": [
    {
     "name": "main",  # Primary container name is always "main"
     "secrets": [
      {
        "snowflakeSecret": "PRIVATE_FEED_URL_SECRET",
        "envVarName": "PIP_INDEX_URL",
        "secretKeyRef": "secret_string"
      },
     ],
    }
  ]
 }
}

# Load private feed URL from secret (e.g. if URL includes auth token)
job = submit_file(
  "/path/to/script.py",
  compute_pool="MY_COMPUTE_POOL",
  stage_name="payload_stage",
  pip_requirements=[
    "internal-package==1.2.3"
  ],
  external_access_integrations=["PRIVATE_FEED_EAI"],
  spec_overrides=spec_overrides,
)
Copy

container.secrets の情報については、 containers.secrets フィールド をご参照ください。

コストの考慮事項

Snowflake ML JobsはSnowpark Container Services上で実行され、使用量に応じて請求されます。コンピューティングコストに関する情報は、 Snowpark Container Servicesコスト をご覧ください。

ジョブペイロードは stage_name 引数で指定されたステージにアップロードされます。追加料金を請求されないようにするには、クリーンアップが必要です。ステージストレージに関するコストについては、 ストレージコストについて および ストレージコストの調査 をご参照ください。