チュートリアル3: サービス間通信

重要

Snowpark Container Servicesのジョブ機能は現在プライベートプレビュー中であり、 https://snowflake.com/legal のプレビュー規約に従うものとします。詳細については、Snowflakeの担当者にお問い合わせください。

紹介

このチュートリアルでは、 チュートリアル1 で作成したEchoサービスと通信するSnowpark Container Servicesジョブを作成します。ジョブが実行されると、リクエスト本文に「Hello」文字列を含む POST リクエストをEchoサービス URL (ジョブ仕様で提供したもの)に送信します。Echoサービスは、応答本文に「Bob said Hello」という文字列を含む応答を返します。ジョブコンテナーログにアクセスして、通信が成功したことを確認します。

このチュートリアルには2つのパートがあります。

  • パート1: ジョブを作成してテストする。 このチュートリアルで提供されるコードをダウンロードし、ステップバイステップの手順に従います。

    1. このチュートリアルのジョブコードをダウンロードします。

    2. Snowpark Container Services用のDockerイメージをビルドし、アカウントのリポジトリにイメージをアップロードします。

    3. Snowflakeにコンテナー構成情報を与える仕様ファイルをステージします。この仕様では、コンテナーの起動に使用するイメージの名前に加えて、仕様は、Echoサービス URL に環境変数(SERVICE_URL)を設定します。アプリケーションコードはこの環境変数を読み取り、Echoサービスにリクエストを送信します。

    4. ジョブを実行します。EXECUTE SERVICE コマンドを使用して、仕様ファイルとSnowflakeがコンテナーを実行できるコンピューティングプールを指定すると、ジョブを実行できます。そして最後に、ジョブとサービス間の通信が成功したことを確認するために、ジョブコンテナーからログにアクセスします。

  • パート2: ジョブコードを理解する。このセクションでは、サービスコードの概要を説明し、さまざまなコンポーネントがどのように連携しているかを明らかにします。

前提条件

チュートリアル1 を完了し、Echoサービスが実行されていることを確認します。

SELECT SYSTEM$GET_SERVICE_STATUS('echo_service', 10);
Copy

1: サービスコードをダウンロードする

ジョブを作成するためのコード(Pythonアプリケーション)が提供されます。

  1. zipファイル をディレクトリにダウンロードします。

  2. ファイルを解凍します。チュートリアルごとに1つのディレクトリが含まれています。 Tutorial-3 ディレクトリには以下のファイルがあります。

    • service_to_service.py

    • Dockerfile

    • service_to_service_spec.yaml

2: イメージをビルドしてアップロードする

Snowpark Container Servicesがサポートするlinux/amd64プラットフォーム用のイメージをビルドし、アカウントのイメージリポジトリにイメージをアップロードします(共通セットアップ を参照)。

イメージをビルドしてアップロードする前に、リポジトリに関する情報(リポジトリ URL とレジストリのホスト名)が必要です。詳細については、 レジストリおよびリポジトリ をご参照ください。

リポジトリに関する情報を取得する

  1. リポジトリ URL を取得するには、 SHOW IMAGE REPOSITORIES SQL コマンドを実行します。

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • 出力の repository_url 列は、 URL を提供します。以下に例を示します。

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • リポジトリ URL のホスト名はレジストリのホスト名です。以下に例を示します。

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

イメージをビルドし、リポジトリにアップロードする

  1. ターミナルウィンドウを開き、解凍したファイルのあるディレクトリに移動します。

  2. Dockerイメージをビルドするには、Docker CLI を使用して以下の docker build コマンドを実行します。このコマンドは、イメージのビルドに使用するファイルの PATH として、現在の作業ディレクトリ(.)を指定していることに注意してください。

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • image_name には、 service_to_service:latest を使用します。

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/service_to_service:latest .
    
    Copy
  3. Snowflakeアカウントのリポジトリにイメージをアップロードします。Dockerがあなたの代わりにリポジトリにイメージをアップロードするには、まずSnowflakeでDockerを認証する必要があります。

    1. SnowflakeレジストリでDockerを認証するには、以下のコマンドを実行します。

      docker login <registry_hostname> -u <username>
      
      Copy
      • username には、Snowflakeのユーザー名を指定します。Dockerは、パスワードの入力を求めるプロンプトを表示します。

    2. イメージをアップロードするには、以下のコマンドを実行します。

      docker push <repository_url>/<image_name>
      
      Copy

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/service_to_service:latest
      
      Copy

3: 仕様ファイルをステージする

  • ジョブ仕様ファイル(service_to_service_spec.yaml)をステージにアップロードするには、以下のオプションのいずれかを使用します。

    このコマンドは OVERWRITE=TRUE を設定するため、必要な場合(例: 仕様ファイルのエラーを修正した場合)は、ファイルを再度アップロードすることができます。PUT コマンドが正常に実行されると、アップロードされたファイルに関する情報がプリントアウトされます。

4: ジョブを実行する

これで、作成したSnowflakeジョブをテストする準備が整いました。ジョブが実行されると、Snowflakeは、コンテナー内のコードが標準出力または標準エラーに出力したものをログとして収集します。 SYSTEM$GET_JOB_LOGS システム関数を使用して、ログにアクセスできます。詳細については、 Snowpark Container Services: サービスおよびジョブに関するその他の考慮事項 をご参照ください。

  1. ジョブを開始するには、 EXECUTE SERVICE コマンドを実行します。

    EXECUTE SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      FROM @tutorial_stage
      SPEC='service_to_service_spec.yaml';
    
    Copy

    次の点に注意してください。

    • FROM と SPEC は、ステージ名とジョブ仕様ファイル名を提供します。

    • COMPUTE_POOL は、Snowflakeがジョブを実行するコンピューティングリソースを提供します。

    Snowflakeは、仕様ファイルで特定されたコンテナーを実行します。コンテナーは SERVICE_URL 環境変数値(http://echo-service:8000/echo)を読み取り、 /echo HTTP パスのポート8000でEchoサービスにリクエストを送信します。

    Snowflakeはジョブを開始し、出力にジョブ ID を返します。

    +------------------------------------------------------------------------+
    | status                                                                 |
    |------------------------------------------------------------------------|
    | Execution 01af7ee6-0001-cb52-0020-c5870077223a completed successfully. |
    +------------------------------------------------------------------------+
    

    応答には、Snowflakeが割り当てたクエリジョブ ID が含まれていることに注意してください。

  2. (オプション)ジョブの完了後に、実行されたジョブに関する詳細情報を取得できます。これはジョブの失敗をデバッグするのに便利です。

    1. オプション: 前述の EXECUTE SERVICE コマンドのジョブ IDをセッション変数にキャプチャし、後で参照できるようにするには、以下のコマンドを実行します。

      SET job_id = last_query_id();
      
      Copy

      ジョブが作成された直後に ID をキャプチャする必要があることに注意してください。そうしないと、誤った値がキャプチャされます。

    2. ジョブステータスを取得するには、以下のいずれかのオプションを使用して SYSTEM$GET_JOB_STATUS 関数を呼び出します。

      • EXECUTE SERVICE によって返されたジョブ ID を使用します。

        CALL SYSTEM$GET_JOB_STATUS('<job_id returned by EXECUTE SERVICE>');
        
        Copy
      • セッション変数の使用:

        CALL SYSTEM$GET_JOB_STATUS($job_id);
        
        Copy

      サンプル出力:

      [
        {
          "status":"DONE",
          "message":"Container finished",
          "containerName":"main",
          "instanceId":"0",
          "serviceName":"JOB_01ABD9E5000046DF00000E99000BC01E",
          "image":"myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/service_to_service:latest",
          "restartCount":0,
          "startTime":"2023-01-01T00:00:00Z"
        }
      ]
      
  3. ジョブログを読み取るには、以下のいずれかのオプションを使用します。

    • EXECUTE SERVICE が返すクエリジョブ ID を使用します。

      CALL SYSTEM$GET_JOB_LOGS('<job_id returned by EXECUTE SERVICE>', 'main');
      
      Copy
    • セッション変数を使用します。

      CALL SYSTEM$GET_JOB_LOGS($job_id, 'main');
      
      Copy

    main は、ログを取得するコンテナーの名前です。このコンテナー名は、ジョブ仕様ファイルでコンテナーに設定します。

    サンプルログ:

    +--------------------------------------------------------------------------------------------------------------------------+
    | SYSTEM$GET_JOB_LOGS                                                                                                      |
    |--------------------------------------------------------------------------------------------------------------------------|
    | service-to-service [2023-04-29 21:52:09,208] [INFO] Calling http://echo-service:8000/echo with input Hello               |
    | service-to-service [2023-04-29 21:52:09,212] [INFO] Received response from http://echo-service:8000/echo: Bob said Hello |
    +--------------------------------------------------------------------------------------------------------------------------+
    

5: クリーンアップする

Snowflakeは、お客様のアカウントでアクティブなコンピューティングプールノードに対して課金します。(コンピューティングプールの操作 を参照。)不要な課金を防ぐには、まず、コンピューティングプールで現在実行中のサービスをすべて停止します。そして、コンピューティングプールを中断するか(後ほど再度使用する予定の場合)、ドロップします。

  1. コンピューティングプール上のすべてのサービスとジョブを停止します。

    ALTER COMPUTE POOL tutorial_compute_pool STOP ALL;
    
    Copy
  2. コンピューティングプールを削除します。

    DROP COMPUTE POOL tutorial_compute_pool;
    
    Copy

イメージレジストリ(すべてのイメージを削除)と内部ステージ(仕様を削除)をクリーンアップすることもできます。

DROP IMAGE REPOSITORY tutorial_repository;
DROP STAGE tutorial_stage;
Copy

6: ジョブコードを確認する

このセクションでは、以下のトピックを取り上げます。

提供されたファイルの調査

ダウンロードしたzipファイルには以下のファイルが含まれています。

  • service_to_service.py

  • Dockerfile

  • service_to_service_spec.yaml

このセクションでは、コードがジョブを実装する方法の概要を説明します。

service_to_service.pyファイル

import json
import logging
import os
import requests
import sys

SERVICE_URL = os.getenv('SERVICE_URL', 'http://localhost:8080/echo')
ECHO_TEXT = 'Hello'

def get_logger(logger_name):
  logger = logging.getLogger(logger_name)
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  handler.setFormatter(
    logging.Formatter(
      '%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
  logger.addHandler(handler)
  return logger

logger = get_logger('service-to-service')

def call_service(service_url, echo_input):
  logger.info(f'Calling {service_url} with input {echo_input}')

  row_to_send = {"data": [[0, echo_input]]}
  response = requests.post(url=service_url,
                           data=json.dumps(row_to_send),
                           headers={"Content-Type": "application/json"})

  message = response.json()
  if message is None or not message["data"]:
    logger.error('Received empty response from service ' + service_url)

  response_row = message["data"][0]
  if len(response_row) != 2:
    logger.error('Unexpected response format: ' + response_row)

  echo_reponse = response_row[1]
  logger.info(f'Received response from {service_url}: ' + echo_reponse)

if __name__ == '__main__':
  call_service(SERVICE_URL, ECHO_TEXT)
Copy

ジョブの実行時に、

  1. Snowflakeは、コンテナー内の SERVICE_URL 環境変数を設定するために、指定ファイルで提供された値を使用します。

  2. コードは環境変数を読み取ります。

    SERVICE_URL = os.getenv('SERVICE_URL', 'http://localhost:8080/echo').
    
    Copy
  3. call_service() 関数は、 SERVICE_URL を使用してEchoサービスと通信します。

Dockerfile

このファイルには、Dockerを使用してイメージを構築するためのすべてのコマンドが含まれています。

ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY service_to_service.py ./
RUN pip install --upgrade pip && \
  pip install requests
CMD ["python3", "service_to_service.py"]
Copy

service_to_service_spec.yamlファイル(ジョブ仕様)

Snowflakeは、この仕様で提供された情報を使用して、サービスを構成および実行します。

spec:
container:
   - name: main
      image: /tutorial_db/data_schema/tutorial_repository/service_to_service:latest
      env:
      SERVICE_URL: "http://echo-service:8000/echo"
Copy

この仕様は、ジョブの構成と実行用の情報をSnowflakeに提供します。Echoサービスと通信するために、このジョブには次が必要です。

  • リクエストを送信するEchoサービスの DNS 名前。

  • Echoサービスがリッスンしている HTTP ポート。

  • Echoサービスが予期しているリクエストを送信する HTTP パス。

この情報を得るには、

  1. Echoサービスの DNS 名(チュートリアル1)を取得するには、 DESCRIBE SERVICE SQL コマンドを実行します。

    DESCRIBE SERVICE echo_service;
    
    Copy

    Echoサービス用に得られる DNS名:

    echo-service.data-schema.tutorial-db.snowflakecomputing.internal
    
    Copy

    このチュートリアルでは、Echoサービス(チュートリアル1)が作成されるのと同じデータベーススキーマ(data-schema)にジョブを作成することに注意してください。したがって、前述の DNS 名の「echo-service」部分のみが SERVICE_URL の構成に必要です。

  2. Echoサービス仕様ファイル(チュートリアル1)から、Echoサービスがリッスンしているポート番号(8000)を取得します。

次に、先行する仕様ファイル(service_to_service_spec.yaml)を作成します。必須フィールドの containers.namecontainers.image に加えて、オプションの containers.env フィールドを含めます。

ローカルでのイメージの構築とテスト

Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストすることができます。ローカルテストでは、コンテナーはスタンドアロンで実行されます(Snowflakeが実行するジョブではありません)。

注釈

このチュートリアルで提供されるPythonコードは、 requests ライブラリを使用して別のSnowpark Containersサービスにリクエストを送信します。このライブラリがインストールされていない場合は、pipを実行します(例: pip3 install requests)。

以下のステップを使用して、チュートリアル3Dockerイメージをテストします。

  1. Echoサービスを実行状態にする必要があります(チュートリアル1)。チュートリアル1のEchoサービスを開始するには、ターミナルウィンドウで次のPythonコマンドを実行します。

    SERVER_PORT=8000 python3 echo_service.py
    
    Copy
  2. 別のターミナルウィンドウを開き、このチュートリアルで提供されるPythonコードを実行します。

    SERVICE_URL=http://localhost:8000/echo python3 service_to_service.py
    
    Copy

    SERVICE_URL は環境変数であることに注意してください。ローカルテストでは、この変数を明示的に設定する必要があります。この URL は、Echoサービスを開始したときに明示的に指定したポートと HTTP パスに一致します。

    ジョブが実行されると、リクエスト本文に「Hello」文字列を含む POST リクエストをポート8080でリッスンしているEchoサービスに送信します。Echoサービスは入力をエコーして、「I said Hello」という応答を返します。

    サンプル応答:

    service-to-service
      [2023-04-23 22:30:41,278]
      [INFO] Calling http://localhost:8000/echo with input Hello
    
    
    service-to-service
      [2023-04-23 22:30:41,287]
      [INFO] Received response from http://localhost:8000/echo: I said Hello
    

    ログをレビューして、サービス間通信が成功したことを確認します。

次の内容

このチュートリアルは完了したため、 上級チュートリアル に戻って他のトピックを調べることができます。