Tutorial 1: Criar um serviço Snowpark Container Services¶
Introdução¶
Após concluir a configuração comum, você estará pronto para criar um serviço. Neste tutorial, você cria um serviço (denominado echo_service
) que simplesmente retorna o texto fornecido como entrada. Por exemplo, se a cadeia de caracteres de entrada for “Hello World”, o serviço retornará “Eu disse, Hello World”.
Existem duas partes neste tutorial:
Parte 1: Criar e testar um serviço. Faça download do código fornecido para este tutorial e siga as instruções passo a passo:
Baixe o código de serviço deste tutorial.
Crie uma imagem Docker para Snowpark Container Services e carregue a imagem em um repositório em sua conta.
Crie um serviço, fornecendo o arquivo de especificação de serviço e o pool de computação no qual executar o serviço.
Crie uma função de serviço para se comunicar com o serviço.
Use o serviço. Você envia solicitações de eco ao serviço e verifica a resposta.
Parte 2: Entender o serviço. Esta seção fornece uma visão geral do código de serviço e destaca como diferentes componentes colaboram.
1: Baixe o código de serviço¶
O código (um aplicativo Python) é fornecido para criar o serviço Echo.
Descompacte o conteúdo, que inclui um diretório para cada tutorial. O diretório
Tutorial-1
possui os seguintes arquivos:Dockerfile
echo_service.py
templates/basic_ui.html
2: Crie uma imagem e carregue¶
Crie uma imagem para a plataforma Linux/AMD64 compatível com o Snowpark Container Services e, em seguida, faça upload da imagem para o repositório de imagens da sua conta (consulte Configuração comum).
Você precisará de informações sobre o repositório (o URL do repositório e o nome do host do registro) antes de poder construir e fazer upload da imagem. Para obter mais informações, consulte Registro e repositórios.
Obter informações sobre o repositório
Para obter o URL do repositório, execute o comando SHOW IMAGE REPOSITORIES SQL.
SHOW IMAGE REPOSITORIES;
A coluna
repository_url
na saída fornece o URL. Abaixo um exemplo:<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
O nome do host no URL do repositório é o nome do host de registro. Abaixo um exemplo:
<orgname>-<acctname>.registry.snowflakecomputing.com
Criar a imagem e carregá-la no repositório
Abra uma janela de terminal e mude para o diretório que contém os arquivos que você descompactou.
Para criar uma imagem do Docker, execute o seguinte comando
docker build
usando o Docker CLI. Observe que o comando especifica o diretório de trabalho atual (.
) comoPATH
para arquivos a serem usados na construção da imagem.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
Para
image_name
, usemy_echo_service_image:latest
:
Exemplo
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
Faça upload da imagem para o repositório em sua conta Snowflake. Para que o Docker carregue uma imagem em seu nome para seu repositório, é necessário primeiro autenticar o Docker com o registro.
Para autenticar o Docker com o registro de imagem, execute o seguinte comando.
docker login <registry_hostname> -u <username>
Para
username
, especifique seu nome de usuário do Snowflake. O Docker solicitará sua senha.
Para fazer upload da imagem, execute o seguinte comando:
docker push <repository_url>/<image_name>
Exemplo
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
3: Criar um serviço¶
Nesta seção você cria um serviço e também uma função de serviço para se comunicar com o serviço.
Para criar um serviço, você precisa do seguinte:
Um pool de computação. Snowflake executa seu serviço no pool de computação especificado. Você criou um pool de computação como parte da configuração comum.
Uma especificação de serviço. Esta especificação fornece ao Snowflake as informações necessárias para configurar e executar seu serviço. Para obter mais informações, consulte Snowpark Container Services: como trabalhar com serviços. Neste tutorial, você fornece a especificação inline, no comando CREATE SERVICE. Você também pode salvar a especificação em um arquivo no estágio Snowflake e fornecer informações do arquivo no comando CREATE SERVICE, conforme mostrado no Tutorial 2.
Uma função de serviço é um dos métodos disponíveis para comunicação com seu serviço. Uma função de serviço é uma função definida pelo usuário (UDF) que você associa ao ponto de extremidade do serviço. Quando a função de serviço é executada, ela envia uma solicitação ao ponto de extremidade de serviço e recebe uma resposta.
Verifique se o pool de computação está pronto e se você está no contexto certo para criar o serviço.
Anteriormente, você definia o contexto na etapa Configuração comum. Para garantir que você esteja no contexto correto para as instruções SQL nesta etapa, execute o seguinte:
USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse;
Para garantir que o pool de computação criado na configuração comum esteja pronto, execute
DESCRIBE COMPUTE POOL
e verifique sestate
éACTIVE
ouIDLE
. Sestate
forSTARTING
, será necessário aguardar até questate
mude paraACTIVE
ouIDLE
.
DESCRIBE COMPUTE POOL tutorial_compute_pool;
Para criar o serviço, execute o seguinte comando usando
test_role
:CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
Nota
Se já existir um serviço com esse nome, use o comando DROP SERVICE para excluir o serviço criado anteriormente e, em seguida, crie esse serviço.
Execute os seguintes comandos SQL para obter informações detalhadas sobre o serviço que você acabou de criar. Para obter mais informações, consulte Snowpark Container Services: como trabalhar com serviços.
Para listar os serviços da sua conta, execute o comando SHOW SERVICES:
SHOW SERVICES;
Para obter o status do seu serviço, chame a função do sistema SYSTEM$GET_SERVICE_STATUS:
SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
Para obter informações sobre seu serviço, execute o comando DESCRIBE SERVICE:
DESCRIBE SERVICE echo_service;
Para criar uma função de serviço, execute o seguinte comando:
CREATE FUNCTION my_echo_udf (InputText varchar) RETURNS varchar SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo';
Observe o seguinte:
A propriedade SERVICE associa a UDF ao serviço
echo_service
.A propriedade ENDPOINT associa a UDF ao ponto de extremidade
echoendpoint
dentro do serviço.AS “/echo” especifica o caminho HTTP para o servidor de eco. Você pode encontrar esse caminho no código de serviço (
echo_service.py
).
4: Use o serviço¶
Primeiro, configure o contexto para as instruções SQL nesta seção e execute o seguinte:
USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
Agora você pode se comunicar com o serviço Echo.
Usando uma função de serviço: você pode invocar a função de serviço em uma consulta. A função de serviço de exemplo (
my_echo_udf
) pode receber uma única cadeia de caracteres ou uma lista de cadeias de caracteres como entrada.Exemplo 1.1: passe uma única cadeia de caracteres
Para chamar a função de serviço
my_echo_udf
, execute a seguinte instrução SELECT, passando uma cadeia de caracteres de entrada ('hello'
):SELECT my_echo_udf('hello!');
Snowflake envia uma solicitação POST ao ponto de extremidade do serviço (
echoendpoint
). Ao receber a solicitação, o serviço ecoa a cadeia de caracteres de entrada na resposta.+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
Exemplo 1.2: passe uma lista de cadeias de caracteres
Quando você passa uma lista de cadeias de caracteres para a função de serviço, o Snowflake agrupa essas cadeias de caracteres de entrada e envia uma série de solicitações POST para o serviço. Depois que o serviço processa todas as cadeias de caracteres, o Snowflake combina os resultados e os retorna.
O exemplo a seguir passa uma coluna da tabela como entrada para a função de serviço.
Crie uma tabela com várias cadeias de caracteres:
CREATE TABLE messages (message_text VARCHAR) AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
Verifique se a tabela foi criada:
SELECT * FROM messages;
Para chamar a função de serviço, execute a seguinte instrução SELECT, passando as linhas da tabela como entrada:
SELECT my_echo_udf(message_text) FROM messages;
Saída:
+---------------------------+ | MY_ECHO_UDF(MESSAGE_TEXT) | |---------------------------| | Bob said Thank you | | Bob said Hello | | Bob said Hello World | +---------------------------+
Usando um navegador da Web: o serviço expõe o ponto de extremidade publicamente (consulte a especificação embutida fornecida no comando CREATE SERVICE). Portanto, você pode fazer login em uma UI da web que o serviço expõe à Internet e, em seguida, enviar solicitações ao serviço a partir de um navegador da web.
Encontre o URL do ponto de extremidade público que o serviço expõe:
SHOW ENDPOINTS IN SERVICE echo_service;
A coluna
ingress_url
na resposta fornece o URL.Exemplo
p6bye-myorg-myacct.snowflakecomputing.app
Anexe
/ui
ao URL do ponto de extremidade e cole-o no navegador da web. Isso faz com que o serviço execute a funçãoui()
(consulteecho_service.py
).Observe que na primeira vez que você acessar o URL do ponto de extremidade, será solicitado a fazer login no Snowflake. Para este teste, use o mesmo usuário usado para criar o serviço para garantir que o usuário tenha os privilégios necessários.
Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.
Nota
É possível acessar o ponto de extremidade público programaticamente. Para obter um código de amostra, consulte Acesso de ponto de extremidade público de fora do Snowflake e autenticação. Observe que é necessário anexar
/ui
ao URL do ponto de extremidade no código para que o Snowflake possa rotear a solicitação para a funçãoui()
no código de serviço.
5: (Opcional) Acesse o ponto de extremidade público programaticamente¶
Na seção anterior, você testou o serviço Echo usando um navegador da Web. No navegador, você acessou o ponto de extremidade público (ponto de extremidade de entrada) e enviou solicitações usando a UI da Web que o serviço expôs. Nesta seção, você testa o mesmo ponto de extremidade público programaticamente.
O exemplo usa autenticação de par de chaves. Usando o par de chaves fornecido, o código de amostra primeiro gera um JSON Web Token (JWT) e depois troca o token com o Snowflake por um token OAuth. O código então usa o token OAuth para autenticar ao se comunicar com o ponto de extremidade público do serviço Echo.
Pré-requisitos¶
Certifique-se de ter as seguintes informações:
URL de entrada do ponto de extremidade público. Execute o comando SHOW ENDPOINTS IN SERVICEpara obter o URL:
SHOW ENDPOINTS IN SERVICE echo_service;
Nome de sua conta Snowflake. Para obter mais informações, consulte Configuração comum: Verifique se você está pronto para continuar.
URL de sua conta Snowflake: É
<acctname >.snowflakecomputing.com
.Nome de usuário na conta Snowflake. Este é o usuário escolhido em Configuração comum: Criação de objetos Snowflake. Você faz login no Snowflake como este usuário e testa o acesso programático.
Nome da função: você criou uma função (
test_role
) como parte da configuração comum. O usuário assume essa função para executar ações.
Configuração¶
Siga as etapas para se comunicar com o serviço Echo programaticamente. Como usar o código Python fornecido, você envia solicitações ao ponto de extremidade público que o serviço Echo expõe.
Em um prompt de comando, crie um diretório e navegue até ele.
Configure a autenticação do par de chaves para o usuário.
Gerar um par de chaves:
Gere uma chave privada. Para simplificar as etapas do exercício, você gera uma chave privada não criptografada. Você também pode usar uma chave privada criptografada, mas será necessário digitar a senha.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
Gere uma chave pública (
rsa_key.pub
) referenciando a chave privada que você criou.openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Verifique se você tem a chave privada e a chave pública geradas no diretório.
Atribua a chave pública ao usuário que você está usando para testar o acesso programático. Isso permite que o usuário especifique a chave para autenticação.
ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
Salve o código de amostra fornecido em arquivos Python.
Salve o código a seguir em
generateJWT.py
.# To run this on the command line, enter: # python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file> from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat from cryptography.hazmat.backends import default_backend from datetime import timedelta, timezone, datetime import argparse import base64 from getpass import getpass import hashlib import logging import sys # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/). import jwt logger = logging.getLogger(__name__) try: from typing import Text except ImportError: logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True) from typing_extensions import Text ISSUER = "iss" EXPIRE_TIME = "exp" ISSUE_TIME = "iat" SUBJECT = "sub" # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. As an example, this function # prompts the user for the passphrase. def get_private_key_passphrase(): return getpass('Passphrase for private key: ') class JWTGenerator(object): """ Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the generated token and only regenerates the token if a specified period of time has passed. """ LIFETIME = timedelta(minutes=59) # The tokens will have a 59-minute lifetime RENEWAL_DELTA = timedelta(minutes=54) # Tokens will be renewed after 54 minutes ALGORITHM = "RS256" # Tokens will be generated using RSA with SHA256 def __init__(self, account: Text, user: Text, private_key_file_path: Text, lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA): """ __init__ creates an object that generates JWTs for the specified user, account identifier, and private key. :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator. :param user: The Snowflake username. :param private_key_file_path: Path to the private key file used for signing the JWTs. :param lifetime: The number of minutes (as a timedelta) during which the key will be valid. :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT. """ logger.info( """Creating JWTGenerator with arguments account : %s, user : %s, lifetime : %s, renewal_delay : %s""", account, user, lifetime, renewal_delay) # Construct the fully qualified name of the user in uppercase. self.account = self.prepare_account_name_for_jwt(account) self.user = user.upper() self.qualified_username = self.account + "." + self.user self.lifetime = lifetime self.renewal_delay = renewal_delay self.private_key_file_path = private_key_file_path self.renew_time = datetime.now(timezone.utc) self.token = None # Load the private key from the specified file. with open(self.private_key_file_path, 'rb') as pem_in: pemlines = pem_in.read() try: # Try to access the private key without a passphrase. self.private_key = load_pem_private_key(pemlines, None, default_backend()) except TypeError: # If that fails, provide the passphrase returned from get_private_key_passphrase(). self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) def prepare_account_name_for_jwt(self, raw_account: Text) -> Text: """ Prepare the account identifier for use in the JWT. For the JWT, the account identifier must not include the subdomain or any region or cloud provider information. :param raw_account: The specified account identifier. :return: The account identifier in a form that can be used to generate the JWT. """ account = raw_account if not '.global' in account: # Handle the general case. idx = account.find('.') if idx > 0: account = account[0:idx] else: # Handle the replication case. idx = account.find('-') if idx > 0: account = account[0:idx] # Use uppercase for the account identifier. return account.upper() def get_token(self) -> Text: """ Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the specified renewal time has passed. :return: the new token """ now = datetime.now(timezone.utc) # Fetch the current time # If the token has expired or doesn't exist, regenerate the token. if self.token is None or self.renew_time <= now: logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)", now, self.renew_time) # Calculate the next time we need to renew the token. self.renew_time = now + self.renewal_delay # Prepare the fields for the payload. # Generate the public key fingerprint for the issuer in the payload. public_key_fp = self.calculate_public_key_fingerprint(self.private_key) # Create our payload payload = { # Set the issuer to the fully qualified username concatenated with the public key fingerprint. ISSUER: self.qualified_username + '.' + public_key_fp, # Set the subject to the fully qualified username. SUBJECT: self.qualified_username, # Set the issue time to now. ISSUE_TIME: now, # Set the expiration time, based on the lifetime specified for this object. EXPIRE_TIME: now + self.lifetime } # Regenerate the actual token token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM) # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string. # If the token is a byte string, convert it to a string. if isinstance(token, bytes): token = token.decode('utf-8') self.token = token logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM])) return self.token def calculate_public_key_fingerprint(self, private_key: Text) -> Text: """ Given a private key in PEM format, return the public key fingerprint. :param private_key: private key string :return: public key fingerprint """ # Get the raw bytes of public key. public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) # Get the sha256 hash of the raw bytes. sha256hash = hashlib.sha256() sha256hash.update(public_key_raw) # Base64-encode the value and prepend the prefix 'SHA256:'. public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8') logger.info("Public key fingerprint is %s", public_key_fp) return public_key_fp def main(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') args = cli_parser.parse_args() token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() print('JWT:') print(token) if __name__ == "__main__": main()
Salve o código a seguir em
access-via-keypair.py
.from generateJWT import JWTGenerator from datetime import timedelta import argparse import logging import sys import requests logger = logging.getLogger(__name__) def main(): args = _parse_args() token = _get_token(args) snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role, snowflake_account_url=args.snowflake_account_url, snowflake_account=args.account) spcs_url=f'https://{args.endpoint}{args.endpoint_path}' connect_to_spcs(snowflake_jwt, spcs_url) def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account): scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } logger.info(data) url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token' if snowflake_account_url: url = f'{snowflake_account_url}/oauth/token' logger.info("oauth url: %s" %url) response = requests.post(url, data=data) logger.info("snowflake jwt : %s" % response.text) assert 200 == response.status_code, "unable to get snowflake token" return response.text def connect_to_spcs(token, url): # Create a request to the ingress endpoint with authz. headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) logger.info("return code %s" % response.status_code) logger.info(response.text) def _parse_args(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (for example, "myorganization-myaccount" for ' '"myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') cli_parser.add_argument('--role', help='The role we want to use to create and maintain a session for. If a role is not provided, ' 'use the default role.') cli_parser.add_argument('--endpoint', required=True, help='The ingress endpoint of the service') cli_parser.add_argument('--endpoint-path', default='/', help='The url path for the ingress endpoint of the service') cli_parser.add_argument('--snowflake_account_url', default=None, help='The account url of the account for which we want to log in. Type of ' 'https://myorganization-myaccount.snowflakecomputing.com') args = cli_parser.parse_args() return args if __name__ == "__main__": main()
Como enviar de solicitações ao ponto de extremidade do servidor programaticamente¶
Execute o código Python access-via-keypair.py
para fazer a chamada de entrada para o ponto de extremidade público do serviço Echo.
python3 access-via-keypair.py \ --account <account-identifier> \ --user <user-name> \ --role TEST_ROLE \ --private_key_file_path rsa_key.p8 \ --endpoint <ingress-hostname> \ --endpoint-path /ui
Para obter mais informações sobre account-identifier
, consulte Identificadores de conta.
Como funciona a autenticação¶
O código primeiro converte o par de chaves fornecido em um token JWT. Em seguida, ele envia o token JWT para o Snowflake para obter um token OAuth. Por fim, o código usa o token OAuth para se conectar ao Snowflake e acessar o ponto de extremidade público. Especificamente, o código faz o seguinte:
Chama a função
_get_token(args)
para gerar um token JWT a partir do par de chaves fornecido. A implementação da função é mostrada:def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token
JWTGenerator
é uma classe auxiliar fornecida a você. Observe o seguinte sobre os parâmetros que você fornece ao criar este objeto:Parâmetros
args.account
eargs.user
: Um token JWT tem vários campos (consulte o formato de token);iss
é um dos campos. Este valor de campo inclui o nome da conta Snowflake e um nome de usuário. Portanto, você fornece esses valores como parâmetros.Os dois parâmetros
timedelta
fornecem as seguintes informações:lifetime
especifica o número de minutos durante os quais a chave será válida (60 minutos).renewal_delay
especifica o número de minutos a partir de agora após o qual o gerador JWT deve renovar o JWT.
Chama a função
token_exchange()
para se conectar ao Snowflake e trocar o token JWT por um token OAuth.scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, }
O código anterior constrói uma configuração JSON do escopo para o token OAuth, o ponto de extremidade público que pode ser acessado usando a função especificada. Este código então faz uma solicitação POST ao Snowflake passando o JSON para trocar o token JWT por um token OAuth (consulte Troca de tokens) conforme mostrado:
url = f'{snowflake_account_url}/oauth/token' response = requests.post(url, data=data) assert 200 == response.status_code, "unable to get Snowflake token" return response.text
O código então chama a função
connect_to_spcs()
para se conectar ao ponto de extremidade público do serviço Echo. Ele fornece o URL (https://<URL de entrada>/ui
) do ponto de extremidade e o token OAuth para autenticação.headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers)
O
url
é ospcs_url
que você forneceu ao programa, e otoken
é o token OAuth.O serviço Echo neste exemplo atende uma página HTML (conforme explicado na seção anterior). Este código de amostra simplesmente imprime o HTML na resposta.
6: Limpeza¶
Se você não planeja continuar com o Tutorial 2 ou o Tutorial 3, remova os recursos faturáveis que você criou. Para obter mais informações, consulte a Etapa 5 no Tutorial 3.
7: Análise do código de serviço¶
Esta seção cobre os seguintes tópicos:
Análise do código do tutorial 1: analise os arquivos de código que implementam o serviço Echo.
Como entender a função de serviço: esta seção explica como a função de serviço neste tutorial está vinculada ao serviço.
Criação e teste de uma imagem localmente. A seção fornece uma explicação de como você pode testar localmente a imagem do Docker antes de carregá-la em um repositório em sua conta Snowflake.
Análise do código do tutorial 1¶
O arquivo zip baixado na etapa 1 inclui os seguintes arquivos:
Dockerfile
echo_service.py
templates/basic_ui.html
Você também usa a especificação de serviço ao criar o serviço. A seção a seguir explica como esses componentes de código funcionam juntos para criar o serviço.
Arquivo echo_service.py¶
Este arquivo Python contém o código que implementa um servidor HTTP mínimo que retorna (ecoa) o texto de entrada. O código executa principalmente duas tarefas: manipular solicitações de eco das funções de serviço do Snowflake e fornecer uma interface de usuário da web (UI) para enviar solicitações de eco.
from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys
SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter(
'%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
logger.addHandler(handler)
return logger
logger = get_logger('echo-service')
app = Flask(__name__)
@app.get("/healthcheck")
def readiness_probe():
return "I'm ready!"
@app.post("/echo")
def echo():
'''
Main handler for input data sent by Snowflake.
'''
message = request.json
logger.debug(f'Received request: {message}')
if message is None or not message['data']:
logger.info('Received empty message')
return {}
# input format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...],
# ...
# ]}
input_rows = message['data']
logger.info(f'Received {len(input_rows)} rows')
# output format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...}],
# ...
# ]}
output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
logger.info(f'Produced {len(output_rows)} rows')
response = make_response({"data": output_rows})
response.headers['Content-type'] = 'application/json'
logger.debug(f'Sending response: {response.json}')
return response
@app.route("/ui", methods=["GET", "POST"])
def ui():
'''
Main handler for providing a web UI.
'''
if request.method == "POST":
# getting input in HTML form
input_text = request.form.get("input")
# display input and output
return render_template("basic_ui.html",
echo_input=input_text,
echo_reponse=get_echo_response(input_text))
return render_template("basic_ui.html")
def get_echo_response(input):
return f'{CHARACTER_NAME} said {input}'
if __name__ == '__main__':
app.run(host=SERVICE_HOST, port=SERVER_PORT)
No código:
A função
echo
permite que uma função de serviço Snowflake se comunique com o serviço. Esta função especifica a decoração@app.post()
conforme mostrado:@app.post("/echo") def echo():
Quando o servidor de eco recebe sua solicitação HTTP POST com o caminho
/echo
, o servidor encaminha a solicitação para esta função. A função é executada e ecoa as cadeias de caracteres do corpo da solicitação na resposta.Para oferecer suporte à comunicação de uma função de serviço Snowflake, este servidor implementa as funções externas. Ou seja, a implementação do servidor segue um determinado formato de dados de entrada/saída para servir uma função SQL, e este é o mesmo formato de dados de entrada/saída usado pelas funções externas .
A seção de função
ui
do código exibe um formulário da web e lida com solicitações de eco enviadas do formulário da web. Esta função usa o decorador@app.route()
para especificar que as solicitações de/ui
serão tratadas por esta função:@app.route("/ui", methods=["GET", "POST"]) def ui():
O serviço Echo expõe o ponto de extremidade
echoendpoint
publicamente (consulte a especificação do serviço), permitindo a comunicação com o serviço pela web. Quando você carrega o URL do ponto de extremidade público com /ui anexado em seu navegador, o navegador envia uma solicitação HTTP GET para esse caminho e o servidor encaminha a solicitação para essa função. A função é executada e retorna um formulário HTML simples para o usuário inserir uma cadeia de caracteres.Depois que o usuário insere uma cadeia de caracteres e envia o formulário, o navegador envia uma solicitação HTTP para esse caminho e o servidor encaminha a solicitação para essa mesma função. A função é executada e retorna uma resposta HTTP contendo a cadeia de caracteres original.
A função
readiness_probe
usa o decorador@app.get()
para especificar que as solicitações de/healthcheck
serão tratadas por esta função:@app.get("/healthcheck") def readiness_probe():
Esta função permite que o Snowflake verifique a prontidão do serviço. Quando o contêiner é iniciado, o Snowflake deseja confirmar se o aplicativo está funcionando e se o serviço está pronto para atender às solicitações. O Snowflake envia uma solicitação HTTP GET com esse caminho (como uma análise de integridade, análise de prontidão) para garantir que apenas contêineres íntegros tenham tráfego. A função pode fazer o que você quiser.
A função
get_logger
ajuda a configurar a geração de registros.
Arquivo Docker¶
Este arquivo contém todos os comandos para criar uma imagem usando Docker.
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
O Dockerfile contém instruções para instalar a biblioteca Flask no contêiner Docker. O código em echo_service.py
depende da biblioteca Flask para lidar com solicitações HTTP.
/template/basic_ui.html¶
O serviço Echo expõe o ponto de extremidade echoendpoint
publicamente (consulte a especificação do serviço), permitindo a comunicação com o serviço pela web. Quando você carrega o URL do ponto de extremidade público com /ui
anexado em seu navegador, o serviço Echo exibe este formulário. Você pode inserir uma cadeia de caracteres no formulário e enviá-lo, e o serviço retornará a cadeia de caracteres em uma resposta HTTP.
<!DOCTYPE html>
<html lang="en">
<head>
<title>Welcome to echo service!</title>
</head>
<body>
<h1>Welcome to echo service!</h1>
<form action="{{ url_for("ui") }}" method="post">
<label for="input">Input:<label><br>
<input type="text" id="input" name="input"><br>
</form>
<h2>Input:</h2>
{{ echo_input }}
<h2>Output:</h2>
{{ echo_reponse }}
</body>
</html>
Especificação de serviço¶
Snowflake usa as informações fornecidas nesta especificação para configurar e executar seu serviço.
spec:
containers:
- name: echo
image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
env:
SERVER_PORT: 8000
CHARACTER_NAME: Bob
readinessProbe:
port: 8000
path: /healthcheck
endpoints:
- name: echoendpoint
port: 8000
public: true
Na especificação do serviço:
O
containers.image
especifica a imagem do Snowflake para iniciar um contêiner.O campo opcional
endpoints
especifica o ponto de extremidade que o serviço expõe.O
name
especifica um nome amigável para a porta de rede TCP na qual o contêiner está escutando. Você usa esse nome de ponto de extremidade amigável para enviar solicitações para a porta correspondente. Observe queenv.SERVER_PORT
controla esse número de porta.O ponto de extremidade também está configurado como
public
. Isto permite o tráfego para este ponto de extremidade a partir da web pública.
O campo opcional
containers.env
é adicionado para ilustrar como você pode substituir variáveis de ambiente que o Snowflake passa para todos os processos em seu contêiner. Por exemplo, o código de serviço (echo_service.py
) lê as variáveis de ambiente com valores padrão conforme mostrado:CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') SERVER_PORT = os.getenv('SERVER_PORT', 8080)
Funciona da seguinte maneira:
Quando o serviço Echo recebe uma solicitação HTTP POST com uma cadeia de caracteres (por exemplo, “Olá”) no corpo da solicitação, o serviço retorna “Eu disse Olá” por padrão. O código usa a variável de ambiente
CHARACTER_NAME
para determinar a palavra antes de «disse». Por padrão,CHARACTER_NAME
é definido como “eu».Você pode substituir o valor padrão CHARACTER_NAME na especificação de serviço. Por exemplo, se você definir o valor como «Bob»; o serviço Echo retorna uma resposta «Bob disse Olá».
Da mesma forma, a especificação do serviço substitui a porta (SERVER_PORT) que o serviço escuta por 8000, substituindo a porta padrão 8080.
O campo
readinessProbe
identificaport
epath
que o Snowflake pode usar para enviar uma solicitação HTTP GET à análise de prontidão para verificar se o serviço está pronto para lidar com o tráfego.O código de serviço (
echo_python.py
) implementa a sonda de prontidão da seguinte forma:@app.get("/healthcheck") def readiness_probe():
Portanto, o arquivo de especificação inclui o campo
container.readinessProbe
adequadamente.
Para obter mais informações sobre especificações de serviço, consulte Referência de especificação de serviço.
Como entender a função de serviço¶
Uma função de serviço é um dos métodos de comunicação com seu serviço (consulte Como usar um serviço). Uma função de serviço é uma função definida pelo usuário (UDF) que você associa a um ponto de extremidade de serviço. Quando a função de serviço é executada, ela envia uma solicitação ao ponto de extremidade de serviço associado e recebe uma resposta.
Você cria a seguinte função de serviço executando o comando CREATE FUNCTION com os seguintes parâmetros:
CREATE FUNCTION my_echo_udf (InputText VARCHAR)
RETURNS VARCHAR
SERVICE=echo_service
ENDPOINT=echoendpoint
AS '/echo';
Observe o seguinte:
A função
my_echo_udf
usa uma cadeia de caracteres como entrada e retorna uma cadeia de caracteres.A propriedade SERVICE identifica o serviço (
echo_service
) e a propriedade ENDPOINT identifica o nome do ponto de extremidade amigável (echoendpoint
).O AS “/echo” especifica o caminho para o serviço. Em
echo_service.py
, o decorador@app.post
associa este caminho à funçãoecho
.
Esta função se conecta ao ENDPOINT específico do SERVICE especificado. Quando você invoca esta função, o Snowflake envia uma solicitação para o caminho /echo
dentro do contêiner de serviço.
Criação e teste de uma imagem localmente¶
Você pode testar a imagem do Docker localmente antes de carregá-la em um repositório em sua conta Snowflake. Nos testes locais, seu contêiner é executado de forma independente (não é um serviço executado pelo Snowflake).
Para testar a imagem do Docker do Tutorial 1:
Para criar uma imagem do Docker, no Docker CLI, execute o seguinte comando:
docker build --rm -t my_service:local .
Para iniciar seu código, execute o seguinte comando:
docker run --rm -p 8080:8080 my_service:local
Envie uma solicitação de eco ao serviço usando um dos seguintes métodos:
Usando o comando cURL:
Em outra janela do terminal, usando cURL, envie a seguinte solicitação POST para a porta 8080:
curl -X POST http://localhost:8080/echo \ -H "Content-Type: application/json" \ -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
Observe que o corpo da solicitação inclui duas cadeias de caracteres. Este comando cURL envia uma solicitação POST para a porta 8080 na qual o serviço está escutando. O 0 nos dados é o índice da cadeia de caracteres de entrada na lista. O serviço Echo ecoa as cadeias de caracteres de entrada na resposta, conforme mostrado:
{"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
Usando um navegador da web:
No seu navegador, no mesmo computador, abra
http://localhost:8080/ui
.Isso envia uma solicitação GET para a porta 8080, na qual o serviço está escutando. O serviço executa a função
ui()
, que renderiza um formulário HTML conforme mostrado:Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.
Qual é o próximo passo?¶
Agora você pode testar o Tutorial 2 que executa um trabalho.