Pythonを使用したSnowpark Container Servicesの管理

Pythonを使用して、コンテナ化されたアプリケーションの展開、管理、拡張が可能なフルマネージドコンテナサービスであるSnowpark Container Servicesを管理できます。Snowpark Container Servicesの概要については、 Snowpark Container Servicesについて をご参照ください。

Snowflake Python API を使用すると、コンピュートプール、イメージリポジトリ、サービスを管理することができます。

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python API を使用することを想定しています。詳細については、 Snowflake Python API によるSnowflakeへの接続 をご参照ください。

次の例のコードは、構成ファイルで定義された接続パラメーターを使用して、Snowflakeへの接続を作成します。出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

コンピュートプールの管理

コンピュートプールは、SnowflakeがSnowpark Container Servicesのジョブやサービスを実行する仮想マシン(VM)ノードの集合体です。

Snowflake Python API は、2つの別々のタイプでコンピュートプールを表します。

  • ComputePool: コンピュートプールのプロパティ(ウェアハウス、最大ノード、最小ノード、自動再開および自動中断設定など)を公開します。

  • ComputePoolResource: 対応する ComputePool オブジェクトのフェッチ、プールの中断、再開、停止など、コンピュートプールに対してアクションを実行するためのメソッドを公開します。

コンピュートプールの詳細については、 Snowpark Container Services: コンピューティングプールの操作 をご参照ください。

コンピューティングプールの作成

作成したいコンピュートプールを表す ComputePool オブジェクトを渡して ComputePoolCollection.create メソッドを呼び出すことで、コンピュートプールを作成できます。

ステージングされた仕様からコンピュートプールを作成するには、まず、以下のようなプールプロパティを指定する ComputePool オブジェクトを作成します。

  • コンピュートプール名

  • プールを構成するノードの最大数と最小数

  • プール内のノードにプロビジョニングするマシンのタイプを識別するインスタンスファミリーの名前

  • サービスまたはジョブがプールに送信されたときに、プールを自動的に再開するかどうか。

次の例のコードでは、 my_compute_pool というプールを表す ComputePool オブジェクトを作成します。次に、コードは ComputePool オブジェクトを ComputePoolCollection.create メソッドに渡すことで、コンピュートプールを作成します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = ComputePool("my_compute_pool", min_node=1, max_node=2, instance_family="STANDARD_1", auto_resume=False)
root.compute_pools.create(compute_pool)
Copy

コンピュートプール詳細の取得

ComputePool オブジェクトを返す ComputePoolResource.fetch メソッドを呼び出すことで、コンピュートプールに関する情報を取得できます。

次の例のコードは、 my_compute_pool というプールに関する情報を取得します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = root.compute_pools["my_compute_pool"].fetch()
Copy

コンピュートプールの作成または更新

既存のコンピュートプールを表す ComputePool オブジェクトのプロパティを設定し、更新されたオブジェクトを ComputePoolResource オブジェクトを返す create_or_update メソッドで Snowflake に渡すことで、既存のコンピュートプールの特性を更新できます。

プールを作成するときに、新しいプールを記述する ComputePool オブジェクトを渡すこともできます。

次の例のコードは、 my_compute_pool コンピュートプールの最大許容ノードを設定し、Snowflake 上のプールを更新します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = root.compute_pools["my_compute_pool"].fetch()
compute_pool.max_node = 3
compute_pool_res = root.compute_pools.create_or_update(compute_pool)
Copy

コンピュートプールの一覧表示

iter メソッドを使用してコンピュートプールを一覧表示することができます。メソッドは PagedIter 反復子を返します。

次の例のコードは、名前が「abc」で始まるコンピュートプールを一覧表示し、それぞれの名前を印刷します。

from snowflake.core import Root

compute_pools = root.compute_pools.iter(like="abc%")
for compute_pool in compute_pools:
  print(compute_pool.name)
Copy

コンピュートプール操作の実行

ComputePoolResource オブジェクトを使用して、プールの中断、再開、停止など一般的なコンピュートプールの操作を実行できます。 ComputePool.fetch メソッドを使用して ComputePoolResource オブジェクトを取得できます。

次の例のコードは、 my_compute_pool コンピュートプールを中断、再開、停止します。これは、 Root.compute_pools メソッドを使用して、コンピュートプールを表す ComputePool オブジェクトを作成します。 ComputePool オブジェクトから、 ComputePoolResource オブジェクトをフェッチし、コンピュートプール操作を実行します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = root.compute_pools["my_compute_pool"]
compute_pool_res = compute_pool.fetch()
compute_pool_res.suspend()
compute_pool_res.resume()
compute_pool_res.stop_all_services()
Copy

イメージリポジトリの管理

コンテナサービス上で実行するアプリケーションのイメージを格納するイメージリポジトリを管理できます。

イメージリポジトリはスキーマレベルのオブジェクトです。リポジトリを作成または参照するときは、そのスキーマのコンテキスト内で行います。

Snowflake Python API は、2つの別々のタイプでイメージリポジトリを表します。

  • ImageRepository: イメージリポジトリのデータベース名やスキーマ名、リポジトリ URL、所有者などのプロパティを公開します。

  • ImageRepositoryResource: 対応する ImageRepository オブジェクトをフェッチし、イメージリポジトリリソースを削除するためのメソッドを公開します。

イメージリポジトリについての詳細は、 Snowpark Container Services: イメージレジストリおよびリポジトリの操作 をご参照ください。

イメージリポジトリの作成

ステージングされた仕様からイメージリポジトリを作成するには、まずリポジトリ名を指定する ImageRepository オブジェクトを作成します。

次の例のコードでは、 my_repo というリポジトリを表す ImageRepository オブジェクトを作成します。次に、コードは ImageRepository オブジェクトを ImageRepositoryCollection.create メソッドに渡してイメージリポジトリを作成することで、 my_db データベースと my_schema スキーマにイメージリポジトリを作成します。

from snowflake.core import Root
from snowflake.core.image_repository import ImageRepository

my_repo = ImageRepository("my_repo")
root.databases["my_db"].schemas["my_schema"].image_repositories.create(my_repo)
Copy

イメージリポジトリ詳細の取得

ImageRepository オブジェクトを返す ImageRepositoryResource.fetch メソッドを呼び出すことで、イメージリポジトリに関する情報を取得できます。

次の例のコードは、 my_repo イメージリポジトリを表す ImageRepository オブジェクトを取得し、リポジトリの所有者の名前を印刷します。

from snowflake.core import Root
from snowflake.core.image_repository import ImageRepository

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo = my_repo_res.fetch()
print(my_repo.owner)
Copy

イメージリポジトリの一覧表示

iter メソッドを使用して指定したスキーマのイメージリポジトリを一覧表示することができます。このメソッドは ImageRepository オブジェクトの PagedIter 反復子を返します。

次の例のコードは、 my_db データベースと my_schema スキーマのリポジトリを一覧表示し、各リポジトリの名前を印刷します。

from snowflake.core import Root

repo_list = root.databases["my_db"].schemas["my_schema"].image_repositories.iter()
for repo_obj in repo_list:
  print(repo_obj.name)
Copy

イメージリポジトリの削除

ImageRepositoryResource.delete メソッドを使用してイメージリポジトリを削除できます。

次の例のコードは、 my_repo リポジトリを削除します。

from snowflake.core import Root
from snowflake.core.image_repository import ImageRepository

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo_res.delete()
Copy

サービスの管理

停止させるまでアプリケーションコンテナを実行するサービスを管理できます。サービスコンテナが停止した場合、Snowflakeは自動的にサービスを再起動します。こうすることで、サービスは実質的に中断することなく実行されます。

サービスはスキーマレベルのオブジェクトです。サービスを作成または参照するときは、そのスキーマのコンテキスト内で行います。

Snowflake Python API は、2つの別々のタイプでサービスを表します。

  • Service: サービスの仕様、最小インスタンス数、最大インスタンス数、データベース名、スキーマ名などのプロパティを公開します。

  • ServiceResource: 対応する Service オブジェクトのフェッチ、サービスの中断と再開、そのステータスの取得に使用できるメソッドを公開します。

サービスの詳細については、 Snowpark Container Services: サービスの操作 をご参照ください。

サービスの作成

サービスを作成するには、 services.create メソッドを実行し、作成したいサービスを表す Service オブジェクトを渡します。

ステージにアップロードされたサービス仕様 .yaml ファイルからサービスを作成します。サービス仕様の作成の詳細については、 サービス仕様リファレンス をご参照ください。

仕様のアップロード

まだステージにアップロードされていない仕様からサービスを作成する場合は、Snowpark FileOperation オブジェクトを使用して仕様をアップロードできます。

次の例のコードは、 FileOperation.put メソッドを使用して仕様をファイルとしてアップロードします。

session.file.put("/local_location/my_service_spec.yaml", "@my_stage")
Copy

次の例のコードは、 FileOperation.put_stream メソッドを使用して仕様を文字列としてアップロードします。

service_spec_string = """
// Specification as a string.
"""
session.file.put_stream(StringIO(sepc_in_string), "@my_stage/my_service_spec.yaml")
Copy

サービスの作成

ステージングされた仕様からサービスを作成するには、まず以下のようなサービスプロパティを指定する Service オブジェクトを作成します。

  • サービス名

  • Snowflakeが作成できるサービスインスタンスの最大数と最小数

  • サービスが追加されるべきコンピュートプール

  • ステージの場所と仕様名

次の例のコードは、 @my_stage/my_service_spec.yaml の仕様から my_service というサービスを表す Service オブジェクトを作成します。次に、コードは Service オブジェクトを ServiceCollection.create メソッドに渡してサービスを作成することで、 my_db データベースと my_schema スキーマにサービスを作成します。

from snowflake.core import Root
from snowflake.core.service import Service

my_service = Service("my_service", min_instances=1, max_instances=2,compute_pool="my_compute_pool", from_stage="@my_stage", spec="my_service_spec.yaml")
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
Copy

サービス詳細の取得

Service オブジェクトを返す ServiceResource.fetch メソッドを呼び出すことで、Snowflakeサービスに関する情報を取得できます。

次の例のコードは、 my_service というサービスに関する情報を取得します。

from snowflake.core import Root
from snowflake.core.service import Service

my_service = root.databases["my_db"].schemas["my_schema"].services["my_service"].fetch()
Copy

サービスの一覧表示

iter メソッドを使用して指定したスキーマのサービスを一覧表示することができます。このメソッドは Service オブジェクトの PagedIter 反復子を返します。

次の例のコードは、名前が「abc」で始まるサービスを一覧表示し、それぞれの名前を印刷します。

from snowflake.core import Root

services = root.databases["my_db"].schemas["my_schema"].services.iter(like="abc%")
for service_obj in services:
  print(service_obj.name)
Copy

サービス操作の実行

ServiceResource オブジェクトを使用して、サービスの中断、再開、そのステータスの取得など一般的なサービスの操作を実行できます。

次の例のコードは、 my_service サービスを中断および再開します。また、サービスのステータスも取得します。

from snowflake.core import Root
from snowflake.core.service import Service

my_service_res = root.databases["my_db"].schemas["my_schema"].services["my_service"]

my_service_res.suspend()
my_service_res.resume()
status = my_service_res.get_service_status(10)
Copy