자습서 1: Snowpark Container Services 서비스 생성¶
소개¶
공통 설정 을 완료하면 서비스를 만들 준비가 된 것입니다. 이 자습서에서는 입력으로 제공한 텍스트를 단순히 화면에 표시하는 (echo_service
라는) 서비스를 만듭니다. 예를 들어 입력 문자열이 “Hello World”인 경우 이 서비스는 “I said, Hello World”를 반환합니다.
이 자습서는 다음 두 부분으로 구성됩니다.
파트 1: 서비스를 만들고 테스트합니다. 이 자습서에 제공된 코드를 다운로드하고 단계별 지침을 따릅니다.
이 자습서를 위한 서비스 코드를 다운로드합니다.
Snowpark Container Services용 Docker 이미지를 만들어 자기 계정의 리포지토리에 업로드합니다.
서비스 사양 파일과 서비스를 실행할 컴퓨팅 풀을 제공하여 서비스를 생성합니다.
서비스와 통신하기 위한 서비스 함수를 만듭니다.
서비스를 사용합니다. 서비스에 에코 요청을 보내고 응답을 확인합니다.
파트 2: 서비스를 이해합니다. 이 섹션에서는 서비스 코드의 개요를 제공하고 다양한 구성 요소가 어떻게 협력하는지 강조합니다.
1: 서비스 코드 다운로드¶
Echo 서비스를 생성하기 위한 코드(Python 애플리케이션)가 제공됩니다.
SnowparkContainerServices-Tutorials.zip
을 다운로드합니다.이 파일의 압축을 풀면 각 자습서마다 하나의 디렉터리가 포함된 것을 알 수 있습니다.
Tutorial-1
디렉터리에는 다음 파일이 있습니다.Dockerfile
echo_service.py
templates/basic_ui.html
2: 이미지 만들기 및 업로드¶
Snowpark Container Services가 지원하는 linux/amd64 플랫폼의 이미지를 만든 다음, 해당 이미지를 계정의 이미지 리포지토리에 업로드합니다(공통 설정 참조).
리포지토리(리포지토리 URL 및 레지스트리 호스트 이름)에 대한 정보가 있어야 이미지를 만들어 업로드할 수 있습니다. 자세한 내용은 레지스트리 및 리포지토리 를 참조하십시오.
리포지토리에 대한 정보 얻기
리포지토리 URL을 가져오려면 SHOW IMAGE REPOSITORIES SQL 명령을 실행하십시오.
SHOW IMAGE REPOSITORIES;
출력의
repository_url
열은 URL을 제공합니다. 아래에 예가 나와 있습니다.<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
리포지토리 URL의 호스트 이름은 레지스트리 호스트 이름입니다. 아래에 예가 나와 있습니다.
<orgname>-<acctname>.registry.snowflakecomputing.com
이미지를 만들어 리포지토리에 업로드하기
터미널 창을 열고 압축을 푼 파일이 포함된 디렉터리로 변경합니다.
Docker 이미지를 만들려면 Docker CLI를 사용하여 다음
docker build
명령을 실행하십시오. 이 명령은 현재 작업 디렉터리(.
)를 이미지 만들기에 사용할 파일의PATH
로 지정합니다.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
image_name
의 경우my_echo_service_image:latest
를 사용합니다.
예
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
Snowflake 계정의 리포지토리에 이미지를 업로드합니다. Docker가 사용자를 대신하여 리포지토리에 이미지를 업로드하려면 먼저 레지스트리로 Docker를 인증 해야 합니다.
이미지 레지스트리로 Docker를 인증하려면 다음 명령을 실행합니다.
docker login <registry_hostname> -u <username>
username
의 경우 Snowflake 사용자 이름을 지정합니다. Docker가 비밀번호를 묻는 메시지를 표시합니다.
이미지를 업로드하려면 다음 명령을 실행하십시오.
docker push <repository_url>/<image_name>
예
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
3: 서비스 만들기¶
이 섹션에서는 서비스를 만들고 서비스와 통신하기 위한 서비스 함수도 만듭니다.
서비스를 만들려면 다음이 필요합니다.
컴퓨팅 풀. Snowflake는 지정된 컴퓨팅 풀에서 서비스를 실행합니다. 공통 설정의 일부로 컴퓨팅 풀을 생성했습니다.
서비스 사양. 이 사양에서는 Snowflake에 서비스 구성과 실행에 필요한 정보를 제공합니다. 자세한 내용은 Snowpark Container Services: 서비스 사용하기 섹션을 참조하십시오. 이 자습서에서는 CREATE SERVICE 명령에 사양을 인라인으로 제공합니다. 또한 사양을 Snowflake 스테이지의 파일에 저장하고 자습서 2에 표시된 대로 CREATE SERVICE 명령에 파일 정보를 제공할 수도 있습니다.
서비스 함수는 서비스와 통신하는 데 사용할 수 있는 방법 중 하나입니다. 서비스 함수는 서비스 엔드포인트와 연결하는 사용자 정의 함수(UDF)입니다. 서비스 함수가 실행되면 서비스 엔드포인트에 요청을 보내고 응답을 받습니다.
컴퓨팅 풀이 준비되었는지, 서비스를 생성할 수 있는 올바른 컨텍스트에 있는지 확인하십시오.
이전에는 공통 설정 단계에서 컨텍스트를 설정했습니다. 이 단계에서 SQL 문의 올바른 컨텍스트에 있는지 확인하려면 다음을 실행하십시오.
USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse;
공통 설정 에서 생성한 컴퓨팅 풀이 준비되었는지 확인하려면
DESCRIBE COMPUTE POOL
을 실행하고state
가ACTIVE
또는IDLE
인지 확인하십시오.state
가STARTING
이면state
가ACTIVE
또는IDLE
로 변경될 때까지 기다려야 합니다.
DESCRIBE COMPUTE POOL tutorial_compute_pool;
서비스를 생성하려면
test_role
을 사용하여 다음 명령을 실행합니다.CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
참고
해당 이름의 서비스가 이미 존재하는 경우 DROP SERVICE 명령을 사용하여 이전에 생성된 서비스를 삭제한 다음 이 서비스를 생성합니다.
방금 만든 서비스에 대한 자세한 정보를 얻으려면 다음 SQL 명령을 실행하십시오. 자세한 내용은 Snowpark Container Services: 서비스 사용하기 섹션을 참조하십시오.
계정의 서비스를 나열하려면 SHOW SERVICES 명령을 실행하십시오.
SHOW SERVICES;
서비스 상태를 확인하려면 시스템 함수 SYSTEM$GET_SERVICE_STATUS를 호출하십시오.
SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
서비스에 대한 정보를 얻으려면 DESCRIBE SERVICE 명령을 실행하십시오.
DESCRIBE SERVICE echo_service;
서비스 함수를 생성하려면 다음 명령을 실행합니다.
CREATE FUNCTION my_echo_udf (InputText varchar) RETURNS varchar SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo';
다음 사항을 참고하십시오.
SERVICE 속성은 UDF를
echo_service
서비스와 연결합니다.ENDPOINT 속성은 UDF를 서비스 내의
echoendpoint
엔드포인트와 연결합니다.AS ‘/echo’는 에코 서버의 HTTP 경로를 지정합니다. 서비스 코드(
echo_service.py
)에서 이 경로를 찾을 수 있습니다.
4: 서비스 사용¶
먼저, 이 섹션의 SQL 문에 대한 컨텍스트를 설정하고 다음을 실행합니다.
USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
이제 Echo 서비스와 통신할 수 있습니다.
서비스 함수 사용: 쿼리에서 서비스 함수를 호출할 수 있습니다. 예제 서비스 함수(
my_echo_udf
)는 단일 문자열이나 문자열 목록을 입력으로 받을 수 있습니다.예 1.1: 단일 문자열 전달
my_echo_udf
서비스 함수를 호출하려면 다음 SELECT 문을 실행하여 하나의 입력 문자열('hello'
)을 전달합니다.SELECT my_echo_udf('hello!');
Snowflake는 서비스 엔드포인트(
echoendpoint
)에 POST 요청을 보냅니다. 요청을 받으면 서비스는 응답에 입력 문자열을 그대로 표시합니다.+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
예 1.2: 문자열 목록 전달
문자열 목록을 서비스 함수에 전달하면 Snowflake는 이러한 입력 문자열을 일괄 처리하고 일련의 POST 요청을 서비스에 보냅니다. 서비스가 모든 문자열을 처리한 후 Snowflake는 결과를 결합하여 반환합니다.
다음 예에서는 테이블 열을 서비스 함수에 대한 입력으로 전달합니다.
여러 문자열이 포함된 테이블을 만듭니다.
CREATE TABLE messages (message_text VARCHAR) AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
테이블이 만들어졌는지 확인합니다.
SELECT * FROM messages;
서비스 함수를 호출하려면 다음 SELECT 문을 실행하여 테이블 행을 입력으로 전달합니다.
SELECT my_echo_udf(message_text) FROM messages;
출력:
+---------------------------+ | MY_ECHO_UDF(MESSAGE_TEXT) | |---------------------------| | Bob said Thank you | | Bob said Hello | | Bob said Hello World | +---------------------------+
웹 브라우저 사용: 이 서비스는 엔드포인트를 공개적으로 노출합니다(CREATE SERVICE 명령에 제공된 인라인 사양 참조). 따라서 서비스가 인터넷에 노출하는 웹 UI에 로그인한 후 웹 브라우저에서 서비스에 요청을 보낼 수 있습니다.
서비스가 노출하는 공용 엔드포인트의 URL을 찾습니다.
SHOW ENDPOINTS IN SERVICE echo_service;
응답의
ingress_url
열은 URL을 제공합니다.예
p6bye-myorg-myacct.snowflakecomputing.app
엔드포인트 URL에
/ui
를 추가하고 웹 브라우저에 붙여넣습니다. 그러면 서비스가ui()
함수를 실행하게 됩니다(echo_service.py
참조).엔드포인트 URL에 처음 액세스하면 Snowflake에 로그인하라는 메시지가 표시됩니다. 이 테스트에서는 서비스를 만드는 데 사용한 것과 동일한 사용자를 사용하여 사용자에게 필요한 권한이 있는지 확인하십시오.
Input 상자에 문자열 “Hello”를 입력하고 Return 을 누릅니다.
참고
공개 엔드포인트에 프로그래밍 방식으로 액세스할 수 있습니다. 샘플 코드는 Snowflake 외부의 공용 엔드포인트 액세스 및 인증 섹션을 참조하십시오. 코드의 엔드포인트 URL에
/ui
를 추가해야 Snowflake가 서비스 코드의ui()
함수로 요청을 라우팅할 수 있습니다.
5: (선택 사항) 프로그래밍 방식으로 공개 엔드포인트에 액세스¶
이전 섹션에서는 웹 브라우저를 사용하여 Echo 서비스를 테스트했습니다. 브라우저에서 공용 엔드포인트(수신 엔드포인트)에 액세스하고 서비스가 노출한 웹 UI를 사용하여 요청을 보냈습니다. 이 섹션에서는 동일한 공개 엔드포인트를 프로그래밍 방식으로 테스트합니다.
이 예제에서는 키 페어 인증 을 사용합니다. 샘플 코드는 공급자가 제공한 키 페어를 사용하여 먼저 JSON 웹 토큰(JWT)을 생성한 다음 이 토큰을 Snowflake와 OAuth 토큰으로 교환합니다. 그런 다음 코드는 Echo 서비스 공용 엔드포인트와 통신할 때 OAuth 토큰을 인증에 사용합니다.
전제 조건¶
다음 정보가 있는지 확인합니다.
공용 엔드포인트의 수신 URL. SHOW ENDPOINTS IN SERVICE 명령을 실행하여 URL을 가져옵니다.
SHOW ENDPOINTS IN SERVICE echo_service;
사용자의 Snowflake 계정 이름. 자세한 내용은 공통 설정: 계속할 준비가 되었는지 확인합니다 섹션을 참조하십시오.
사용자의 Snowflake 계정 URL:
<acctname>.snowflakecomputing.com
입니다.Snowflake 계정의 사용자 이름. 공통 설정: Snowflake 오브젝트 만들기 섹션에서 선택한 사용자입니다. 이 사용자로 Snowflake에 로그인하여 프로그래밍 방식의 액세스를 테스트합니다.
역할 이름: 공통 설정의 일부로 역할(
test_role
)을 생성했습니다. 사용자는 이 역할을 맡아 작업을 수행합니다.
설정¶
다음 단계를 따라 Echo 서비스와 프로그래밍적으로 통신합니다. 제공된 Python 코드를 사용하여 Echo 서비스가 노출하는 공개 엔드포인트에 요청을 보냅니다.
명령 프롬프트에서 디렉터리를 생성하고 해당 디렉터리로 이동합니다.
사용자에 대한 키 페어 인증을 구성합니다.
키 페어 를 생성합니다.
개인 키를 생성합니다. 연습 단계를 단순화하려면 암호화되지 않은 개인 키를 생성합니다. 암호화된 개인 키를 사용할 수도 있지만 비밀번호를 입력해야 합니다.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
생성한 개인 키를 참조하여 공개 키(
rsa_key.pub
)를 생성합니다.openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
디렉터리에 개인 키와 공개 키가 생성되었는지 확인합니다.
프로그래밍 방식 액세스를 테스트하는 데 사용하는 사용자에게 공개 키를 할당합니다. 이를 통해 사용자는 인증 키를 지정할 수 있습니다.
ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
제공된 샘플 코드를 Python 파일로 저장합니다.
generateJWT.py
의 다음 코드를 저장합니다.# To run this on the command line, enter: # python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file> from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat from cryptography.hazmat.backends import default_backend from datetime import timedelta, timezone, datetime import argparse import base64 from getpass import getpass import hashlib import logging import sys # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/). import jwt logger = logging.getLogger(__name__) try: from typing import Text except ImportError: logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True) from typing_extensions import Text ISSUER = "iss" EXPIRE_TIME = "exp" ISSUE_TIME = "iat" SUBJECT = "sub" # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. As an example, this function # prompts the user for the passphrase. def get_private_key_passphrase(): return getpass('Passphrase for private key: ') class JWTGenerator(object): """ Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the generated token and only regenerates the token if a specified period of time has passed. """ LIFETIME = timedelta(minutes=59) # The tokens will have a 59-minute lifetime RENEWAL_DELTA = timedelta(minutes=54) # Tokens will be renewed after 54 minutes ALGORITHM = "RS256" # Tokens will be generated using RSA with SHA256 def __init__(self, account: Text, user: Text, private_key_file_path: Text, lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA): """ __init__ creates an object that generates JWTs for the specified user, account identifier, and private key. :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator. :param user: The Snowflake username. :param private_key_file_path: Path to the private key file used for signing the JWTs. :param lifetime: The number of minutes (as a timedelta) during which the key will be valid. :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT. """ logger.info( """Creating JWTGenerator with arguments account : %s, user : %s, lifetime : %s, renewal_delay : %s""", account, user, lifetime, renewal_delay) # Construct the fully qualified name of the user in uppercase. self.account = self.prepare_account_name_for_jwt(account) self.user = user.upper() self.qualified_username = self.account + "." + self.user self.lifetime = lifetime self.renewal_delay = renewal_delay self.private_key_file_path = private_key_file_path self.renew_time = datetime.now(timezone.utc) self.token = None # Load the private key from the specified file. with open(self.private_key_file_path, 'rb') as pem_in: pemlines = pem_in.read() try: # Try to access the private key without a passphrase. self.private_key = load_pem_private_key(pemlines, None, default_backend()) except TypeError: # If that fails, provide the passphrase returned from get_private_key_passphrase(). self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) def prepare_account_name_for_jwt(self, raw_account: Text) -> Text: """ Prepare the account identifier for use in the JWT. For the JWT, the account identifier must not include the subdomain or any region or cloud provider information. :param raw_account: The specified account identifier. :return: The account identifier in a form that can be used to generate the JWT. """ account = raw_account if not '.global' in account: # Handle the general case. idx = account.find('.') if idx > 0: account = account[0:idx] else: # Handle the replication case. idx = account.find('-') if idx > 0: account = account[0:idx] # Use uppercase for the account identifier. return account.upper() def get_token(self) -> Text: """ Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the specified renewal time has passed. :return: the new token """ now = datetime.now(timezone.utc) # Fetch the current time # If the token has expired or doesn't exist, regenerate the token. if self.token is None or self.renew_time <= now: logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)", now, self.renew_time) # Calculate the next time we need to renew the token. self.renew_time = now + self.renewal_delay # Prepare the fields for the payload. # Generate the public key fingerprint for the issuer in the payload. public_key_fp = self.calculate_public_key_fingerprint(self.private_key) # Create our payload payload = { # Set the issuer to the fully qualified username concatenated with the public key fingerprint. ISSUER: self.qualified_username + '.' + public_key_fp, # Set the subject to the fully qualified username. SUBJECT: self.qualified_username, # Set the issue time to now. ISSUE_TIME: now, # Set the expiration time, based on the lifetime specified for this object. EXPIRE_TIME: now + self.lifetime } # Regenerate the actual token token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM) # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string. # If the token is a byte string, convert it to a string. if isinstance(token, bytes): token = token.decode('utf-8') self.token = token logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM])) return self.token def calculate_public_key_fingerprint(self, private_key: Text) -> Text: """ Given a private key in PEM format, return the public key fingerprint. :param private_key: private key string :return: public key fingerprint """ # Get the raw bytes of public key. public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) # Get the sha256 hash of the raw bytes. sha256hash = hashlib.sha256() sha256hash.update(public_key_raw) # Base64-encode the value and prepend the prefix 'SHA256:'. public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8') logger.info("Public key fingerprint is %s", public_key_fp) return public_key_fp def main(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') args = cli_parser.parse_args() token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() print('JWT:') print(token) if __name__ == "__main__": main()
access-via-keypair.py
의 다음 코드를 저장합니다.from generateJWT import JWTGenerator from datetime import timedelta import argparse import logging import sys import requests logger = logging.getLogger(__name__) def main(): args = _parse_args() token = _get_token(args) snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role, snowflake_account_url=args.snowflake_account_url, snowflake_account=args.account) spcs_url=f'https://{args.endpoint}{args.endpoint_path}' connect_to_spcs(snowflake_jwt, spcs_url) def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account): scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } logger.info(data) url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token' if snowflake_account_url: url = f'{snowflake_account_url}/oauth/token' logger.info("oauth url: %s" %url) response = requests.post(url, data=data) logger.info("snowflake jwt : %s" % response.text) assert 200 == response.status_code, "unable to get snowflake token" return response.text def connect_to_spcs(token, url): # Create a request to the ingress endpoint with authz. headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) logger.info("return code %s" % response.status_code) logger.info(response.text) def _parse_args(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (for example, "myorganization-myaccount" for ' '"myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') cli_parser.add_argument('--role', help='The role we want to use to create and maintain a session for. If a role is not provided, ' 'use the default role.') cli_parser.add_argument('--endpoint', required=True, help='The ingress endpoint of the service') cli_parser.add_argument('--endpoint-path', default='/', help='The url path for the ingress endpoint of the service') cli_parser.add_argument('--snowflake_account_url', default=None, help='The account url of the account for which we want to log in. Type of ' 'https://myorganization-myaccount.snowflakecomputing.com') args = cli_parser.parse_args() return args if __name__ == "__main__": main()
서비스 엔드포인트에 프로그래밍 방식으로 요청 보내기¶
access-via-keypair.py
Python 코드를 실행하여 Echo 서비스 퍼블릭 엔드포인트에 대한 수신 호출을 수행합니다.
python3 access-via-keypair.py \ --account <account-identifier> \ --user <user-name> \ --role TEST_ROLE \ --private_key_file_path rsa_key.p8 \ --endpoint <ingress-hostname> \ --endpoint-path /ui
account-identifier
에 대한 자세한 내용은 계정 식별자 를 참조하십시오.
인증 작동 방식¶
코드는 우선 공급자가 제공한 키 페어를 JWT 토큰으로 변환합니다. 그런 다음 JWT 토큰을 Snowflake로 전송하여 OAuth 토큰을 얻습니다. 마지막으로 코드는 OAuth 토큰을 사용하여 Snowflake에 연결하고 공용 엔드포인트에 액세스합니다. 구체적으로 코드는 다음을 수행합니다.
_get_token(args)
함수를 호출하여 공급자가 제공한 키 페어에서 JWT 토큰을 생성합니다. 함수 구현은 다음과 같습니다.def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token
JWTGenerator
는 사용자에게 제공되는 도우미 클래스입니다. 이 오브젝트를 생성할 때 제공하는 매개 변수에 대해 다음 사항을 참고합니다.args.account
및args.user
매개 변수: JWT 토큰에는 여러 개의 필드가 있으며(토큰 형식 참조),iss
는 그 중 하나입니다. 이 필드 값에는 Snowflake 계정 이름과 사용자 이름이 포함됩니다. 따라서 이러한 값을 매개 변수로 제공합니다.두 개의
timedelta
매개 변수는 다음 정보를 제공합니다.lifetime
은 키가 유효한 시간(60분)을 지정합니다.renewal_delay
는 JWT 생성기가 JWT를 갱신해야 하는 시간(분)을 지정합니다.
token_exchange()
함수를 호출하여 Snowflake에 연결하고 OAuth 토큰을 JWT 토큰으로 교환합니다.scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, }
앞의 코드는 지정된 역할을 사용하여 액세스할 수 있는 공개 엔드포인트인 OAuth 토큰의 범위를 설정하는 JSON을 구성합니다. 그런 다음 이 코드는 그림과 같이 JSON을 통과하는 Snowflake에 POST를 요청하여 JWT 토큰을 OAuth 토큰으로 교환합니다(토큰 교환 참조).
url = f'{snowflake_account_url}/oauth/token' response = requests.post(url, data=data) assert 200 == response.status_code, "unable to get Snowflake token" return response.text
그런 다음 코드는
connect_to_spcs()
함수를 호출하여 Echo 서비스의 공용 엔드포인트에 연결합니다. 엔드포인트의 URL(https://<수신-URL>/ui
)와 인증을 위한 OAuth 토큰을 제공합니다.headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers)
url
은 사용자가 프로그램에 제공한spcs_url
이고token
은 OAuth 토큰입니다.이 예제의 Echo 서비스는 HTML 페이지를 제공합니다(이전 섹션에서 설명한 대로). 이 샘플 코드는 단순히 응답에 HTML을 출력합니다.
6: 정리¶
자습서 2 또는 자습서 3 을 계속 진행할 계획이 없다면 생성한 청구 가능한 리소스를 제거해야 합니다. 자세한 내용은 자습서 3 의 5단계를 참조하십시오.
7: 서비스 코드 검토¶
이 섹션에서는 다음 주제를 다룹니다.
자습서 1 코드 검토: Echo 서비스를 구현하는 코드 파일을 검토합니다.
서비스 함수 이해하기: 이 섹션에서는 이 자습서의 서비스 함수가 서비스와 연결되는 방식을 설명합니다.
로컬에서 이미지 만들기 및 테스트하기. 이 섹션에서는 Docker 이미지를 Snowflake 계정의 리포지토리에 업로드하기 전에 로컬에서 이미지를 테스트하는 방법을 설명합니다.
자습서 1 코드 검토¶
1단계에서 다운로드한 zip 파일에는 다음 파일이 포함됩니다.
Dockerfile
echo_service.py
templates/basic_ui.html
서비스를 만들 때 서비스 사양도 사용합니다. 다음 섹션에서는 이러한 코드 구성 요소가 함께 작동하여 서비스를 만드는 방법을 설명합니다.
echo_service.py 파일¶
이 Python 파일에는 입력 텍스트를 반환(화면 표시)하는 최소 HTTP 서버를 구현하는 코드가 포함됩니다. 코드는 Snowflake 서비스 함수의 에코 요청을 처리하고 에코 요청 제출을 위한 웹 사용자 인터페이스(UI)를 제공하는 두 가지 작업을 주로 수행합니다.
from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys
SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter(
'%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
logger.addHandler(handler)
return logger
logger = get_logger('echo-service')
app = Flask(__name__)
@app.get("/healthcheck")
def readiness_probe():
return "I'm ready!"
@app.post("/echo")
def echo():
'''
Main handler for input data sent by Snowflake.
'''
message = request.json
logger.debug(f'Received request: {message}')
if message is None or not message['data']:
logger.info('Received empty message')
return {}
# input format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...],
# ...
# ]}
input_rows = message['data']
logger.info(f'Received {len(input_rows)} rows')
# output format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...}],
# ...
# ]}
output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
logger.info(f'Produced {len(output_rows)} rows')
response = make_response({"data": output_rows})
response.headers['Content-type'] = 'application/json'
logger.debug(f'Sending response: {response.json}')
return response
@app.route("/ui", methods=["GET", "POST"])
def ui():
'''
Main handler for providing a web UI.
'''
if request.method == "POST":
# getting input in HTML form
input_text = request.form.get("input")
# display input and output
return render_template("basic_ui.html",
echo_input=input_text,
echo_reponse=get_echo_response(input_text))
return render_template("basic_ui.html")
def get_echo_response(input):
return f'{CHARACTER_NAME} said {input}'
if __name__ == '__main__':
app.run(host=SERVICE_HOST, port=SERVER_PORT)
코드에서는 다음 사항이 적용됩니다.
echo
함수를 사용하면 Snowflake 서비스 함수가 서비스와 통신할 수 있습니다. 이 함수는 다음과 같이@app.post()
데코레이션을 지정합니다.@app.post("/echo") def echo():
에코 서버가
/echo
경로가 포함된 HTTP POST 요청을 수신하면 서버가 요청을 이 함수로 라우팅합니다. 이 함수는 응답의 요청 본문에서 문자열을 실행하고 화면에 표시합니다.Snowflake 서비스 함수에서의 통신을 지원하기 위해 이 서버는 외부 함수를 구현합니다. 즉, 서버 구현은 SQL 함수를 제공하기 위해 특정 입력/출력 데이터 형식을 따르며, 이는 외부 함수 에서 사용하는 것과 동일한 입력/출력 데이터 형식 입니다.
코드의
ui
함수 섹션은 웹 양식을 표시하고 웹 양식에서 제출된 에코 요청을 처리합니다. 이 함수는@app.route()
데코레이터를 사용하여/ui
에 대한 요청이 이 함수로 처리되도록 지정합니다.@app.route("/ui", methods=["GET", "POST"]) def ui():
Echo 서비스는
echoendpoint
엔드포인트를 공개적으로 노출하므로(서비스 사양 참조) 웹을 통해 서비스와 통신할 수 있습니다. 브라우저에 /ui가 추가된 공용 엔드포인트의 URL을 로드하면 브라우저가 이 경로에 대한 HTTP GET 요청을 보내고 서버는 요청을 이 함수로 라우팅합니다. 이 함수는 사용자가 문자열을 입력할 수 있는 간단한 HTML 양식을 실행하고 반환합니다.사용자가 문자열을 입력하고 양식을 제출하면 브라우저가 이 경로에 대한 HTTP 게시 요청을 보내고 서버는 요청을 동일한 이 함수로 라우팅합니다. 이 함수는 원래 문자열이 포함된 HTTP 응답을 실행하고 반환합니다.
readiness_probe
함수는@app.get()
데코레이터를 사용하여/healthcheck
에 대한 요청이 이 함수로 처리되도록 지정합니다.@app.get("/healthcheck") def readiness_probe():
이 함수를 사용하면 Snowflake가 서비스 준비 상태를 확인할 수 있습니다. 컨테이너가 시작되면 Snowflake는 애플리케이션이 작동하고 서비스가 요청을 처리할 준비가 되었는지 확인하려고 합니다. Snowflake는 (상태 프로브, 준비 상태 프로브로서) 이 경로를 사용하여 HTTP GET 요청을 보내 정상 컨테이너만 트래픽을 제공하도록 보장합니다. 이 함수로 원하는 것을 무엇이든 할 수 있습니다.
get_logger
함수는 로깅 설정에 도움이 됩니다.
Dockerfile¶
이 파일에는 Docker를 사용하여 이미지를 만드는 모든 명령이 포함되어 있습니다.
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
Dockerfile에는 Docker 컨테이너에 Flask 라이브러리를 설치하기 위한 지침이 포함되어 있습니다. echo_service.py
의 코드는 Flask 라이브러리를 사용하여 HTTP 요청을 처리합니다.
/template/basic_ui.html¶
Echo 서비스는 echoendpoint
엔드포인트를 공개적으로 노출하므로(서비스 사양 참조) 웹을 통해 서비스와 통신할 수 있습니다. 브라우저에 /ui
가 추가된 공용 엔드포인트 URL을 로드하면 Echo 서비스가 이 양식을 표시합니다. 양식에 문자열을 입력하고 양식을 제출할 수 있으며, 그러면 서비스가 HTTP 응답으로 문자열을 반환합니다.
<!DOCTYPE html>
<html lang="en">
<head>
<title>Welcome to echo service!</title>
</head>
<body>
<h1>Welcome to echo service!</h1>
<form action="{{ url_for("ui") }}" method="post">
<label for="input">Input:<label><br>
<input type="text" id="input" name="input"><br>
</form>
<h2>Input:</h2>
{{ echo_input }}
<h2>Output:</h2>
{{ echo_reponse }}
</body>
</html>
서비스 사양¶
Snowflake는 사용자가 이 사양에 제공하는 정보를 사용하여 서비스를 구성하고 실행합니다.
spec:
containers:
- name: echo
image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
env:
SERVER_PORT: 8000
CHARACTER_NAME: Bob
readinessProbe:
port: 8000
path: /healthcheck
endpoints:
- name: echoendpoint
port: 8000
public: true
서비스 사양에서 다음 사항이 적용됩니다.
containers.image
는 Snowflake가 컨테이너를 시작하기 위한 이미지를 지정합니다.선택적
endpoints
필드는 서비스가 노출하는 엔드포인트를 지정합니다.name
은 컨테이너가 수신 대기 중인 TCP 네트워크 포트에 대해 사용자에게 친숙한 이름을 지정합니다. 사용자에게 친숙한 이 엔드포인트 이름을 사용하여 해당 포트에 요청을 보냅니다.env.SERVER_PORT
가 이 포트 번호를 제어합니다.엔드포인트 역시
public
으로 구성됩니다. 따라서 공용 웹에서 이 엔드포인트로의 트래픽이 허용됩니다.
Snowflake가 컨테이너의 모든 프로세스에 전달하는 환경 변수를 재정의할 수 있는 방법을 설명하기 위해 선택적
containers.env
필드를 추가했습니다. 예를 들어 서비스 코드(echo_service.py
)는 다음과 같이 기본값을 사용하여 환경 변수를 읽습니다.CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') SERVER_PORT = os.getenv('SERVER_PORT', 8080)
이 코드는 다음과 같이 작동합니다.
Echo 서비스는 요청 본문에 문자열(예: “Hello”)이 포함된 HTTP POST 요청을 수신하면 기본적으로 “I said Hello”를 반환합니다. 코드에서는
CHARACTER_NAME
환경 변수를 사용하여 “said” 앞의 단어를 결정합니다. 기본적으로CHARACTER_NAME
은 “I”로 설정됩니다.서비스 사양의 CHARACTER_NAME 기본값을 덮어쓸 수 있습니다. 예를 들어 이 값을 “Bob”으로 설정하면 Echo 서비스는 “Bob said Hello”라는 응답을 반환합니다.
마찬가지로, 서비스 사양은 서비스가 8000을 수신 대기하는 포트(SERVER_PORT)를 재정의하여 기본 포트 8080을 재정의합니다.
readinessProbe
필드는 Snowflake가 준비 상태 프로브에 HTTP GET 요청을 보내 서비스가 트래픽을 처리할 준비가 되었는지 확인하는 데 사용할 수 있는port
및path
를 식별합니다.서비스 코드(
echo_python.py
)는 다음과 같이 준비 상태 프로브를 구현합니다.@app.get("/healthcheck") def readiness_probe():
따라서 사양 파일에는 이에 따라
container.readinessProbe
필드가 포함됩니다.
서비스 사양에 대한 자세한 내용은 서비스 사양 참조 섹션을 참조하십시오.
서비스 함수 이해하기¶
서비스 함수는 서비스와 통신하는 방법 중 하나입니다(서비스 사용하기 참조). 서비스 함수는 서비스 엔드포인트와 연결하는 사용자 정의 함수(UDF)입니다. 서비스 함수가 실행되면 연결된 서비스 엔드포인트에 요청을 보내고 응답을 받습니다.
다음 매개 변수와 함께 CREATE FUNCTION 명령을 실행하여 다음 서비스 함수를 생성합니다.
CREATE FUNCTION my_echo_udf (InputText VARCHAR)
RETURNS VARCHAR
SERVICE=echo_service
ENDPOINT=echoendpoint
AS '/echo';
다음 사항을 참고하십시오.
my_echo_udf
함수는 문자열을 입력값으로 받아 문자열을 반환합니다.SERVICE 속성은 서비스(
echo_service
)를 식별하고, ENDPOINT 속성은 사용자에게 친숙한 엔드포인트 이름(echoendpoint
)을 식별합니다.AS ‘/echo’는 서비스 경로를 지정합니다.
echo_service.py
에서@app.post
데코레이터는 이 경로를echo
함수와 연결합니다.
이 함수는 지정된 SERVICE의 특정 ENDPOINT와 연결합니다. 이 함수를 호출하면 Snowflake가 서비스 컨테이너 내부의 /echo
경로로 요청을 보냅니다.
로컬에서 이미지 만들기 및 테스트하기¶
Docker 이미지를 Snowflake 계정의 리포지토리에 업로드하기 전에 로컬에서 테스트할 수 있습니다. 로컬 테스트에서는 컨테이너가 독립형으로 실행됩니다(Snowflake가 실행하는 서비스가 아님).
자습서 1 Docker 이미지를 테스트하는 방법은 다음과 같습니다.
Docker 이미지를 생성하려면 Docker CLI에서 다음 명령을 실행하십시오.
docker build --rm -t my_service:local .
코드를 실행하려면 다음 명령을 실행하십시오.
docker run --rm -p 8080:8080 my_service:local
다음 방법 중 하나를 사용하여 서비스에 에코 요청을 보냅니다.
cURL 명령 사용:
다른 터미널 창에서 cURL을 사용하여 다음 POST 요청을 포트 8080에 보냅니다.
curl -X POST http://localhost:8080/echo \ -H "Content-Type: application/json" \ -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
요청 본문에는 다음 두 문자열이 포함됩니다. 이 cURL 명령은 서비스가 수신 대기 중인 포트 8080에 POST 요청을 보냅니다. 데이터에 있는 0은 목록에 있는 입력 문자열의 인덱스입니다. Echo 서비스는 다음과 같이 입력 문자열을 응답으로 표시합니다.
{"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
웹 브라우저 사용:
다음에는 무엇을 해야 합니까?¶
이제 작업을 실행하는 자습서 2 를 테스트할 수 있습니다.