Pythonを使用したSnowpark Container Servicesの管理

Pythonを使用して、コンテナー化されたアプリケーションの展開、管理、スケールが可能なフルマネージドコンテナーサービスであるSnowpark Container Servicesを管理できます。Snowpark Container Servicesの概要については、 Snowpark Container Servicesについて をご参照ください。

Snowflake Python API を使用すると、コンピューティングプール、イメージリポジトリ、サービスを管理することができます。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python API を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python API を使用してSnowflakeに接続する をご参照ください。

コンピューティングプールの管理

コンピューティングプールは、SnowflakeがSnowpark Container Servicesのジョブやサービスを実行する仮想マシン(VM)ノードの集合体です。

Snowflake Python API は、2つの別々のタイプでコンピューティングプールを表します。

  • ComputePool: コンピューティングプールのプロパティ(ウェアハウス、最大ノード、最小ノード、自動再開および自動中断設定など)を公開します。

  • ComputePoolResource: 対応する ComputePool オブジェクトのフェッチ、プールの中断、再開、停止など、コンピューティングプールに対してアクションを実行するためのメソッドを公開します。

コンピューティングプールに関する詳細については、 Snowpark Container Services: コンピューティングプールの操作 をご参照ください。

コンピューティングプールの作成

作成するコンピューティングプールを表す ComputePool オブジェクトを渡して ComputePoolCollection.create メソッドを呼び出すと、コンピューティングプールを作成できます。

コンピューティングプールを作成するには、まず、以下のようなプールプロパティを指定する ComputePool オブジェクトを作成します。

  • コンピューティングプール名

  • プールに含まれるノードの最大数と最小数

  • プール内のノードにプロビジョニングするマシンのタイプを識別するインスタンスファミリーの名前

  • サービスまたはジョブがプールに送信されたときに、プールを自動的に再開するかどうか。

次の例のコードでは、 my_compute_pool というプールを表す ComputePool オブジェクトを作成します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = ComputePool("my_compute_pool", min_node=1, max_node=2, instance_family="CPU_X64_XS", auto_resume=False)
root.compute_pools.create(compute_pool)
Copy

次に、コードは ComputePool オブジェクトを ComputePoolCollection.create メソッドに渡して、コンピューティングプールを作成します。

コンピューティングプール詳細の取得

ComputePool オブジェクトを返す ComputePoolResource.fetch メソッドを呼び出すことで、コンピューティングプールに関する情報を取得できます。

次の例のコードは、 my_compute_pool というプールに関する情報を取得します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = root.compute_pools["my_compute_pool"].fetch()
Copy

コンピューティングプールの作成または更新

既存のコンピューティングプールを表す ComputePool オブジェクトのプロパティを設定し、 ComputePoolResource オブジェクトを返す create_or_update メソッドを使用して、Snowflakeに更新されたオブジェクトを渡すと、既存のコンピューティングプールの特性を更新できます。

プールを作成するときに、新しいプールを記述する ComputePool オブジェクトを渡すこともできます。

次の例のコードは、 my_compute_pool コンピューティングプールの最大許容ノードを設定し、Snowflake 上のプールを更新します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = root.compute_pools["my_compute_pool"].fetch()
compute_pool.max_node = 3
compute_pool_res = root.compute_pools.create_or_update(compute_pool)
Copy

コンピューティングプールの一覧表示

PagedIter 反復子を返す iter メソッドを使用して、コンピューティングプールをリストすることができます。

次の例のコードは、名前が abc で始まるコンピューティングプールをリストします。

from snowflake.core import Root

compute_pools = root.compute_pools.iter(like="abc%")
for compute_pool in compute_pools:
  print(compute_pool.name)
Copy

コンピューティングプール操作の実行

ComputePool.fetch メソッドを使用して取得できる ComputePoolResource オブジェクトを使用して、プールの中断、再開、停止など一般的なコンピューティングプールの操作を実行できます。

次の例のコードは、 my_compute_pool コンピューティングプールを中断、再開、停止します。

from snowflake.core import Root
from snowflake.core.compute_pool import ComputePool

compute_pool = root.compute_pools["my_compute_pool"]
compute_pool_res = compute_pool.fetch()
compute_pool_res.suspend()
compute_pool_res.resume()
compute_pool_res.stop_all_services()
Copy

コードは、 Root.compute_pools メソッドを使用して、コンピューティングプールを表す ComputePool オブジェクトを作成します。 ComputePool オブジェクトから、 ComputePoolResource オブジェクトをフェッチし、コンピューティングプール操作を実行します。

イメージリポジトリの管理

コンテナサービス上で実行するアプリケーションのイメージを格納するイメージリポジトリを管理できます。

イメージリポジトリはスキーマレベルのオブジェクトです。リポジトリを作成または参照するときは、そのスキーマのコンテキスト内で行います。

Snowflake Python API は、2つの別々のタイプでイメージリポジトリを表します。

  • ImageRepository: イメージリポジトリのデータベース名やスキーマ名、リポジトリ URL、所有者などのプロパティを公開します。

  • ImageRepositoryResource: 対応する ImageRepository オブジェクトをフェッチし、イメージリポジトリリソースを削除するためのメソッドを公開します。

イメージリポジトリに関する詳細については、 Snowpark Container Services: イメージレジストリおよびリポジトリの操作 をご参照ください。

イメージリポジトリの作成

イメージリポジトリを作成するには、まずリポジトリ名を指定する ImageRepository オブジェクトを作成します。

次の例のコードは、 my_repo という名前のリポジトリを表す ImageRepository オブジェクトを作成します。

from snowflake.core import Root
from snowflake.core.image_repository import ImageRepository

my_repo = ImageRepository("my_repo")
root.databases["my_db"].schemas["my_schema"].image_repositories.create(my_repo)
Copy

次に、コードは ImageRepository オブジェクトを ImageRepositoryCollection.create メソッドに渡してイメージリポジトリを作成することで、 my_db データベースと my_schema スキーマにイメージリポジトリを作成します。

イメージリポジトリ詳細の取得

ImageRepository オブジェクトを返す ImageRepositoryResource.fetch メソッドを呼び出すことで、イメージリポジトリに関する情報を取得できます。

次の例のコードは、 my_repo イメージリポジトリを表す ImageRepository オブジェクトを取得して、リポジトリの所有者の名前を印刷します。

from snowflake.core import Root
from snowflake.core.image_repository import ImageRepository

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo = my_repo_res.fetch()
print(my_repo.owner)
Copy

イメージリポジトリの一覧表示

ImageRepository オブジェクトの PagedIter 反復子を返す iter メソッドを使用して、指定したスキーマのイメージリポジトリをリストすることができます。

次の例のコードは、 my_db データベースと my_schema スキーマのリポジトリ名をリストします。

from snowflake.core import Root

repo_list = root.databases["my_db"].schemas["my_schema"].image_repositories.iter()
for repo_obj in repo_list:
  print(repo_obj.name)
Copy

イメージリポジトリの削除

ImageRepositoryResource.delete メソッドを使用してイメージリポジトリを削除できます。

次の例のコードは、 my_repo リポジトリを削除します。

from snowflake.core import Root
from snowflake.core.image_repository import ImageRepository

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo_res.delete()
Copy

サービスの管理

停止させるまでアプリケーションコンテナを実行するサービスを管理できます。サービスコンテナが停止した場合、Snowflakeは自動的にサービスを再起動します。こうすることで、サービスは実質的に中断することなく実行されます。

サービスはスキーマレベルのオブジェクトです。サービスを作成または参照するときは、そのスキーマのコンテキスト内で行います。

Snowflake Python API は、2つの別々のタイプでサービスを表します。

  • Service: サービスの仕様、最小インスタンス数、最大インスタンス数、データベース名、スキーマ名などのプロパティを公開します。

  • ServiceResource: 対応する Service オブジェクトのフェッチ、サービスの中断と再開、そのステータスの取得に使用できるメソッドを公開します。

サービスに関する詳細については、 Snowpark Container Services: サービスの操作 をご参照ください。

サービスの作成

サービスを作成するには、 services.create メソッドを実行し、作成したいサービスを表す Service オブジェクトを渡します。

ステージにアップロードされたサービス仕様 .yaml ファイルからサービスを作成します。サービス仕様の作成に関する詳細については、 サービス仕様リファレンス をご参照ください。

仕様のアップロード

まだステージにアップロードされていない仕様からサービスを作成する場合は、Snowpark FileOperation オブジェクトを使用して仕様をアップロードできます。

次の例のコードは、 FileOperation.put メソッドを使用して仕様をファイルとしてアップロードします。

session.file.put("/local_location/my_service_spec.yaml", "@my_stage")
Copy

次の例のコードは、 FileOperation.put_stream メソッドを使用して仕様を文字列としてアップロードします。

service_spec_string = """
// Specification as a string.
"""
session.file.put_stream(StringIO(sepc_in_string), "@my_stage/my_service_spec.yaml")
Copy

サービスの作成

ステージングされた仕様からサービスを作成するには、まず以下のようなサービスプロパティを指定する Service オブジェクトを作成します。

  • サービス名

  • Snowflakeが作成できるサービスインスタンスの最大数と最小数

  • サービスが追加されるべきコンピューティングプール

  • ステージの場所と仕様名

次の例のコードは、 @my_stage/my_service_spec.yaml の仕様から my_service というサービスを表す Service オブジェクトを作成します。

from snowflake.core import Root
from snowflake.core.service import Service

my_service = Service("my_service", min_instances=1, max_instances=2,compute_pool="my_compute_pool", from_stage="@my_stage", spec="my_service_spec.yaml")
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
Copy

次に、コードは Service オブジェクトを ServiceCollection.create メソッドに渡してサービスを作成することで、 my_db データベースと my_schema スキーマにサービスを作成します。

サービス詳細の取得

Service オブジェクトを返す ServiceResource.fetch メソッドを呼び出すことで、Snowflakeサービスに関する情報を取得できます。

次の例のコードは、 my_service というサービスに関する情報を取得します。

from snowflake.core import Root
from snowflake.core.service import Service

my_service = root.databases["my_db"].schemas["my_schema"].services["my_service"].fetch()
Copy

サービスの一覧表示

Service オブジェクトの PagedIter 反復子を返す iter メソッドを使用して、指定したスキーマのサービスをリストすることができます。

次の例のコードは、名前が abc で始まるサービスをリストします。

from snowflake.core import Root

services = root.databases["my_db"].schemas["my_schema"].services.iter(like="abc%")
for service_obj in services:
  print(service_obj.name)
Copy

サービス操作の実行

ServiceResource オブジェクトを使用して、サービスの中断、再開、そのステータスの取得など一般的なサービスの操作を実行できます。

次の例のコードは、 my_service サービスを中断および再開し、サービスのステータスも所得します。

from snowflake.core import Root
from snowflake.core.service import Service

my_service_res = root.databases["my_db"].schemas["my_schema"].services["my_service"]

my_service_res.suspend()
my_service_res.resume()
status = my_service_res.get_service_status(10)
Copy