チュートリアル3: Snowflake Python APIs を使用してサービスおよびジョブを作成する

概要

チュートリアル1 および チュートリアル2 では、 SQL インターフェイスを使用して Snowpark コンテナーサービスのサービスとジョブを作成します。このチュートリアルでは、 Snowflake Python APIs を使用して同じサービスとジョブを作成し、 Snowflake Python APIs を使用して Snowpark コンテナーサービスのリソースを管理する方法を探ります。

このチュートリアルでは、Python コードの実行に Snowflake notebook を使用していますが、コードはノートブックに依存しないので、他の環境でも実行できます。

1:初期構成

この初期構成では、Snowflakeノートブックを作成し、必要なライブラリをインポートし、以降のステップでセルが使用する定数を定義します。

  1. Snowflake Notebooksを作成する。

    1. ノートブックを作成する。手順については、 新しいロールを作成する をご参照ください。UI で選択した Python環境 (ウェアハウスで実行、コンテナーで実行)は関係ないことに注意してください。

    2. Packages ドロップダウンメニューから "Snowflake" パッケージを選択し、最新バージョンの Snowflake Python APIs ライブラリをインストールします。

    3. (オプション) デフォルトでノートブックにプロバイダーされているセルを削除します。このチュートリアルの手順に従って、Python のセルをノートブックに追加していきます。

  2. このチュートリアルの多くのセルで使用される Python ライブラリをインポートするセルを作成し、実行します。

    from snowflake.snowpark.context import get_active_session
    from snowflake.core import Root
    from snowflake.core import CreateMode
    
    Copy
  3. 以降のセルで使用する定数を定義するセルを作成し、実行します。以下のプロバイダーの値は、チュートリアル1と2に一致しています。オプションでこれらの値を変更することができます。

    current_user = get_active_session().get_current_user()
    user_role_name = "test_role"
    compute_pool_name = "tutorial_compute_pool"
    warehouse_name = "tutorial_warehouse"
    database_name = "tutorial_db"
    schema_name = "data_schema"
    repo_name = "tutorial_repository"
    stage_name = "tutorial_stage"
    service_name = "echo_service"
    print("configured!")
    
    Copy

2: Snowflakeオブジェクトの作成

サービスを作成する前に、データベース、ユーザー、ロール、Computeプール、イメージリポジトリなどのSnowflakeオブジェクトが必要です。これらのオブジェクトの中には、作成に管理者権限を必要とするアカウント・スコープのオブジェクトもあります。作成されるオブジェクトの名前は、前のステップで定義されています。

2.1: アカウントスコープのSnowflakeオブジェクトの作成

以下のPythonコードはこれらのオブジェクトを作成します。

  • ロール(test_role)。このロールには、サービスの作成と使用に必要なすべての権限を付与します。コードでは、このロールを現在のユーザーに付与し、そのユーザーがサービスを作成および使用できるようにします。

  • データベース (tutorial_db)。次のステップでは、このデータベースにスキーマを作成します。

  • コンピューティングプール (tutorial_compute_pool)サービスコンテナーはこのコンピュートプールで実行されます。

  • ウェアハウス(tutorial_warehouse)。サービスがSnowflakeに接続してクエリを実行するとき、このウェアハウスがクエリの実行に使用されます。

ACCOUNTADMIN ロールを使用して、これらのアカウント・オブジェクトを作成するセルを作成し、実行します。スクリプトは、リソースが存在しない場合にのみリソースを作成することに注意してください。コード中のコメントは、同等の SQL ステートメントを示しています。

from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse

session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)

# CREATE ROLE test_role;
root.roles.create(
    Role(name=user_role_name),
    mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)

# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
    securable=Securables.role(user_role_name),
    grantee=Grantees.user(name=current_user),
    ))

# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
#   MIN_NODES = 1 MAX_NODES = 1
#   INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
    mode=CreateMode.if_not_exists,
    compute_pool=ComputePool(
        name=compute_pool_name,
        instance_family="CPU_X64_XS",
        min_nodes=1,
        max_nodes=2,
    )
)

# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
    securable=Securables.compute_pool(compute_pool_name),
    grantee=Grantees.role(name=user_role_name)
    ))

print(f"Created compute pool:", compute_pool_name)

# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
    Database(name=database_name),
    mode=CreateMode.if_not_exists)

# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.all_privileges],
    securable=Securables.database(database_name),
    grantee=Grantees.role(name=user_role_name),
    ))

print("Created database:", database_name)

# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
    Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
    mode=CreateMode.if_not_exists)

# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.usage],
    grantee=Grantees.role(name=user_role_name),
    securable=Securables.warehouse(warehouse_name)
    ))

print("Created warehouse:", warehouse_name)

# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.bind_service_endpoint],
    securable=Securables.current_account,
    grantee=Grantees.role(name=user_role_name)
    ))

print("Done: GRANT BIND SERVICE ENDPOINT")
Copy

リソースを作成すると、コードは必要な権限もロール(test_role)に付与し、ロールがこれらのリソースを使用できるようにします。さらに、このチュートリアルで作成する echo サービスは、1 つのパブリックエンドポイントを公開することに注意してください。このパブリックエンドポイントでは、アカウント内の他のユーザーがパブリックウェブ(イングレス)からサービスにアクセスできます。パブリックエンドポイントを持つサービスを作成するには、ロール(test_role)がアカウントで BIND SERVICE ENDPOINT 権限を持っている必要があります。

2.2 スキーマスコープオブジェクトの作成

このセクションのPythonコードは、 test_role ロールを使用してスキーマとそのスキーマ内のオブジェクトを作成します。これらのリソースを作成するのに、管理者権限は必要ありません。

  • スキーマ (data_schema)。このスキーマでイメージリポジトリ、サービス、ジョブを作成します。

  • イメージリポジトリ(tutorial_repository)。このリポジトリにアプリケーションイメージを保存します。

  • ステージ (tutorial_stage)。ステージングは説明のために作られたものです。このチュートリアルでは実演していませんが、ステージングはサービスにデータを渡したり、サービスからデータを収集したりするために使用できます。

スクリプトは、リソースが存在しない場合にのみリソースを作成することに注意してください。

from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable

session = get_active_session()
session.use_role(user_role_name)
root = Root(session)

# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
    Schema(name=schema_name),
    mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)

# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
    ImageRepository(name=repo_name),
    mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)

repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")


#CREATE STAGE IF NOT EXISTS tutorial_stage
#  DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
    Stage(
        name=stage_name,
        directory_table=StageDirectoryTable(enable=True)),
    mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Copy

Python コードでは、リポジトリに画像をプッシュする際に使用する、リポジトリに関する便利な情報 (リポジトリ URL) も出力します。

3: イメージをビルドしてアップロードする

チュートリアル1 で説明されているようにコードをローカルにダウンロードし、Dockerコマンドを使ってイメージをビルドし、アカウントのイメージリポジトリにアップロードします。

  1. イメージレジストリ のホスト名 と、イメージリポジトリの URL を取得するセルを作成して実行 します。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Pythonコードは、イメージリポジトリ リソース オブジェクト (repo) を取得し、 モデル オブジェクトにアクセスし、そこからリポジトリ URL を抽出します。

  2. チュートリアル1 ステップ1と2に従い、サービスコードをダウンロードし、イメージをビルドし、リポジトリにアップロードします。

  3. セルを作成して実行し、イメージがリポジトリにあることを確認します。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    このコードでは、イメージリポジトリ リソース(repo)から画像を列挙し、各画像の image_path を表示します。

4: サービスを作成する

サービスと通信するためのサービス関数を作成します。

  1. コンピュートプールが準備できていることを確認します。コンピュートプールを作成した後、Snowflakeがすべてのノードをプロビジョニングするのに時間がかかります。サービスコンテナーは指定されたコンピュートプール内で実行されるため、サービスを作成する前にコンピュートプールが準備できていることを確認してください。

    セルを作成して実行し、コンピュートプール のステータスを取得します。

    import time
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    cp = root.compute_pools[compute_pool_name]
    
    cpm = cp.fetch()
    print(cpm.state, cpm.status_message)
    if cpm.state == 'SUSPENDED':
        cp.resume()
    while cpm.state in ['STARTING', 'SUSPENDED']:
        time.sleep(5)
        cpm = cp.fetch()
        print(cpm.state, cpm.status_message)
    
    Copy

    このコードは、現在のコンピュートプールの状態を取得するために、 コンピュートプールリソース(cp)からコンピュートプールモデル(cpm)を取得します。コンピュートプールが一時停止している場合、コードはコンピュートプールを再開します。このコードは、コンピュートプールが STARTING または SUSPENDED の状態でなくなるまで、毎回5秒間休止しながらループします。

    出力の最後の行は、"IDLE"あるいは"ACTIVE"になっているはずです。これは、コンピュートプールがサービスを実行する準備ができていることを示しています。詳細情報については、 コンピュートプールのライフサイクル をご参照ください。コンピュートプールの準備ができていないと、サービスは開始できません。

  2. セルを作成して実行し、エコー・サービスを作成します。

    from snowflake.core.service import Service, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    specification = f"""
        spec:
          containers:
          - name: echo
            image: {repo_url}/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
    
        """
    echo_service = schema.services.create(Service(
        name=service_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification),
        min_instances=1,
        max_instances=1),
        mode=CreateMode.if_not_exists)
    print("created service:", echo_service.name)
    
    Copy

    このコードは、前のステップで行ったように、リポジトリ URL を取得します。このコードでは、インライン指定と指定されたイメージリポジトリからの画像を使用して echo_service を作成します。

    Pythonのコードを見ればわかるように、リソースの名前をパラメーター化するのは簡単です。以下は、パラメーターを使用せずにサービスを作成する、同等の SQL コマンドです。

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
      $$
      MIN_INSTANCES=1
      MAX_INSTANCES=1;
    
    Copy
  3. セルを実行してサービス関数(my_echo_function)を作成します。サービス関数とは、サービスを利用する方法の一つです。

    from snowflake.core.function import ServiceFunction, FunctionArgument
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # CREATE FUNCTION my_echo_udf (inputtext VARCHAR)
    #  RETURNS VARCHAR
    #  SERVICE=echo_service
    #  ENDPOINT=echoendpoint
    #  AS '/echo';
    svcfn = schema.functions.create(mode=CreateMode.or_replace,
        function=ServiceFunction(
            name="my_echo_function",
            arguments=[FunctionArgument(name="inputtext", datatype="TEXT")],
            returns="TEXT",
            service=service_name,
            endpoint="echoendpoint",
            path="/echo"))
    print("created service function:", svcfn.name_with_args)
    
    Copy

    このコードでは、 schemafunctions コレクションに対して create メソッドを呼び出し、サービス関数 (my_echo_function) を作成しています。

5: サービスを使用する

このセクションでは、以下のようにサービスを利用します。

  • サービス関数を呼び出します。

  • ブラウザを使用して、サービスのパブリックエンドポイントと対話します。

  1. サービス関数を呼び出します。

    svcfn = schema.functions["my_echo_function(TEXT)"]
    print(svcfn.execute(["hello"]))
    
    Copy

    Snowflakeは、 POST リクエストをサービスエンドポイント(echoendpoint)に送信します。リクエストを受信すると、サービスはレスポンスに入力文字列をエコーします。

    出力:

    +--------------------------+
    | **MY_ECHO_UDF('HELLO!')**|
    |------------------------- |
    | Bob said hello!          |
    +--------------------------+
    
  2. サービスが公開するパブリックエンドポイントにブラウザからアクセスします。

    1. パブリックエンドポイントの URL を取得します。

      # helper to check if service is ready and return endpoint url
      def get_ingress_for_endpoint(svc, endpoint):
          for _ in range(10): # only try 10 times
              # Find the target endpoint.
              target_endpoint = None
              for ep in svc.get_endpoints():
                  if ep.is_public and ep.name == endpoint:
                      target_endpoint = ep
                      break;
              else:
                  print(f"Endpoint {endpoint} not found")
                  return None
      
              # Return endpoint URL or wait for it to be provisioned.
              if target_endpoint.ingress_url.startswith("Endpoints provisioning "):
                  print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.")
                  time.sleep(10)
              else:
                  return target_endpoint.ingress_url
          print("Timed out waiting for endpoint to become available")
      
      endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint")
      print(f"https://{endpoint_url}/ui")
      
      Copy
    2. 印刷した URL をブラウザのウィンドウに貼り付けます。これにより、サービスは ui() 関数を実行します(echo_service.py を参照)。

      エンドポイント URL に初めてアクセスするときは、Snowflakeにログインするようにリクエストされることに注意してください。このテストでは、サービスの作成に使用したユーザーと同じユーザーを使用し、そのユーザーが必要な権限を持っていることを確認します。

      Echoサービスと通信するためのウェブフォーム。
    3. Input ボックスに文字列「Hello」を入力し、 Return を押します。

      Echoサービスからの応答を表示するウェブフォーム。

6: ジョブの作成

チュートリアル2では、 SQL インターフェイスを使用して Snowpark コンテナーサービスのジョブを作成します。このセクションでは、 Snowflake Python APIs を使って同じジョブを作成します。

  1. イメージレジストリ のホスト名 と、イメージリポジトリの URL を取得するセルを作成して実行 します。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Pythonコードは、イメージリポジトリ リソースオブジェクト(repo)を取得し、モデルオブジェクトにアクセスし、そこからリポジトリ URL を抽出します。

  2. チュートリアル2 ステップ1と2に従い、サービスコードをダウンロードし、イメージをビルドし、リポジトリにアップロードします。

  3. セルを作成して実行し、イメージがリポジトリにあることを確認します。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    このコードでは、イメージリポジトリ リソース(repo)から画像を列挙し、各画像の image_path を表示します。

  4. ジョブを作成するセルを作成し、実行します。

    from snowflake.core.service import JobService, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    job_name = "test_job"
    
    # cleanup previous job if present.
    schema.services[job_name].drop()(if_exists=True)
    
    specification = f"""
        spec:
          containers:
          - name: main
            image: {repo_url}/my_job_image:latest
            env:
              SNOWFLAKE_WAREHOUSE: {warehouse_name}
            args:
            - "--query=select current_time() as time,'hello'"
            - "--result_table=results"
        """
    job = schema.services.execute_job(JobService(
        name=job_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification)))
    print("executed job:", job.name, "status:", job.fetch().status)
    
    print("job logs:")
    print(job.get_service_logs(0, "main"))
    
    Copy

    このジョブは指定されたクエリを実行し、結果をテーブルに保存します。

  5. 以下のセルを実行して、テーブルに書き込まれた結果を確認してください。このコードでは、Snowpark Pythonを使用してそのテーブルをクエリします。

    session = get_active_session()
    session.use_role(user_role_name)
    # show that above job wrote to results table
    session.sql(f"select * from {database_name}.{schema_name}.results").collect()
    
    Copy

7: クリーンアップする

  1. サービスを停止し、削除してください。サービスを停止すると、Snowflakeはデフォルトで自動的にCompute Poolを一時停止します(他のサービスやジョブサービスが実行されていないと仮定します)。詳細情報については、 コンピュートプールのライフサイクル をご参照ください。

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # now let's clean up
    
    schema.functions["my_echo_function(TEXT)"].drop()
    schema.services[service_name].drop()
    
    Copy
  2. ストレージへの支払いを避けるため、イメージリポジトリを削除します。リポジトリに他のイメージが保存されている場合、それらは削除されますのでご注意ください。

    schema.image_repositories[repo_name].drop()
    
    Copy
  3. スキーマをドロップする。スキーマを削除すると、そのスキーマ内のすべてのオブジェクトも削除されます。このチュートリアルには、サービス、関数、イメージリポジトリ、作成したステージが含まれます。

    root.databases[database_name].schemas[schema_name].drop()
    
    Copy
  4. Snowflakeがコンピュートプールをサスペンドするのを待つ代わりに、明示的にコンピュートプールを中断することもできます。この場合、Snowflakeは実行中のサービスを一時停止し、実行中のジョブが終了するのを待ってから、コンピュートプールを一時停止します。

    root.compute_pool[compute_pool_name].suspend()
    
    Copy

次の内容

このチュートリアルでは、 Snowflake Python APIs を使用して Snowpark コンテナーサービスのサービスとジョブを作成および管理する方法を示します。 Snowflake Python APIs の情報については、 Snowflake Python APIs: PythonによるSnowflakeオブジェクトの管理 をご参照ください。