PythonによるSnowpark Container Services(サービス関数を含む)の管理¶
Pythonを使用して、コンテナー化されたアプリケーションの展開、管理、スケールが可能なフルマネージドコンテナーサービスであるSnowpark Container Servicesを管理できます。Snowpark Container Servicesの概要については、 Snowpark Container Servicesについて をご参照ください。
Snowflake Python APIs を使用すると、コンピューティングプール、イメージリポジトリ、サービスを管理することができます。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root
オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session
オブジェクトを使って、コードは API のタイプとメソッドを使う Root
オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
コンピュートプールの管理¶
コンピュートプールは、SnowflakeがSnowpark Container Servicesのジョブやサービスを実行する仮想マシン(VM)ノードの集合体です。
Snowflake Python APIs は、2つの別々のタイプでコンピューティングプールを表します。
ComputePool
: コンピューティングプールのプロパティ(ウェアハウス、最大ノード、最小ノード、自動再開および自動中断設定など)を公開します。ComputePoolResource
: 対応するComputePool
オブジェクトのフェッチ、プールの中断、再開、停止など、コンピューティングプールに対してアクションを実行するためのメソッドを公開します。
コンピューティングプールに関する詳細については、 Snowpark Container Services: コンピューティングプールの操作 をご参照ください。
コンピューティングプールの作成¶
作成するコンピューティングプールを表す ComputePool
オブジェクトを渡して ComputePoolCollection.create
メソッドを呼び出すと、コンピューティングプールを作成できます。
コンピューティングプールを作成するには、まず、以下のようなプールプロパティを指定する ComputePool
オブジェクトを作成します。
コンピュートプール名
プールに含まれるノードの最大数と最小数
プール内のノードにプロビジョニングするマシンのタイプを識別するインスタンスファミリーの名前
サービスまたはジョブがプールに送信されたときに、プールを自動的に再開するかどうか。
次の例のコードでは、 my_compute_pool
というプールを表す ComputePool
オブジェクトを作成します。
from snowflake.core.compute_pool import ComputePool
compute_pool = ComputePool(name="my_compute_pool", min_nodes=1, max_nodes=2, instance_family="CPU_X64_XS", auto_resume=False)
root.compute_pools.create(compute_pool)
次に、コードは ComputePool
オブジェクトを ComputePoolCollection.create
メソッドに渡して、コンピューティングプールを作成します。
コンピュートプール詳細の取得¶
ComputePool
オブジェクトを返す ComputePoolResource.fetch
メソッドを呼び出すことで、コンピュートプールに関する情報を取得できます。
次の例のコードは、 my_compute_pool
というプールに関する情報を取得します。
compute_pool = root.compute_pools["my_compute_pool"].fetch()
print(compute_pool.to_dict())
コンピュートプールの一覧表示¶
PagedIter
反復子を返す iter
メソッドを使用して、コンピューティングプールをリストすることができます。
次の例のコードは、名前が my
で始まるコンピューティングプールを一覧表示します。
compute_pools = root.compute_pools.iter(like="my%")
for compute_pool in compute_pools:
print(compute_pool.name)
コンピュートプール操作の実行¶
ComputePool.fetch
メソッドを使用して取得できる ComputePoolResource
オブジェクトを使用して、プールの中断、再開、停止など一般的なコンピューティングプールの操作を実行できます。
次の例のコードは、 my_compute_pool
コンピューティングプールを中断、再開、停止します。
compute_pool_res = root.compute_pools["my_compute_pool"]
compute_pool_res.suspend()
compute_pool_res.resume()
compute_pool_res.stop_all_services()
コードは、 Root.compute_pools
メソッドを使用して、コンピューティングプールを表す ComputePool
オブジェクトを作成します。 ComputePool
オブジェクトから、 ComputePoolResource
オブジェクトをフェッチし、コンピューティングプール操作を実行します。
イメージリポジトリの管理¶
コンテナサービス上で実行するアプリケーションのイメージを格納するイメージリポジトリを管理できます。
イメージリポジトリはスキーマレベルのオブジェクトです。リポジトリを作成または参照するときは、そのスキーマのコンテキスト内で行います。
Snowflake Python APIs は、2つの別々のタイプでイメージリポジトリを表します。
ImageRepository
: イメージリポジトリのデータベース名やスキーマ名、リポジトリ URL、所有者などのプロパティを公開します。ImageRepositoryResource
: 対応するImageRepository
オブジェクトを取得し、イメージリポジトリリソースを削除するためのメソッドを公開します。
イメージリポジトリに関する詳細については、 Snowpark Container Services: イメージレジストリおよびリポジトリの操作 をご参照ください。
イメージリポジトリの作成¶
イメージリポジトリを作成するには、まずリポジトリ名を指定する ImageRepository
オブジェクトを作成します。
次の例のコードは、 my_repo
という名前のリポジトリを表す ImageRepository
オブジェクトを作成します。
from snowflake.core.image_repository import ImageRepository
my_repo = ImageRepository("my_repo")
root.databases["my_db"].schemas["my_schema"].image_repositories.create(my_repo)
次に、コードは ImageRepository
オブジェクトを ImageRepositoryCollection.create
メソッドに渡してイメージリポジトリを作成することで、 my_db
データベースと my_schema
スキーマにイメージリポジトリを作成します。
イメージリポジトリ詳細の取得¶
ImageRepository
オブジェクトを返す ImageRepositoryResource.fetch
メソッドを呼び出すことで、イメージリポジトリに関する情報を取得できます。
次の例のコードは、 my_repo
イメージリポジトリを表す ImageRepository
オブジェクトを取得して、リポジトリの所有者の名前を印刷します。
my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo = my_repo_res.fetch()
print(my_repo.owner)
イメージリポジトリの一覧表示¶
ImageRepository
オブジェクトの PagedIter
反復子を返す iter
メソッドを使用して、指定したスキーマのイメージリポジトリをリストすることができます。
次の例のコードは、 my_db
データベースと my_schema
スキーマのリポジトリ名をリストします。
repo_list = root.databases["my_db"].schemas["my_schema"].image_repositories.iter()
for repo_obj in repo_list:
print(repo_obj.name)
イメージリポジトリのドロップ¶
ImageRepositoryResource.drop
メソッドを使用してイメージリポジトリをドロップできます。
次の例のコードは、 my_repo
リポジトリをドロップします。
my_repo_res = root.databases["my_db"].schemas["my_schema"].image_repositories["my_repo"]
my_repo_res.drop()
サービスとサービス関数の管理¶
停止させるまでアプリケーションコンテナを実行するサービスを管理できます。サービスコンテナが停止した場合、Snowflakeは自動的にサービスを再起動します。こうすることで、サービスは実質的に中断することなく実行されます。
サービスはスキーマレベルのオブジェクトです。サービスを作成または参照するときは、そのスキーマのコンテキスト内で行います。
Snowflake Python APIs は、2つの別々のタイプでサービスを表します。
Service
: サービスの仕様、最小インスタンス数、最大インスタンス数、データベース名、スキーマ名などのプロパティを公開します。ServiceResource
: 対応するService
オブジェクトのフェッチ、サービスの中断と再開、そのステータスの取得に使用できるメソッドを公開します。
サービスに関する詳細については、 Snowpark Container Services: サービスの操作 をご参照ください。
サービスの作成¶
サービスを作成するには、 services.create
メソッドを実行し、作成したいサービスを表す Service
オブジェクトを渡します。
ステージにアップロードされたサービス仕様 .yaml
ファイルからサービスを作成します。サービス仕様の作成に関する詳細については、 サービス仕様リファレンス をご参照ください。
仕様のアップロード¶
まだステージにアップロードされていない仕様からサービスを作成する場合は、Snowpark FileOperation オブジェクトを使用して仕様をアップロードできます。
次の例のコードは、 FileOperation.put
メソッドを使用して仕様をファイルとしてアップロードします。
session.file.put("/local_location/my_service_spec.yaml", "@my_stage")
次の例のコードは、 FileOperation.put_stream
メソッドを使用して仕様を文字列としてアップロードします。
service_spec_string = """
// Specification as a string.
"""
session.file.put_stream(StringIO(sepc_in_string), "@my_stage/my_service_spec.yaml")
サービスの作成¶
ステージングされた仕様からサービスを作成するには、まず以下のようなサービスプロパティを指定する Service
オブジェクトを作成します。
サービス名
Snowflakeが作成できるサービスインスタンスの最大数と最小数
サービスが追加されるべきコンピュートプール
ステージの場所と仕様名
次の例のコードは、 @my_stage/my_service_spec.yaml
の仕様から my_service
というサービスを表す Service
オブジェクトを作成します。
from snowflake.core.service import Service, ServiceSpec
my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec("@my_stage/my_service_spec.yaml"))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
次に、コードは Service
オブジェクトを ServiceCollection.create
メソッドに渡してサービスを作成することで、 my_db
データベースと my_schema
スキーマにサービスを作成します。
次の例に示すように、インラインテキストとして指定した仕様からサービスを作成することもできます。 ServiceSpec
関数は単一の文字列引数 spec
を取ります。文字列が @
で始まる場合、この関数はそれをステージファイルのパスとして解釈して検証します。そうでない場合は、文字列はインラインテキストとして渡されます。
from textwrap import dedent
from snowflake.core.service import Service, ServiceSpec
spec_text = dedent(f"""\
spec:
containers:
- name: hello-world
image: repo/hello-world:latest
endpoints:
- name: hello-world-endpoint
port: 8080
public: true
""")
my_service = Service(name="my_service", min_instances=1, max_instances=2, compute_pool="my_compute_pool", spec=ServiceSpec(spec_text))
root.databases["my_db"].schemas["my_schema"].services.create(my_service)
サービス関数の作成¶
サービスが稼働したら、サービスエンドポイントと通信するサービス関数を作成できます。サービス関数は、 Snowpark Container Services のサービスに関連付けるユーザー定義関数(UDF)です。詳細については、 サービス関数: SQL クエリからのサービスの使用 をご参照ください。
次の例のコードは、 my-udf
という名前の UDF を作成し、以前に定義した hello-world
サービスと hello-world-endpoint
エンドポイントを指定します。
from snowflake.core import CreateMode
from snowflake.core.function import FunctionArgument, ServiceFunction
root.databases["my_db"].schemas["my_schema"].functions.create(
ServiceFunction(
name="my-udf",
arguments=[
FunctionArgument(name="input", datatype="TEXT")
],
returns="TEXT",
service="hello-world",
endpoint="'hello-world-endpoint'",
path="/hello-world-path",
max_batch_rows=5,
),
mode = CreateMode.or_replace
)
サービス関数の呼び出し¶
サービス関数が作成されたら、その関数を呼び出してテストすることができます。
次の例のコードは、以前に作成した my-udf
サービス関数を呼び出します。
result = root.databases["my_db"].schemas["my_schema"].functions["my-udf(TEXT)"].execute_function(["test"])
print(result)
サービス詳細の取得¶
Service
オブジェクトを返す ServiceResource.fetch
メソッドを呼び出すことで、Snowflakeサービスに関する情報を取得できます。
次の例のコードは、 my_service
というサービスに関する情報を取得します。
my_service = root.databases["my_db"].schemas["my_schema"].services["my_service"].fetch()
サービスの一覧表示¶
Service
オブジェクトの PagedIter
反復子を返す iter
メソッドを使用して、指定したスキーマのサービスをリストすることができます。
次の例のコードは、名前が my
で始まるサービスを一覧表示します。
services = root.databases["my_db"].schemas["my_schema"].services.iter(like="my%")
for service_obj in services:
print(service_obj.name)
サービス操作の実行¶
ServiceResource
オブジェクトを使用して、サービスの中断、再開、そのステータスの取得など一般的なサービスの操作を実行できます。
次の例のコードは、 my_service
サービスを中断および再開し、サービスのステータスも所得します。
my_service_res = root.databases["my_db"].schemas["my_schema"].services["my_service"]
my_service_res.suspend()
my_service_res.resume()
status = my_service_res.get_service_status(10)