PythonによるSnowpark Container Services(サービス関数を含む)の管理

Pythonを使用して、コンテナー化されたアプリケーションの展開、管理、スケールが可能なフルマネージドコンテナーサービスであるSnowpark Container Servicesを管理できます。Snowpark Container Servicesの概要については、 Snowpark Container Servicesについて をご参照ください。

Snowflake Python APIs を使用すると、コンピューティングプール、イメージリポジトリ、サービスを管理することができます。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。

コンピュートプールの管理

コンピュートプールは、SnowflakeがSnowpark Container Servicesのジョブやサービスを実行する仮想マシン(VM)ノードの集合体です。

Snowflake Python APIs は、2つの別々のタイプでコンピューティングプールを表します。

  • ComputePool: コンピューティングプールのプロパティ(ウェアハウス、最大ノード、最小ノード、自動再開および自動中断設定など)を公開します。

  • ComputePoolResource: 対応する ComputePool オブジェクトのフェッチ、プールの中断、再開、停止など、コンピューティングプールに対してアクションを実行するためのメソッドを公開します。

コンピューティングプールに関する詳細については、 Snowpark Container Services: コンピューティングプールの操作 をご参照ください。

コンピューティングプールの作成

作成するコンピューティングプールを表す ComputePool オブジェクトを渡して ComputePoolCollection.create メソッドを呼び出すと、コンピューティングプールを作成できます。

コンピューティングプールを作成するには、まず、以下のようなプールプロパティを指定する ComputePool オブジェクトを作成します。

  • コンピュートプール名

  • プールに含まれるノードの最大数と最小数

  • プール内のノードにプロビジョニングするマシンのタイプを識別するインスタンスファミリーの名前

  • サービスまたはジョブがプールに送信されたときに、プールを自動的に再開するかどうか。

次の例のコードでは、 my_compute_pool というプールを表す ComputePool オブジェクトを作成します。

from snowflake.core.compute_pool import ComputePool

compute_pool = ComputePool(name="my_compute_pool", min_nodes=1, max_nodes=2, instance_family="CPU_X64_XS", auto_resume=False)
root.compute_pools.create(compute_pool)
Copy

次に、コードは ComputePool オブジェクトを ComputePoolCollection.create メソッドに渡して、コンピューティングプールを作成します。

コンピュートプール詳細の取得

ComputePool オブジェクトを返す ComputePoolResource.fetch メソッドを呼び出すことで、コンピュートプールに関する情報を取得できます。

次の例のコードは、 my_compute_pool というプールに関する情報を取得します。

compute_pool = root.compute_pools["my_compute_pool"].fetch()
print(compute_pool.to_dict())
Copy

コンピュートプールの一覧表示

PagedIter 反復子を返す iter メソッドを使用して、コンピューティングプールをリストすることができます。

次の例のコードは、名前が my で始まるコンピューティングプールを一覧表示します。

compute_pools = root.compute_pools.iter(like="my%")
for compute_pool in compute_pools:
  print(compute_pool.name)
Copy

コンピュートプール操作の実行

ComputePool.fetch メソッドを使用して取得できる ComputePoolResource オブジェクトを使用して、プールの中断、再開、停止など一般的なコンピューティングプールの操作を実行できます。

次の例のコードは、 my_compute_pool コンピューティングプールを中断、再開、停止します。

compute_pool_res = root.compute_pools["my_compute_pool"]
compute_pool_res.suspend()
compute_pool_res.resume()
compute_pool_res.stop_all_services()
Copy

コードは、 Root.compute_pools メソッドを使用して、コンピューティングプールを表す ComputePool オブジェクトを作成します。 ComputePool オブジェクトから、 ComputePoolResource オブジェクトをフェッチし、コンピューティングプール操作を実行します。

イメージリポジトリの管理

コンテナサービス上で実行するアプリケーションのイメージを格納するイメージリポジトリを管理できます。

イメージリポジトリはスキーマレベルのオブジェクトです。リポジトリを作成または参照するときは、そのスキーマのコンテキスト内で行います。

Snowflake Python APIs は、2つの別々のタイプでイメージリポジトリを表します。

  • ImageRepository: イメージリポジトリのデータベース名やスキーマ名、リポジトリ URL、所有者などのプロパティを公開します。

  • ImageRepositoryResource: 対応する ImageRepository オブジェクトを取得し、イメージリポジトリリソースを削除するためのメソッドを公開します。

イメージリポジトリに関する詳細については、 Snowpark Container Services: イメージレジストリおよびリポジトリの操作 をご参照ください。

イメージリポジトリの作成

イメージリポジトリを作成するには、まずリポジトリ名を指定する ImageRepository オブジェクトを作成します。

次の例のコードは、 my_repo という名前のリポジトリを表す ImageRepository オブジェクトを作成します。

from snowflake.core.image_repository import ImageRepository

my_repo = ImageRepository("my_repo")
root.databases["my_db"].schemas["my_schema"].image_repositories.create(my_repo)
Copy

次に、コードは ImageRepository オブジェクトを ImageRepositoryCollection.create メソッドに渡してイメージリポジトリを作成することで、 my_db データベースと my_schema スキーマにイメージリポジトリを作成します。

イメージリポジトリ詳細の取得

ImageRepository オブジェクトを返す ImageRepositoryResource.fetch メソッドを呼び出すことで、イメージリポジトリに関する情報を取得できます。

次の例のコードは、 my_repo イメージリポジトリを表す ImageRepository オブジェクトを取得して、リポジトリの所有者の名前を印刷します。

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo = my_repo_res.fetch()
print(my_repo.owner)
Copy

イメージリポジトリの一覧表示

ImageRepository オブジェクトの PagedIter 反復子を返す iter メソッドを使用して、指定したスキーマのイメージリポジトリをリストすることができます。

次の例のコードは、 my_db データベースと my_schema スキーマのリポジトリ名をリストします。

repo_list = root.databases["my_db"].schemas["my_schema"].image_repositories.iter()
for repo_obj in repo_list:
  print(repo_obj.name)
Copy

イメージリポジトリのドロップ

ImageRepositoryResource.drop メソッドを使用してイメージリポジトリをドロップできます。

次の例のコードは、 my_repo リポジトリをドロップします。

my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo_res.drop()
Copy

サービスとサービス関数の管理

停止させるまでアプリケーションコンテナを実行するサービスを管理できます。サービスコンテナが停止した場合、Snowflakeは自動的にサービスを再起動します。こうすることで、サービスは実質的に中断することなく実行されます。

サービスはスキーマレベルのオブジェクトです。サービスを作成または参照するときは、そのスキーマのコンテキスト内で行います。

Snowflake Python APIs は、2つの別々のタイプでサービスを表します。

  • Service: サービスの仕様、最小インスタンス数、最大インスタンス数、データベース名、スキーマ名などのプロパティを公開します。

  • ServiceResource: 対応する Service オブジェクトのフェッチ、サービスの中断と再開、そのステータスの取得に使用できるメソッドを公開します。

サービスに関する詳細については、 Snowpark Container Services: サービスの操作 をご参照ください。

サービスの作成

サービスを作成するには、 services.create メソッドを実行し、作成したいサービスを表す Service オブジェクトを渡します。

ステージにアップロードされたサービス仕様 .yaml ファイルからサービスを作成します。サービス仕様の作成に関する詳細については、 サービス仕様リファレンス をご参照ください。

仕様のアップロード

まだステージにアップロードされていない仕様からサービスを作成する場合は、Snowpark FileOperation オブジェクトを使用して仕様をアップロードできます。

次の例のコードは、 FileOperation.put メソッドを使用して仕様をファイルとしてアップロードします。

session.file.put("/local_location/my_service_spec.yaml", "@my_stage")
Copy

次の例のコードは、 FileOperation.put_stream メソッドを使用して仕様を文字列としてアップロードします。

service_spec_string = """
// Specification as a string.
"""
session.file.put_stream(StringIO(sepc_in_string), "@my_stage/my_service_spec.yaml")
Copy

サービスの作成

ステージングされた仕様からサービスを作成するには、まず以下のようなサービスプロパティを指定する Service オブジェクトを作成します。

  • サービス名

  • Snowflakeが作成できるサービスインスタンスの最大数と最小数

  • サービスが追加されるべきコンピュートプール

  • ステージの場所と仕様名

次の例のコードは、 @my_stage/my_service_spec.yaml の仕様から my_service というサービスを表す Service オブジェクトを作成します。

from snowflake.core.service import Service, ServiceSpec

my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec("@my_stage/my_service_spec.yaml"))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
Copy

次に、コードは Service オブジェクトを ServiceCollection.create メソッドに渡してサービスを作成することで、 my_db データベースと my_schema スキーマにサービスを作成します。

次の例に示すように、インラインテキストとして指定した仕様からサービスを作成することもできます。 ServiceSpec 関数は単一の文字列引数 spec を取ります。文字列が @ で始まる場合、この関数はそれをステージファイルのパスとして解釈して検証します。そうでない場合は、文字列はインラインテキストとして渡されます。

from textwrap import dedent
from snowflake.core.service import Service, ServiceSpec

spec_text = dedent(f"""\
    spec:
      containers:
      - name: hello-world
        image: repo/hello-world:latest
      endpoints:
      - name: hello-world-endpoint
        port: 8080
        public: true
    """)

my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec(spec_text))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
Copy

サービス関数の作成

サービスが稼働したら、サービスエンドポイントと通信するサービス関数を作成できます。サービス関数は、 Snowpark Container Services のサービスに関連付けるユーザー定義関数(UDF)です。詳細については、 サービス関数: SQL クエリからのサービスの使用 をご参照ください。

次の例のコードは、 my-udf という名前の UDF を作成し、以前に定義した hello-world サービスと hello-world-endpoint エンドポイントを指定します。

from snowflake.core import CreateMode
from snowflake.core.function import FunctionArgument, ServiceFunction

root.databases["my_db"].schemas["my_schema"].functions.create(
  ServiceFunction(
    name="my-udf",
    arguments=[
        FunctionArgument(name="input", datatype="TEXT")
    ],
    returns="TEXT",
    service="hello-world",
    endpoint="'hello-world-endpoint'",
    path="/hello-world-path",
    max_batch_rows=5,
  ),
  mode = CreateMode.or_replace
)
Copy

サービス関数の呼び出し

サービス関数が作成されたら、その関数を呼び出してテストすることができます。

次の例のコードは、以前に作成した my-udf サービス関数を呼び出します。

result = root.databases["my_db"].schemas["my_schema"].functions["my-udf(TEXT)"].execute_function(["test"])
print(result)
Copy

サービス詳細の取得

Service オブジェクトを返す ServiceResource.fetch メソッドを呼び出すことで、Snowflakeサービスに関する情報を取得できます。

次の例のコードは、 my_service というサービスに関する情報を取得します。

my_service = root.databases["my_db"].schemas["my_schema"].services["my_service"].fetch()
Copy

サービスの一覧表示

Service オブジェクトの PagedIter 反復子を返す iter メソッドを使用して、指定したスキーマのサービスをリストすることができます。

次の例のコードは、名前が my で始まるサービスを一覧表示します。

services = root.databases["my_db"].schemas["my_schema"].services.iter(like="my%")
for service_obj in services:
  print(service_obj.name)
Copy

サービス操作の実行

ServiceResource オブジェクトを使用して、サービスの中断、再開、そのステータスの取得など一般的なサービスの操作を実行できます。

次の例のコードは、 my_service サービスを中断および再開し、サービスのステータスも所得します。

my_service_res = root.databases["my_db"].schemas["my_schema"].services["my_service"]

my_service_res.suspend()
my_service_res.resume()
status = my_service_res.get_service_status(10)
Copy