チュートリアル1: Snowpark Container Servicesサービスを作成する¶
概要¶
共通セットアップ が完了すると、サービスを作成する準備が整います。このチュートリアルでは、入力されたテキストを単純にエコーバックするサービス(echo_service
という名前)を作成します。たとえば、入力文字列が「Hello World」の場合、サービスは「I said, Hello World」を返します。
このチュートリアルには2つのパートがあります。
パート1: サービスを作成してテストする。 このチュートリアルで提供されるコードをダウンロードし、ステップバイステップの手順に従います。
このチュートリアルのサービスコードをダウンロードします。
Snowpark Container Services用のDockerイメージをビルドし、アカウントのリポジトリにイメージをアップロードします。
サービス仕様ファイルとサービスを実行するコンピューティングプールを提供することで、サービスを作成します。
サービスと通信するためのサービス関数を作成します。
サービスを使用します。サービスにエコーリクエストを送り、その応答を検証します。
パート2: サービスを理解する。このセクションでは、サービスコードの概要を説明し、さまざまなコンポーネントがどのように連携しているかを明らかにします。
1: サービスコードをダウンロードする¶
Echoサービスを作成するためのコード(Pythonアプリケーション)が提供されます。
ダウンロード:
SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip>
をダウンロードします。ファイルを解凍します。チュートリアルごとに1つのディレクトリが含まれています。
Tutorial-1
ディレクトリには以下のファイルがあります。Dockerfile
echo_service.py
templates/basic_ui.html
2: イメージをビルドしてアップロードする¶
Snowpark Container Servicesがサポートするlinux/amd64プラットフォーム用のイメージをビルドし、アカウントのイメージリポジトリにイメージをアップロードします(共通セットアップ を参照)。
イメージをビルドしてアップロードする前に、リポジトリに関する情報(リポジトリ URL とレジストリのホスト名)が必要です。詳細については、 レジストリおよびリポジトリ をご参照ください。
リポジトリに関する情報を取得する
リポジトリ URL を取得するには、 SHOW IMAGE REPOSITORIES SQL コマンドを実行します。
SHOW IMAGE REPOSITORIES;
出力の
repository_url
列は、 URL を提供します。以下に例を示します。<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
リポジトリ URL のホスト名はレジストリのホスト名です。以下に例を示します。
<orgname>-<acctname>.registry.snowflakecomputing.com
イメージをビルドし、リポジトリにアップロードする
ターミナルウィンドウを開き、解凍したファイルのあるディレクトリに移動します。
Dockerイメージをビルドするには、Docker CLI を使用して以下の
docker build
コマンドを実行します。このコマンドは、イメージのビルドに使用するファイルのPATH
として、現在の作業ディレクトリ(.
)を指定することに注意してください。docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
image_name
には、my_echo_service_image:latest
を使用します。
例
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
Snowflakeアカウントのリポジトリにイメージをアップロードします。Dockerがあなたの代わりにリポジトリにイメージをアップロードするには、まず レジストリでDockerを認証する 必要があります。
イメージレジストリでDockerを認証するには、以下のコマンドを実行します。
docker login <registry_hostname> -u <username>
username
には、Snowflakeのユーザー名を指定します。Dockerは、パスワードの入力を求めるプロンプトを表示します。
イメージをアップロードするには、以下のコマンドを実行します。
docker push <repository_url>/<image_name>
例
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
3: サービスを作成する¶
このセクションでは、サービスを作成し、サービスと通信するためのサービス関数も作成します。
サービスを作成するには、以下が必要です。
コンピューティングプール。Snowflakeは指定したコンピューティングプールでサービスを実行します。共通セットアップの一環としてコンピューティングプールを作成しました。
サービス仕様。この仕様は、サービスの構成と実行に必要な情報をSnowflakeに提供します。詳細については、 Snowpark Container Services: サービスの操作 をご参照ください。このチュートリアルでは、 CREATE SERVICE コマンドで仕様をインラインで提供します。また、チュートリアル2に示すように、仕様をSnowflakeステージのファイルに保存し、 CREATE SERVICE コマンドでファイル情報を提供することもできます。
サービス関数は、サービスと通信するために利用できる方法の1つです。サービス関数は、サービスエンドポイントに関連付けるユーザー定義関数(UDF)です。サービス関数が実行されると、サービスエンドポイントにリクエストを送り、応答を受け取ります。
コンピューティングプールの準備ができ、サービスを作成するのに適切なコンテキストにいることを確認します。
以前は 共通セットアップ ステップでコンテキストを設定しました。このステップの SQL ステートメントのコンテキストが正しいことを確認するために、以下を実行します。
USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse;
共通セットアップ で作成したコンピューティングプールが準備できていることを確認するために、
DESCRIBE COMPUTE POOL
を実行し、state
がACTIVE
またはIDLE
であることを確認します。state
がSTARTING
の場合は、state
がACTIVE
かIDLE
に変わるまで待つ必要があります。
DESCRIBE COMPUTE POOL tutorial_compute_pool;
サービスを作成するには、
test_role
を使用して以下のコマンドを実行します。CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
注釈
その名前のサービスがすでに存在する場合は、 DROP SERVICE コマンドを使用して以前に作成したサービスを削除してから、このサービスを作成します。
以下の SQL コマンドを実行して、先ほど作成したサービスの詳細情報を取得します。詳細については、 Snowpark Container Services: サービスの操作 をご参照ください。
アカウント内のサービスをリストするには、 SHOW SERVICES コマンドを実行します。
SHOW SERVICES;
サービスのステータスを取得するには、システム関数 SYSTEM$GET_SERVICE_STATUS を呼び出します。
SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
サービスに関する情報を取得するには、 DESCRIBE SERVICE コマンドを実行します。
DESCRIBE SERVICE echo_service;
サービス関数を作成するには、以下のコマンドを実行します。
CREATE FUNCTION my_echo_udf (InputText varchar) RETURNS varchar SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo';
次の点に注意してください。
SERVICE プロパティは、 UDF と
echo_service
サービスを関連付けます。ENDPOINT プロパティは、 UDF をサービス内の
echoendpoint
エンドポイントと関連付けます。AS 「/echo」は、エコーサーバーへの HTTP パスを指定します。このパスは、サービスコード
echo_service.py
にあります。
4: サービスを使用する¶
まず、このセクションの SQL ステートメントのコンテキストを設定し、以下を実行します。
USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
これで、Echoサービスと通信できるようになりました。
**サービス関数の使用
例1.1: 単一の文字列を渡す
my_echo_udf
サービス関数を呼び出すには、次の SELECT ステートメントを実行し、1つの入力文字列('hello'
)を渡します。SELECT my_echo_udf('hello!');
Snowflakeは、 POST リクエストをサービスエンドポイント(
echoendpoint
)に送信します。リクエストを受信すると、サービスは応答に入力文字列をエコーします。+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
例1.2: 文字列のリストを渡す
サービス関数に文字列のリストを渡すと、Snowflakeはこれらの入力文字列を一括して、一連の POST リクエストをサービスに送信します。サービスがすべての文字列を処理した後、Snowflakeは結果を結合して返します。
以下の例では、テーブル列を入力としてサービス関数に渡します。
複数の文字列からなるテーブルを作成します。
CREATE TABLE messages (message_text VARCHAR) AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
テーブルが作成されたことを確認します。
SELECT * FROM messages;
サービス関数を呼び出すには、次の SELECT ステートメントを実行し、入力としてテーブル行を渡します。
SELECT my_echo_udf(message_text) FROM messages;
出力:
+---------------------------+ | MY_ECHO_UDF(MESSAGE_TEXT) | |---------------------------| | Bob said Thank you | | Bob said Hello | | Bob said Hello World | +---------------------------+
**ウェブブラウザーの使用
サービスが公開しているパブリックエンドポイントの URL を見つけます。
SHOW ENDPOINTS IN SERVICE echo_service;
応答の
ingress_url
列は URL を提供します。例
p6bye-myorg-myacct.snowflakecomputing.app
/ui
をエンドポイント URL に追加し、ウェブブラウザーに貼り付けます。これにより、サービスはui()
関数を実行します(echo_service.py
を参照)。エンドポイント URL に初めてアクセスするときは、Snowflakeにログインするようにリクエストされることに注意してください。このテストでは、サービスの作成に使用したユーザーと同じユーザーを使用し、そのユーザーが必要な権限を持っていることを確認します。
Input ボックスに文字列「Hello」を入力し、 Return を押します。
注釈
プログラムでパブリック・エンドポイントにアクセスできます。サンプルコードについては、 Snowflake外部からのパブリック エンドポイントアクセスと認証 をご参照ください。Snowflakeがリクエストをサービスコード内の
ui()
関数にルーティングできるように、コード内のエンドポイント URL に/ui
を追加する必要があることに注意してください。
5: (任意)プログラムでパブリック・エンドポイントにアクセスする¶
前のセクションでは、ウェブブラウザを使ってEchoサービスをテストしました。ブラウザーでは、パブリック・エンドポイント(イングレス・エンドポイント)にアクセスし、サービスが公開しているウェブ UI を使ってリクエストを送信します。このセクションでは、同じパブリック・エンドポイントをプログラムでテストします。
この例では、 キーペア認証 を使用しています。提供されたキーペアを使用して、サンプル・コードはまず JSON Web Token (JWT) を生成し、Snowflake でそのトークンを OAuth トークンと交換します。このコードは、Echoサービスのパブリック・エンドポイントと通信する際に、 OAuth トークンを認証に使用します。
前提条件¶
以下の情報を確認してください。
パブリック エンドポイントのイングレス URL SHOW ENDPOINTS IN SERVICE コマンドを実行し、 URL を取得します。
SHOW ENDPOINTS IN SERVICE echo_service;
あなたのSnowflakeアカウント名 詳細は、 Common Setup: 続行する準備ができていることを認証する を参照してください。
あなたのSnowflakeアカウント URL: これは
<acctname>.snowflakecomputing.com
です。Snowflakeアカウントのユーザー名 これは、 Common Setup: Snowflake オブジェクトの作成 で選択したユーザーです。このユーザーとしてSnowflakeにログインし、プログラム アクセスをテストします。
ロール名: あなたは、共通設定の一部としてロール(
test_role
)を作成しました。ユーザーはアクションを実行するためにこのロールを引き受けます。
設定¶
次の手順に従って、Echoサービスとプログラムで通信してください。提供されているPythonコードを使って、Echoサービスが公開しているパブリックエンドポイントにリクエストを送ります。
コマンドプロンプトでディレクトリを作成し、そこに移動します。
ユーザーのキーペア認証を構成します。
キーペア を生成します。
秘密キーを生成します。演習の手順を簡単にするために、暗号化されていない秘密キーを生成します。暗号化された秘密キーを使うこともできますが、その場合はパスワードの入力が必要になります。
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
作成した秘密キーを参照して公開キー(
rsa_key.pub
)を生成します。openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
ディレクトリに秘密キーと公開キーが生成されていることを確認します。
プログラム・アクセスのテストに使用するユーザーに公開キーを割り当てます。これにより、ユーザーは認証用のキーを指定できます。
ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
提供されたサンプルコードをPythonファイルに保存します。
以下のコードを
generateJWT.py
に保存します。# To run this on the command line, enter: # python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file> from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat from cryptography.hazmat.backends import default_backend from datetime import timedelta, timezone, datetime import argparse import base64 from getpass import getpass import hashlib import logging import sys # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/). import jwt logger = logging.getLogger(__name__) try: from typing import Text except ImportError: logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True) from typing_extensions import Text ISSUER = "iss" EXPIRE_TIME = "exp" ISSUE_TIME = "iat" SUBJECT = "sub" # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. As an example, this function # prompts the user for the passphrase. def get_private_key_passphrase(): return getpass('Passphrase for private key: ') class JWTGenerator(object): """ Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the generated token and only regenerates the token if a specified period of time has passed. """ LIFETIME = timedelta(minutes=59) # The tokens will have a 59-minute lifetime RENEWAL_DELTA = timedelta(minutes=54) # Tokens will be renewed after 54 minutes ALGORITHM = "RS256" # Tokens will be generated using RSA with SHA256 def __init__(self, account: Text, user: Text, private_key_file_path: Text, lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA): """ __init__ creates an object that generates JWTs for the specified user, account identifier, and private key. :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator. :param user: The Snowflake username. :param private_key_file_path: Path to the private key file used for signing the JWTs. :param lifetime: The number of minutes (as a timedelta) during which the key will be valid. :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT. """ logger.info( """Creating JWTGenerator with arguments account : %s, user : %s, lifetime : %s, renewal_delay : %s""", account, user, lifetime, renewal_delay) # Construct the fully qualified name of the user in uppercase. self.account = self.prepare_account_name_for_jwt(account) self.user = user.upper() self.qualified_username = self.account + "." + self.user self.lifetime = lifetime self.renewal_delay = renewal_delay self.private_key_file_path = private_key_file_path self.renew_time = datetime.now(timezone.utc) self.token = None # Load the private key from the specified file. with open(self.private_key_file_path, 'rb') as pem_in: pemlines = pem_in.read() try: # Try to access the private key without a passphrase. self.private_key = load_pem_private_key(pemlines, None, default_backend()) except TypeError: # If that fails, provide the passphrase returned from get_private_key_passphrase(). self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) def prepare_account_name_for_jwt(self, raw_account: Text) -> Text: """ Prepare the account identifier for use in the JWT. For the JWT, the account identifier must not include the subdomain or any region or cloud provider information. :param raw_account: The specified account identifier. :return: The account identifier in a form that can be used to generate the JWT. """ account = raw_account if not '.global' in account: # Handle the general case. idx = account.find('.') if idx > 0: account = account[0:idx] else: # Handle the replication case. idx = account.find('-') if idx > 0: account = account[0:idx] # Use uppercase for the account identifier. return account.upper() def get_token(self) -> Text: """ Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the specified renewal time has passed. :return: the new token """ now = datetime.now(timezone.utc) # Fetch the current time # If the token has expired or doesn't exist, regenerate the token. if self.token is None or self.renew_time <= now: logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)", now, self.renew_time) # Calculate the next time we need to renew the token. self.renew_time = now + self.renewal_delay # Prepare the fields for the payload. # Generate the public key fingerprint for the issuer in the payload. public_key_fp = self.calculate_public_key_fingerprint(self.private_key) # Create our payload payload = { # Set the issuer to the fully qualified username concatenated with the public key fingerprint. ISSUER: self.qualified_username + '.' + public_key_fp, # Set the subject to the fully qualified username. SUBJECT: self.qualified_username, # Set the issue time to now. ISSUE_TIME: now, # Set the expiration time, based on the lifetime specified for this object. EXPIRE_TIME: now + self.lifetime } # Regenerate the actual token token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM) # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string. # If the token is a byte string, convert it to a string. if isinstance(token, bytes): token = token.decode('utf-8') self.token = token logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM])) return self.token def calculate_public_key_fingerprint(self, private_key: Text) -> Text: """ Given a private key in PEM format, return the public key fingerprint. :param private_key: private key string :return: public key fingerprint """ # Get the raw bytes of public key. public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) # Get the sha256 hash of the raw bytes. sha256hash = hashlib.sha256() sha256hash.update(public_key_raw) # Base64-encode the value and prepend the prefix 'SHA256:'. public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8') logger.info("Public key fingerprint is %s", public_key_fp) return public_key_fp def main(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') args = cli_parser.parse_args() token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() print('JWT:') print(token) if __name__ == "__main__": main()
以下のコードを
access-via-keypair.py
に保存します。from generateJWT import JWTGenerator from datetime import timedelta import argparse import logging import sys import requests logger = logging.getLogger(__name__) def main(): args = _parse_args() token = _get_token(args) snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role, snowflake_account_url=args.snowflake_account_url, snowflake_account=args.account) spcs_url=f'https://{args.endpoint}{args.endpoint_path}' connect_to_spcs(snowflake_jwt, spcs_url) def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account): scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } logger.info(data) url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token' if snowflake_account_url: url = f'{snowflake_account_url}/oauth/token' logger.info("oauth url: %s" %url) response = requests.post(url, data=data) logger.info("snowflake jwt : %s" % response.text) assert 200 == response.status_code, "unable to get snowflake token" return response.text def connect_to_spcs(token, url): # Create a request to the ingress endpoint with authz. headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) logger.info("return code %s" % response.status_code) logger.info(response.text) def _parse_args(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (for example, "myorganization-myaccount" for ' '"myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') cli_parser.add_argument('--role', help='The role we want to use to create and maintain a session for. If a role is not provided, ' 'use the default role.') cli_parser.add_argument('--endpoint', required=True, help='The ingress endpoint of the service') cli_parser.add_argument('--endpoint-path', default='/', help='The url path for the ingress endpoint of the service') cli_parser.add_argument('--snowflake_account_url', default=None, help='The account url of the account for which we want to log in. Type of ' 'https://myorganization-myaccount.snowflakecomputing.com') args = cli_parser.parse_args() return args if __name__ == "__main__": main()
プログラムでサービスエンドポイントにリクエストを送る¶
access-via-keypair.py
Pythonコードを実行し、Echoサービスのパブリックエンドポイントへのイングレスコールを行います。
python3 access-via-keypair.py \ --account <account-identifier> \ --user <user-name> \ --role TEST_ROLE \ --private_key_file_path rsa_key.p8 \ --endpoint <ingress-hostname> \ --endpoint-path /ui
account-identifier
の詳細については、 アカウント識別子 をご参照ください。
認証の仕組み¶
このコードはまず、提供されたキーペアを JWT トークンに変換します。その後、 JWT トークンをSnowflakeに送信し、 OAuth トークンを取得します。最後に、コードは OAuth トークンを使用して Snowflake に接続し、パブリックエンドポイントにアクセスします。具体的にコードは以下を行います。
_get_token(args)
関数を呼び出し、指定したキーペアから JWT トークンを生成します。関数の実装が示されます。def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token
JWTGenerator
は、あなたに提供されるヘルパー・クラスです。このオブジェクトを作成するときに指定するパラメーターについて、以下の点にご注意ください。args.account
とargs.user
パラメーター: JWT トークンにはいくつかのフィールドがあり(トークン形式 を参照)、iss
はフィールドの1つです。このフィールド値には、Snowflakeアカウント名とユーザー名が含まれます。したがって、これらの値をパラメーターとして提供します。timedelta
の2つのパラメーターは以下の情報を提供します。lifetime
は、キーが有効な時間(60分)を指定します。renewal_delay
は、今から何分後に JWT ジェネレーターが JWT を更新すべきかを指定します。
token_exchange()
関数を呼び出して Snowflake に接続し、 JWT トークンを OAuth トークンに交換します。scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, }
前述のコードは、 OAuth トークンのスコープの JSON 設定を構築し、指定されたロールを使用してアクセスできるパブリック・エンドポイントを構築します。次にこのコードは、図のように JWT トークンを OAuth トークン(トークン交換 を参照)と交換するために、 JSON を渡して Snowflake に POST リクエストを行います。
url = f'{snowflake_account_url}/oauth/token' response = requests.post(url, data=data) assert 200 == response.status_code, "unable to get Snowflake token" return response.text
次に、
connect_to_spcs()
関数を呼び出して、Echoサービスのパブリック・エンドポイントに接続します。エンドポイントの URL (https://<ingress-URL>/ui
)と認証用の OAuth トークンを提供します。headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers)
url
はプログラムに提供したspcs_url
、token
は OAuth トークンです。この例のEchoサービスは、 HTML ページ(前のセクションで説明)を提供します。このサンプル・コードは、レスポンスに HTML を表示するだけです。
6: クリーンアップする¶
チュートリアル2 または チュートリアル3 に進む予定がない場合は、作成した請求対象リソースを削除する必要があります。詳細については、 チュートリアル3 のステップ5をご参照ください。
7: サービスコードを確認する¶
このセクションでは、以下のトピックを取り上げます。
チュートリアル1コードの検証: Echoサービスを実装しているコードファイルを確認します。
サービス関数に対する理解: このセクションでは、このチュートリアルのサービス関数がどのようにサービスとリンクしているかを説明します。
ローカルでのイメージの構築とテスト。このセクションでは、Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストする方法について説明します。
チュートリアル1コードの検証¶
ステップ1でダウンロードしたzipファイルには、以下のファイルが含まれています。
Dockerfile
echo_service.py
templates/basic_ui.html
サービスを作成する際にもサービス仕様を使用します。次のセクションでは、これらのコードコンポーネントがどのように連携してサービスを作成するかを説明します。
echo_service.pyファイル¶
このPythonファイルには、入力テキストを返す(エコーバックする)最小の HTTP サーバーを実装するコードが含まれています。このコードでは主に、Snowflakeサービス関数からのエコーリクエストの処理と、エコーリクエストを送信するためのウェブユーザーインターフェイス(UI)の提供という2つのタスクを実行します。
from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys
SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter(
'%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
logger.addHandler(handler)
return logger
logger = get_logger('echo-service')
app = Flask(__name__)
@app.get("/healthcheck")
def readiness_probe():
return "I'm ready!"
@app.post("/echo")
def echo():
'''
Main handler for input data sent by Snowflake.
'''
message = request.json
logger.debug(f'Received request: {message}')
if message is None or not message['data']:
logger.info('Received empty message')
return {}
# input format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...],
# ...
# ]}
input_rows = message['data']
logger.info(f'Received {len(input_rows)} rows')
# output format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...}],
# ...
# ]}
output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
logger.info(f'Produced {len(output_rows)} rows')
response = make_response({"data": output_rows})
response.headers['Content-type'] = 'application/json'
logger.debug(f'Sending response: {response.json}')
return response
@app.route("/ui", methods=["GET", "POST"])
def ui():
'''
Main handler for providing a web UI.
'''
if request.method == "POST":
# getting input in HTML form
input_text = request.form.get("input")
# display input and output
return render_template("basic_ui.html",
echo_input=input_text,
echo_reponse=get_echo_response(input_text))
return render_template("basic_ui.html")
def get_echo_response(input):
return f'{CHARACTER_NAME} said {input}'
if __name__ == '__main__':
app.run(host=SERVICE_HOST, port=SERVER_PORT)
コードで、
echo
関数は、Snowflakeサービス関数がサービスと通信できるようにします。この関数は、@app.post()
装飾を次のように指定します。@app.post("/echo") def echo():
エコーサーバーが HTTP POST リクエストと
/echo
パスを受信すると、サーバーはリクエストをこの関数にルーティングします。この関数は実行され、リクエスト本文の文字列を応答にエコーバックします。Snowflakeサービス関数からの通信をサポートするために、このサーバーは外部関数を実装します。つまり、サーバー実装は、 SQL 関数を処理するために特定の入出力データ形式に従います。これは 外部関数 が使用する 入出力データ形式 と同じです。
コードの
ui
関数セクションは、ウェブフォームを表示し、ウェブフォームから送信されたエコーリクエストを処理します。この関数は、@app.route()
デコレーターを使用して、/ui
に対するリクエストをこの関数で処理するように指定します。@app.route("/ui", methods=["GET", "POST"]) def ui():
Echoサービスは、
echoendpoint
エンドポイントを公開し(サービス仕様を参照)、ウェブ上でサービスと通信できるようにします。ブラウザーで/uiを付加したパブリックエンドポイントの URL をロードすると、ブラウザーはこのパスに対する HTTP GET リクエストを送信し、サーバーはリクエストをこの関数にルーティングします。この関数は実行され、ユーザーが文字列を入力するための単純な HTML フォームを返します。ユーザーが文字列を入力してフォームを送信した後、ブラウザーはこのパスに対する HTTP ポストリクエストを送信し、サーバーはリクエストをこの同じ関数にルーティングします。この関数は実行され、元の文字列を含む HTTP 応答を返します。
readiness_probe
関数は、@app.get()
デコレーターを使用して、/healthcheck
に対するリクエストがこの関数で処理されることを指定します。@app.get("/healthcheck") def readiness_probe():
この関数により、Snowflakeはサービスの準備状況を確認することができます。コンテナーが起動すると、Snowflakeはアプリケーションが動作していることと、サービスがリクエストに対応する準備ができていることを確認します。Snowflakeは、このパスで HTTP GET リクエストを送信し(healthプローブ、readinessプローブとして)、正常なコンテナーだけがトラフィックを提供できるようにします。この関数は何でもできます。
get_logger
関数はログのセットアップに役立ちます。
Dockerfile¶
このファイルには、Dockerを使用してイメージを構築するためのすべてのコマンドが含まれています。
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
Dockerfileには、FlaskライブラリをDockerコンテナーにインストールする手順が含まれています。 echo_service.py
のコードは、 HTTP のリクエストを処理するためにFlaskライブラリに依存しています。
/template/basic_ui.html¶
Echoサービスは、 echoendpoint
エンドポイントを公開し(サービス仕様を参照)、ウェブ上でサービスと通信できるようにします。ブラウザーで /ui
を付加したパブリックエンドポイント URL をロードすると、Echoサービスはこのフォームを表示します。フォームに文字列を入力して送信すると、サービスはその文字列を HTTP 応答として返します。
<!DOCTYPE html>
<html lang="en">
<head>
<title>Welcome to echo service!</title>
</head>
<body>
<h1>Welcome to echo service!</h1>
<form action="{{ url_for("ui") }}" method="post">
<label for="input">Input:<label><br>
<input type="text" id="input" name="input"><br>
</form>
<h2>Input:</h2>
{{ echo_input }}
<h2>Output:</h2>
{{ echo_reponse }}
</body>
</html>
サービス仕様¶
Snowflakeは、この仕様で提供された情報を使用して、サービスを構成および実行します。
spec:
containers:
- name: echo
image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
env:
SERVER_PORT: 8000
CHARACTER_NAME: Bob
readinessProbe:
port: 8000
path: /healthcheck
endpoints:
- name: echoendpoint
port: 8000
public: true
サービス仕様で、
containers.image
は、Snowflakeがコンテナーを開始するためのイメージを指定します。オプションの
endpoints
フィールドは、サービスが公開するエンドポイントを指定します。name
は、コンテナーがリッスンしている TCP ネットワークポートのユーザーフレンドリーな名前を指定します。このユーザーフレンドリーなエンドポイント名を使用して、対応するポートにリクエストを送信します。env.SERVER_PORT
は、このポート番号を制御することに注意してください。エンドポイントも
public
として構成されています。これにより、パブリックウェブからこのエンドポイントへのトラフィックが許可されます。
オプションの
containers.env
フィールドは、Snowflakeがコンテナー内のすべてのプロセスに渡す環境変数を上書きする方法を説明するために追加されています。たとえば、サービスコード(echo_service.py
)は、以下のようにデフォルト値で環境変数を読み取ります。CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') SERVER_PORT = os.getenv('SERVER_PORT', 8080)
仕組みは次のとおりです。
Echoサービスは、リクエスト本文に文字列(例: 「Hello」)を含む HTTP POST リクエストを受信すると、デフォルトで「I said Hello」を返します。このコードでは、環境変数
CHARACTER_NAME
を使用して、「said」の前の単語を決定しています。デフォルトでは、CHARACTER_NAME
は「I」に設定されています。サービス仕様の CHARACTER_NAME デフォルト値は上書きすることができます。たとえば、値を「Bob」に設定すると、Echoサービスは「Bob said Hello」という応答を返します。
同様に、サービス仕様は、サービスがリッスンするポート(SERVER_PORT)をデフォルトのポート8080から8000に上書きします。
readinessProbe
フィールドは、Snowflakeがreadinessプローブに HTTP GET リクエストを送信するために使用できるport
とpath
を識別して、サービスがトラフィックを処理する準備ができていることを確認します。サービスコード(
echo_python.py
)は、以下のように準備確認を実装します。@app.get("/healthcheck") def readiness_probe():
そのため、仕様ファイルにはそれに応じて
container.readinessProbe
フィールドが含まれます。
サービス仕様の詳細については、 サービス仕様リファレンス をご参照ください。
サービス関数に対する理解¶
サービス関数は、サービスと通信する方法の1つです(サービスの使用 を参照)。サービス関数は、サービスエンドポイントに関連付けるユーザー定義関数(UDF)です。サービス関数が実行されると、関連するサービスエンドポイントにリクエストを送信し、応答を受信します。
以下のパラメーターを使用して CREATE FUNCTION コマンドを実行し、以下のサービス関数を作成します。
CREATE FUNCTION my_echo_udf (InputText VARCHAR)
RETURNS VARCHAR
SERVICE=echo_service
ENDPOINT=echoendpoint
AS '/echo';
次の点に注意してください。
my_echo_udf
関数は、入力として文字列を取り、文字列を返します。SERVICE プロパティはサービス(
echo_service
)を識別し、 ENDPOINT プロパティはユーザーフレンドリーなエンドポイント名(echoendpoint
)を識別します。AS 「/echo」は、サービスへのパスを指定します。
echo_service.py
では、@app.post
デコレーターがこのパスをecho
関数に関連付けます。
この関数は、指定された SERVICE の特定の ENDPOINT に接続します。この関数を呼び出すと、Snowflakeはサービスコンテナー内の /echo
パスにリクエストを送信します。
ローカルでのイメージの構築とテスト¶
Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストすることができます。ローカルテストでは、コンテナーはスタンドアロンで実行されます(Snowflakeが実行するサービスではありません)。
チュートリアル1のDockerイメージをテストするには、
Dockerイメージを作成するには、Docker CLI で以下のコマンドを実行します。
docker build --rm -t my_service:local .
コードを起動するには、以下のコマンドを実行します。
docker run --rm -p 8080:8080 my_service:local
以下のいずれかの方法で、サービスにエコーリクエストを送信します。
cURL コマンドの使用:
別のターミナルウィンドウで、 cURL を使用し、ポート8080に以下の POST リクエストを送信します。
curl -X POST http://localhost:8080/echo \ -H "Content-Type: application/json" \ -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
リクエスト本文には2つの文字列が含まれていることに注意してください。この cURL コマンドは、サービスがリッスンしているポート8080に POST リクエストを送信します。データ中の0は、リスト中の入力文字列のインデックスです。Echoサービスは、入力された文字列を次のようにエコーします。
{"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
ウェブブラウザーの使用:
次の内容¶
これでジョブを実行する チュートリアル2 をテストできます。