チュートリアル3: Snowflake Python APIs を使用してサービスおよびジョブを作成する¶
概要¶
チュートリアル1 および チュートリアル2 では、 SQL インターフェイスを使用して Snowpark コンテナーサービスのサービスとジョブを作成します。このチュートリアルでは、 Snowflake Python APIs を使用して同じサービスとジョブを作成し、 Snowflake Python APIs を使用して Snowpark コンテナーサービスのリソースを管理する方法を探ります。
このチュートリアルでは、Python コードの実行に Snowflake notebook を使用していますが、コードはノートブックに依存しないので、他の環境でも実行できます。
1:初期構成¶
この初期構成では、Snowflakeノートブックを作成し、必要なライブラリをインポートし、以降のステップでセルが使用する定数を定義します。
Snowflake Notebooksを作成する。
ノートブックを作成する。手順については、 新しいロールを作成する をご参照ください。UI で選択した Python環境 (ウェアハウスで実行、コンテナーで実行)は関係ないことに注意してください。
Packages ドロップダウンメニューから "Snowflake" パッケージを選択し、最新バージョンの Snowflake Python APIs ライブラリをインストールします。
(オプション) デフォルトでノートブックにプロバイダーされているセルを削除します。このチュートリアルの手順に従って、Python のセルをノートブックに追加していきます。
このチュートリアルの多くのセルで使用される Python ライブラリをインポートするセルを作成し、実行します。
from snowflake.snowpark.context import get_active_session from snowflake.core import Root from snowflake.core import CreateMode
以降のセルで使用する定数を定義するセルを作成し、実行します。以下のプロバイダーの値は、チュートリアル1と2に一致しています。オプションでこれらの値を変更することができます。
current_user = get_active_session().get_current_user() user_role_name = "test_role" compute_pool_name = "tutorial_compute_pool" warehouse_name = "tutorial_warehouse" database_name = "tutorial_db" schema_name = "data_schema" repo_name = "tutorial_repository" stage_name = "tutorial_stage" service_name = "echo_service" print("configured!")
2: Snowflakeオブジェクトの作成¶
サービスを作成する前に、データベース、ユーザー、ロール、Computeプール、イメージリポジトリなどのSnowflakeオブジェクトが必要です。これらのオブジェクトの中には、作成に管理者権限を必要とするアカウント・スコープのオブジェクトもあります。作成されるオブジェクトの名前は、前のステップで定義されています。
2.1: アカウントスコープのSnowflakeオブジェクトの作成¶
以下のPythonコードはこれらのオブジェクトを作成します。
ロール(
test_role
)。このロールには、サービスの作成と使用に必要なすべての権限を付与します。コードでは、このロールを現在のユーザーに付与し、そのユーザーがサービスを作成および使用できるようにします。データベース (
tutorial_db
)。次のステップでは、このデータベースにスキーマを作成します。コンピューティングプール (
tutorial_compute_pool
)サービスコンテナーはこのコンピュートプールで実行されます。ウェアハウス(
tutorial_warehouse
)。サービスがSnowflakeに接続してクエリを実行するとき、このウェアハウスがクエリの実行に使用されます。
ACCOUNTADMIN ロールを使用して、これらのアカウント・オブジェクトを作成するセルを作成し、実行します。スクリプトは、リソースが存在しない場合にのみリソースを作成することに注意してください。コード中のコメントは、同等の SQL ステートメントを示しています。
from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse
session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)
# CREATE ROLE test_role;
root.roles.create(
Role(name=user_role_name),
mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)
# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
securable=Securables.role(user_role_name),
grantee=Grantees.user(name=current_user),
))
# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
# MIN_NODES = 1 MAX_NODES = 1
# INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
mode=CreateMode.if_not_exists,
compute_pool=ComputePool(
name=compute_pool_name,
instance_family="CPU_X64_XS",
min_nodes=1,
max_nodes=2,
)
)
# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
securable=Securables.compute_pool(compute_pool_name),
grantee=Grantees.role(name=user_role_name)
))
print(f"Created compute pool:", compute_pool_name)
# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
Database(name=database_name),
mode=CreateMode.if_not_exists)
# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.all_privileges],
securable=Securables.database(database_name),
grantee=Grantees.role(name=user_role_name),
))
print("Created database:", database_name)
# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
mode=CreateMode.if_not_exists)
# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.usage],
grantee=Grantees.role(name=user_role_name),
securable=Securables.warehouse(warehouse_name)
))
print("Created warehouse:", warehouse_name)
# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.bind_service_endpoint],
securable=Securables.current_account,
grantee=Grantees.role(name=user_role_name)
))
print("Done: GRANT BIND SERVICE ENDPOINT")
リソースを作成すると、コードは必要な権限もロール(test_role
)に付与し、ロールがこれらのリソースを使用できるようにします。さらに、このチュートリアルで作成する echo サービスは、1 つのパブリックエンドポイントを公開することに注意してください。このパブリックエンドポイントでは、アカウント内の他のユーザーがパブリックウェブ(イングレス)からサービスにアクセスできます。パブリックエンドポイントを持つサービスを作成するには、ロール(test_role
)がアカウントで BIND SERVICE ENDPOINT
権限を持っている必要があります。
2.2 スキーマスコープオブジェクトの作成¶
このセクションのPythonコードは、 test_role
ロールを使用してスキーマとそのスキーマ内のオブジェクトを作成します。これらのリソースを作成するのに、管理者権限は必要ありません。
スキーマ (
data_schema
)。このスキーマでイメージリポジトリ、サービス、ジョブを作成します。イメージリポジトリ(
tutorial_repository
)。このリポジトリにアプリケーションイメージを保存します。ステージ (
tutorial_stage
)。ステージングは説明のために作られたものです。このチュートリアルでは実演していませんが、ステージングはサービスにデータを渡したり、サービスからデータを収集したりするために使用できます。
スクリプトは、リソースが存在しない場合にのみリソースを作成することに注意してください。
from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable
session = get_active_session()
session.use_role(user_role_name)
root = Root(session)
# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
Schema(name=schema_name),
mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)
# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
ImageRepository(name=repo_name),
mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)
repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")
#CREATE STAGE IF NOT EXISTS tutorial_stage
# DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
Stage(
name=stage_name,
directory_table=StageDirectoryTable(enable=True)),
mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Python コードでは、リポジトリに画像をプッシュする際に使用する、リポジトリに関する便利な情報 (リポジトリ URL) も出力します。
3: イメージをビルドしてアップロードする¶
チュートリアル1 で説明されているようにコードをローカルにダウンロードし、Dockerコマンドを使ってイメージをビルドし、アカウントのイメージリポジトリにアップロードします。
イメージレジストリ のホスト名 と、イメージリポジトリの URL を取得するセルを作成して実行 します。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
Pythonコードは、イメージリポジトリ リソース オブジェクト (
repo
) を取得し、 モデル オブジェクトにアクセスし、そこからリポジトリ URL を抽出します。チュートリアル1 ステップ1と2に従い、サービスコードをダウンロードし、イメージをビルドし、リポジトリにアップロードします。
セルを作成して実行し、イメージがリポジトリにあることを確認します。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
このコードでは、イメージリポジトリ リソース(
repo
)から画像を列挙し、各画像のimage_path
を表示します。
4: サービスを作成する¶
サービスと通信するためのサービス関数を作成します。
コンピュートプールが準備できていることを確認します。コンピュートプールを作成した後、Snowflakeがすべてのノードをプロビジョニングするのに時間がかかります。サービスコンテナーは指定されたコンピュートプール内で実行されるため、サービスを作成する前にコンピュートプールが準備できていることを確認してください。
セルを作成して実行し、コンピュートプール のステータスを取得します。
import time session = get_active_session() session.use_role(user_role_name) root = Root(session) cp = root.compute_pools[compute_pool_name] cpm = cp.fetch() print(cpm.state, cpm.status_message) if cpm.state == 'SUSPENDED': cp.resume() while cpm.state in ['STARTING', 'SUSPENDED']: time.sleep(5) cpm = cp.fetch() print(cpm.state, cpm.status_message)
このコードは、現在のコンピュートプールの状態を取得するために、 コンピュートプールリソース(
cp
)からコンピュートプールモデル(cpm
)を取得します。コンピュートプールが一時停止している場合、コードはコンピュートプールを再開します。このコードは、コンピュートプールが STARTING または SUSPENDED の状態でなくなるまで、毎回5秒間休止しながらループします。出力の最後の行は、"IDLE"あるいは"ACTIVE"になっているはずです。これは、コンピュートプールがサービスを実行する準備ができていることを示しています。詳細情報については、 コンピュートプールのライフサイクル をご参照ください。コンピュートプールの準備ができていないと、サービスは開始できません。
セルを作成して実行し、エコー・サービスを作成します。
from snowflake.core.service import Service, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url specification = f""" spec: containers: - name: echo image: {repo_url}/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true """ echo_service = schema.services.create(Service( name=service_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification), min_instances=1, max_instances=1), mode=CreateMode.if_not_exists) print("created service:", echo_service.name)
このコードは、前のステップで行ったように、リポジトリ URL を取得します。このコードでは、インライン指定と指定されたイメージリポジトリからの画像を使用して
echo_service
を作成します。Pythonのコードを見ればわかるように、リソースの名前をパラメーター化するのは簡単です。以下は、パラメーターを使用せずにサービスを作成する、同等の SQL コマンドです。
CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
セルを実行してサービス関数(
my_echo_function
)を作成します。サービス関数とは、サービスを利用する方法の一つです。from snowflake.core.function import ServiceFunction, FunctionArgument session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # CREATE FUNCTION my_echo_udf (inputtext VARCHAR) # RETURNS VARCHAR # SERVICE=echo_service # ENDPOINT=echoendpoint # AS '/echo'; svcfn = schema.functions.create(mode=CreateMode.or_replace, function=ServiceFunction( name="my_echo_function", arguments=[FunctionArgument(name="inputtext", datatype="TEXT")], returns="TEXT", service=service_name, endpoint="echoendpoint", path="/echo")) print("created service function:", svcfn.name_with_args)
このコードでは、
schema
のfunctions
コレクションに対してcreate
メソッドを呼び出し、サービス関数 (my_echo_function
) を作成しています。
5: サービスを使用する¶
このセクションでは、以下のようにサービスを利用します。
サービス関数を呼び出します。
ブラウザを使用して、サービスのパブリックエンドポイントと対話します。
サービス関数を呼び出します。
svcfn = schema.functions["my_echo_function(TEXT)"] print(svcfn.execute(["hello"]))
Snowflakeは、 POST リクエストをサービスエンドポイント(
echoendpoint
)に送信します。リクエストを受信すると、サービスはレスポンスに入力文字列をエコーします。出力:
+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
サービスが公開するパブリックエンドポイントにブラウザからアクセスします。
パブリックエンドポイントの URL を取得します。
# helper to check if service is ready and return endpoint url def get_ingress_for_endpoint(svc, endpoint): for _ in range(10): # only try 10 times # Find the target endpoint. target_endpoint = None for ep in svc.get_endpoints(): if ep.is_public and ep.name == endpoint: target_endpoint = ep break; else: print(f"Endpoint {endpoint} not found") return None # Return endpoint URL or wait for it to be provisioned. if target_endpoint.ingress_url.startswith("Endpoints provisioning "): print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.") time.sleep(10) else: return target_endpoint.ingress_url print("Timed out waiting for endpoint to become available") endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint") print(f"https://{endpoint_url}/ui")
印刷した URL をブラウザのウィンドウに貼り付けます。これにより、サービスは
ui()
関数を実行します(echo_service.py
を参照)。エンドポイント URL に初めてアクセスするときは、Snowflakeにログインするようにリクエストされることに注意してください。このテストでは、サービスの作成に使用したユーザーと同じユーザーを使用し、そのユーザーが必要な権限を持っていることを確認します。
Input ボックスに文字列「Hello」を入力し、 Return を押します。
6: ジョブの作成¶
チュートリアル2では、 SQL インターフェイスを使用して Snowpark コンテナーサービスのジョブを作成します。このセクションでは、 Snowflake Python APIs を使って同じジョブを作成します。
イメージレジストリ のホスト名 と、イメージリポジトリの URL を取得するセルを作成して実行 します。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
Pythonコードは、イメージリポジトリ リソースオブジェクト(
repo
)を取得し、モデルオブジェクトにアクセスし、そこからリポジトリ URL を抽出します。チュートリアル2 ステップ1と2に従い、サービスコードをダウンロードし、イメージをビルドし、リポジトリにアップロードします。
セルを作成して実行し、イメージがリポジトリにあることを確認します。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
このコードでは、イメージリポジトリ リソース(
repo
)から画像を列挙し、各画像のimage_path
を表示します。ジョブを作成するセルを作成し、実行します。
from snowflake.core.service import JobService, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url job_name = "test_job" # cleanup previous job if present. schema.services[job_name].drop()(if_exists=True) specification = f""" spec: containers: - name: main image: {repo_url}/my_job_image:latest env: SNOWFLAKE_WAREHOUSE: {warehouse_name} args: - "--query=select current_time() as time,'hello'" - "--result_table=results" """ job = schema.services.execute_job(JobService( name=job_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification))) print("executed job:", job.name, "status:", job.fetch().status) print("job logs:") print(job.get_service_logs(0, "main"))
このジョブは指定されたクエリを実行し、結果をテーブルに保存します。
以下のセルを実行して、テーブルに書き込まれた結果を確認してください。このコードでは、Snowpark Pythonを使用してそのテーブルをクエリします。
session = get_active_session() session.use_role(user_role_name) # show that above job wrote to results table session.sql(f"select * from {database_name}.{schema_name}.results").collect()
7: クリーンアップする¶
サービスを停止し、削除してください。サービスを停止すると、Snowflakeはデフォルトで自動的にCompute Poolを一時停止します(他のサービスやジョブサービスが実行されていないと仮定します)。詳細情報については、 コンピュートプールのライフサイクル をご参照ください。
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # now let's clean up schema.functions["my_echo_function(TEXT)"].drop() schema.services[service_name].drop()
ストレージへの支払いを避けるため、イメージリポジトリを削除します。リポジトリに他のイメージが保存されている場合、それらは削除されますのでご注意ください。
schema.image_repositories[repo_name].drop()
スキーマをドロップする。スキーマを削除すると、そのスキーマ内のすべてのオブジェクトも削除されます。このチュートリアルには、サービス、関数、イメージリポジトリ、作成したステージが含まれます。
root.databases[database_name].schemas[schema_name].drop()
Snowflakeがコンピュートプールをサスペンドするのを待つ代わりに、明示的にコンピュートプールを中断することもできます。この場合、Snowflakeは実行中のサービスを一時停止し、実行中のジョブが終了するのを待ってから、コンピュートプールを一時停止します。
root.compute_pool[compute_pool_name].suspend()
次の内容¶
このチュートリアルでは、 Snowflake Python APIs を使用して Snowpark コンテナーサービスのサービスとジョブを作成および管理する方法を示します。 Snowflake Python APIs の情報については、 Snowflake Python APIs: PythonによるSnowflakeオブジェクトの管理 をご参照ください。