Tutorial 1: Criar um serviço Snowpark Container Services

Introdução

Após concluir a configuração comum, você estará pronto para criar um serviço. Neste tutorial, você cria um serviço (denominado echo_service) que simplesmente retorna o texto fornecido como entrada. Por exemplo, se a cadeia de caracteres de entrada for “Hello World”, o serviço retornará “Eu disse, Hello World”.

Existem duas partes neste tutorial:

Parte 1: Criar e testar um serviço. Faça download do código fornecido para este tutorial e siga as instruções passo a passo:

  1. Baixe o código de serviço deste tutorial.

  2. Crie uma imagem Docker para Snowpark Container Services e carregue a imagem em um repositório em sua conta.

  3. Crie um serviço, fornecendo o arquivo de especificação de serviço e o pool de computação no qual executar o serviço.

  4. Crie uma função de serviço para se comunicar com o serviço.

  5. Use o serviço. Você envia solicitações de eco ao serviço e verifica a resposta.

Parte 2: Entender o serviço. Esta seção fornece uma visão geral do código de serviço e destaca como diferentes componentes colaboram.

1: Baixe o código de serviço

O código (um aplicativo Python) é fornecido para criar o serviço Echo.

  1. Baixe SnowparkContainerServices-Tutorials.zip.

  2. Descompacte o conteúdo, que inclui um diretório para cada tutorial. O diretório Tutorial-1 possui os seguintes arquivos:

    • Dockerfile

    • echo_service.py

    • templates/basic_ui.html

2: Crie uma imagem e carregue

Crie uma imagem para a plataforma Linux/AMD64 compatível com o Snowpark Container Services e, em seguida, faça upload da imagem para o repositório de imagens da sua conta (consulte Configuração comum).

Você precisará de informações sobre o repositório (o URL do repositório e o nome do host do registro) antes de poder construir e fazer upload da imagem. Para obter mais informações, consulte Registro e repositórios.

Obter informações sobre o repositório

  1. Para obter o URL do repositório, execute o comando SHOW IMAGE REPOSITORIES SQL.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • A coluna repository_url na saída fornece o URL. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • O nome do host no URL do repositório é o nome do host de registro. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Criar a imagem e carregá-la no repositório

  1. Abra uma janela de terminal e mude para o diretório que contém os arquivos que você descompactou.

  2. Para criar uma imagem do Docker, execute o seguinte comando docker build usando o Docker CLI. Observe que o comando especifica o diretório de trabalho atual (.) como PATH para arquivos a serem usados na construção da imagem.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Para image_name, use my_echo_service_image:latest:

    Exemplo

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
    
    Copy
  3. Faça upload da imagem para o repositório em sua conta Snowflake. Para que o Docker carregue uma imagem em seu nome para seu repositório, é necessário primeiro autenticar o Docker com o registro.

    1. Para autenticar o Docker com o registro de imagem, execute o seguinte comando.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Para username, especifique seu nome de usuário do Snowflake. O Docker solicitará sua senha.

    2. Para fazer upload da imagem, execute o seguinte comando:

      docker push <repository_url>/<image_name>
      
      Copy

      Exemplo

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
      
      Copy

3: Criar um serviço

Nesta seção você cria um serviço e também uma função de serviço para se comunicar com o serviço.

Para criar um serviço, você precisa do seguinte:

  • Um pool de computação. Snowflake executa seu serviço no pool de computação especificado. Você criou um pool de computação como parte da configuração comum.

  • Uma especificação de serviço. Esta especificação fornece ao Snowflake as informações necessárias para configurar e executar seu serviço. Para obter mais informações, consulte Snowpark Container Services: como trabalhar com serviços. Neste tutorial, você fornece a especificação inline, no comando CREATE SERVICE. Você também pode salvar a especificação em um arquivo no estágio Snowflake e fornecer informações do arquivo no comando CREATE SERVICE, conforme mostrado no Tutorial 2.

Uma função de serviço é um dos métodos disponíveis para comunicação com seu serviço. Uma função de serviço é uma função definida pelo usuário (UDF) que você associa ao ponto de extremidade do serviço. Quando a função de serviço é executada, ela envia uma solicitação ao ponto de extremidade de serviço e recebe uma resposta.

  1. Verifique se o pool de computação está pronto e se você está no contexto certo para criar o serviço.

    1. Anteriormente, você definia o contexto na etapa Configuração comum. Para garantir que você esteja no contexto correto para as instruções SQL nesta etapa, execute o seguinte:

    USE ROLE test_role;
    USE DATABASE tutorial_db;
    USE SCHEMA data_schema;
    USE WAREHOUSE tutorial_warehouse;
    
    Copy
    1. Para garantir que o pool de computação criado na configuração comum esteja pronto, execute DESCRIBE COMPUTE POOL e verifique se state é ACTIVE ou IDLE. Se state for STARTING, será necessário aguardar até que state mude para ACTIVE ou IDLE.

    DESCRIBE COMPUTE POOL tutorial_compute_pool;
    
    Copy
  2. Para criar o serviço, execute o seguinte comando usando test_role:

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
          $$
       MIN_INSTANCES=1
       MAX_INSTANCES=1;
    
    Copy

    Nota

    Se já existir um serviço com esse nome, use o comando DROP SERVICE para excluir o serviço criado anteriormente e, em seguida, crie esse serviço.

  3. Execute os seguintes comandos SQL para obter informações detalhadas sobre o serviço que você acabou de criar. Para obter mais informações, consulte Snowpark Container Services: como trabalhar com serviços.

    • Para listar os serviços da sua conta, execute o comando SHOW SERVICES:

      SHOW SERVICES;
      
      Copy
    • Para obter o status do seu serviço, chame a função do sistema SYSTEM$GET_SERVICE_STATUS:

      SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
      
      Copy
    • Para obter informações sobre seu serviço, execute o comando DESCRIBE SERVICE:

      DESCRIBE SERVICE echo_service;
      
      Copy
  4. Para criar uma função de serviço, execute o seguinte comando:

    CREATE FUNCTION my_echo_udf (InputText varchar)
      RETURNS varchar
      SERVICE=echo_service
      ENDPOINT=echoendpoint
      AS '/echo';
    
    Copy

    Observe o seguinte:

    • A propriedade SERVICE associa a UDF ao serviço echo_service.

    • A propriedade ENDPOINT associa a UDF ao ponto de extremidade echoendpoint dentro do serviço.

    • AS “/echo” especifica o caminho HTTP para o servidor de eco. Você pode encontrar esse caminho no código de serviço (echo_service.py).

4: Use o serviço

Primeiro, configure o contexto para as instruções SQL nesta seção e execute o seguinte:

USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
Copy

Agora você pode se comunicar com o serviço Echo.

  1. Usando uma função de serviço: você pode invocar a função de serviço em uma consulta. A função de serviço de exemplo (my_echo_udf) pode receber uma única cadeia de caracteres ou uma lista de cadeias de caracteres como entrada.

    Exemplo 1.1: passe uma única cadeia de caracteres

    • Para chamar a função de serviço my_echo_udf, execute a seguinte instrução SELECT, passando uma cadeia de caracteres de entrada ('hello'):

      SELECT my_echo_udf('hello!');
      
      Copy

      Snowflake envia uma solicitação POST ao ponto de extremidade do serviço (echoendpoint). Ao receber a solicitação, o serviço ecoa a cadeia de caracteres de entrada na resposta.

      +--------------------------+
      | **MY_ECHO_UDF('HELLO!')**|
      |------------------------- |
      | Bob said hello!          |
      +--------------------------+
      

    Exemplo 1.2: passe uma lista de cadeias de caracteres

    Quando você passa uma lista de cadeias de caracteres para a função de serviço, o Snowflake agrupa essas cadeias de caracteres de entrada e envia uma série de solicitações POST para o serviço. Depois que o serviço processa todas as cadeias de caracteres, o Snowflake combina os resultados e os retorna.

    O exemplo a seguir passa uma coluna da tabela como entrada para a função de serviço.

    1. Crie uma tabela com várias cadeias de caracteres:

      CREATE TABLE messages (message_text VARCHAR)
        AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
      
      Copy
    2. Verifique se a tabela foi criada:

      SELECT * FROM messages;
      
      Copy
    3. Para chamar a função de serviço, execute a seguinte instrução SELECT, passando as linhas da tabela como entrada:

      SELECT my_echo_udf(message_text) FROM messages;
      
      Copy

      Saída:

      +---------------------------+
      | MY_ECHO_UDF(MESSAGE_TEXT) |
      |---------------------------|
      | Bob said Thank you        |
      | Bob said Hello            |
      | Bob said Hello World      |
      +---------------------------+
      
  2. Usando um navegador da Web: o serviço expõe o ponto de extremidade publicamente (consulte a especificação embutida fornecida no comando CREATE SERVICE). Portanto, você pode fazer login em uma UI da web que o serviço expõe à Internet e, em seguida, enviar solicitações ao serviço a partir de um navegador da web.

    1. Encontre o URL do ponto de extremidade público que o serviço expõe:

      SHOW ENDPOINTS IN SERVICE echo_service;
      
      Copy

      A coluna ingress_url na resposta fornece o URL.

      Exemplo

      p6bye-myorg-myacct.snowflakecomputing.app
      
    2. Anexe /ui ao URL do ponto de extremidade e cole-o no navegador da web. Isso faz com que o serviço execute a função ui() (consulte echo_service.py).

      Observe que na primeira vez que você acessar o URL do ponto de extremidade, será solicitado a fazer login no Snowflake. Para este teste, use o mesmo usuário usado para criar o serviço para garantir que o usuário tenha os privilégios necessários.

      Formulário da web para comunicação com o serviço Echo.
    3. Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.

      Formulário da Web mostrando a resposta do serviço Echo.

    Nota

    É possível acessar o ponto de extremidade público programaticamente. Para obter um código de amostra, consulte Acesso de ponto de extremidade público de fora do Snowflake e autenticação. Observe que é necessário anexar /ui ao URL do ponto de extremidade no código para que o Snowflake possa rotear a solicitação para a função ui() no código de serviço.

5: (Opcional) Acesse o ponto de extremidade público programaticamente

Na seção anterior, você testou o serviço Echo usando um navegador da Web. No navegador, você acessou o ponto de extremidade público (ponto de extremidade de entrada) e enviou solicitações usando a UI da Web que o serviço expôs. Nesta seção, você testa o mesmo ponto de extremidade público programaticamente.

O exemplo usa autenticação de par de chaves. Usando o par de chaves fornecido, o código de amostra primeiro gera um JSON Web Token (JWT) e depois troca o token com o Snowflake por um token OAuth. O código então usa o token OAuth para autenticar ao se comunicar com o ponto de extremidade público do serviço Echo.

Pré-requisitos

Certifique-se de ter as seguintes informações:

  • URL de entrada do ponto de extremidade público. Execute o comando SHOW ENDPOINTS IN SERVICEpara obter o URL:

    SHOW ENDPOINTS IN SERVICE echo_service;
    
    Copy
  • Nome de sua conta Snowflake. Para obter mais informações, consulte Configuração comum: Verifique se você está pronto para continuar.

  • URL de sua conta Snowflake: É <acctname >.snowflakecomputing.com.

  • Nome de usuário na conta Snowflake. Este é o usuário escolhido em Configuração comum: Criação de objetos Snowflake. Você faz login no Snowflake como este usuário e testa o acesso programático.

  • Nome da função: você criou uma função (test_role) como parte da configuração comum. O usuário assume essa função para executar ações.

Configuração

Siga as etapas para se comunicar com o serviço Echo programaticamente. Como usar o código Python fornecido, você envia solicitações ao ponto de extremidade público que o serviço Echo expõe.

  1. Em um prompt de comando, crie um diretório e navegue até ele.

  2. Configure a autenticação do par de chaves para o usuário.

    1. Gerar um par de chaves:

      1. Gere uma chave privada. Para simplificar as etapas do exercício, você gera uma chave privada não criptografada. Você também pode usar uma chave privada criptografada, mas será necessário digitar a senha.

        openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
        
        Copy
      2. Gere uma chave pública (rsa_key.pub) referenciando a chave privada que você criou.

        openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
        
        Copy
    2. Verifique se você tem a chave privada e a chave pública geradas no diretório.

    3. Atribua a chave pública ao usuário que você está usando para testar o acesso programático. Isso permite que o usuário especifique a chave para autenticação.

      ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
      
      Copy
  3. Salve o código de amostra fornecido em arquivos Python.

    1. Salve o código a seguir em generateJWT.py.

      # To run this on the command line, enter:
      #   python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file>
      
      from cryptography.hazmat.primitives.serialization import load_pem_private_key
      from cryptography.hazmat.primitives.serialization import Encoding
      from cryptography.hazmat.primitives.serialization import PublicFormat
      from cryptography.hazmat.backends import default_backend
      from datetime import timedelta, timezone, datetime
      import argparse
      import base64
      from getpass import getpass
      import hashlib
      import logging
      import sys
      
      # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/).
      import jwt
      
      logger = logging.getLogger(__name__)
      
      try:
          from typing import Text
      except ImportError:
          logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True)
          from typing_extensions import Text
      
      ISSUER = "iss"
      EXPIRE_TIME = "exp"
      ISSUE_TIME = "iat"
      SUBJECT = "sub"
      
      # If you generated an encrypted private key, implement this method to return
      # the passphrase for decrypting your private key. As an example, this function
      # prompts the user for the passphrase.
      def get_private_key_passphrase():
          return getpass('Passphrase for private key: ')
      
      class JWTGenerator(object):
          """
          Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the
          generated token and only regenerates the token if a specified period of time has passed.
          """
          LIFETIME = timedelta(minutes=59)  # The tokens will have a 59-minute lifetime
          RENEWAL_DELTA = timedelta(minutes=54)  # Tokens will be renewed after 54 minutes
          ALGORITHM = "RS256"  # Tokens will be generated using RSA with SHA256
      
          def __init__(self, account: Text, user: Text, private_key_file_path: Text,
                      lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA):
              """
              __init__ creates an object that generates JWTs for the specified user, account identifier, and private key.
              :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator.
              :param user: The Snowflake username.
              :param private_key_file_path: Path to the private key file used for signing the JWTs.
              :param lifetime: The number of minutes (as a timedelta) during which the key will be valid.
              :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT.
              """
      
              logger.info(
                  """Creating JWTGenerator with arguments
                  account : %s, user : %s, lifetime : %s, renewal_delay : %s""",
                  account, user, lifetime, renewal_delay)
      
              # Construct the fully qualified name of the user in uppercase.
              self.account = self.prepare_account_name_for_jwt(account)
              self.user = user.upper()
              self.qualified_username = self.account + "." + self.user
      
              self.lifetime = lifetime
              self.renewal_delay = renewal_delay
              self.private_key_file_path = private_key_file_path
              self.renew_time = datetime.now(timezone.utc)
              self.token = None
      
              # Load the private key from the specified file.
              with open(self.private_key_file_path, 'rb') as pem_in:
                  pemlines = pem_in.read()
                  try:
                      # Try to access the private key without a passphrase.
                      self.private_key = load_pem_private_key(pemlines, None, default_backend())
                  except TypeError:
                      # If that fails, provide the passphrase returned from get_private_key_passphrase().
                      self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend())
      
          def prepare_account_name_for_jwt(self, raw_account: Text) -> Text:
              """
              Prepare the account identifier for use in the JWT.
              For the JWT, the account identifier must not include the subdomain or any region or cloud provider information.
              :param raw_account: The specified account identifier.
              :return: The account identifier in a form that can be used to generate the JWT.
              """
              account = raw_account
              if not '.global' in account:
                  # Handle the general case.
                  idx = account.find('.')
                  if idx > 0:
                      account = account[0:idx]
              else:
                  # Handle the replication case.
                  idx = account.find('-')
                  if idx > 0:
                      account = account[0:idx]
              # Use uppercase for the account identifier.
              return account.upper()
      
          def get_token(self) -> Text:
              """
              Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the
              specified renewal time has passed.
              :return: the new token
              """
              now = datetime.now(timezone.utc)  # Fetch the current time
      
              # If the token has expired or doesn't exist, regenerate the token.
              if self.token is None or self.renew_time <= now:
                  logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)",
                              now, self.renew_time)
                  # Calculate the next time we need to renew the token.
                  self.renew_time = now + self.renewal_delay
      
                  # Prepare the fields for the payload.
                  # Generate the public key fingerprint for the issuer in the payload.
                  public_key_fp = self.calculate_public_key_fingerprint(self.private_key)
      
                  # Create our payload
                  payload = {
                      # Set the issuer to the fully qualified username concatenated with the public key fingerprint.
                      ISSUER: self.qualified_username + '.' + public_key_fp,
      
                      # Set the subject to the fully qualified username.
                      SUBJECT: self.qualified_username,
      
                      # Set the issue time to now.
                      ISSUE_TIME: now,
      
                      # Set the expiration time, based on the lifetime specified for this object.
                      EXPIRE_TIME: now + self.lifetime
                  }
      
                  # Regenerate the actual token
                  token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM)
                  # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string.
                  # If the token is a byte string, convert it to a string.
                  if isinstance(token, bytes):
                    token = token.decode('utf-8')
                  self.token = token
                  logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM]))
      
              return self.token
      
          def calculate_public_key_fingerprint(self, private_key: Text) -> Text:
              """
              Given a private key in PEM format, return the public key fingerprint.
              :param private_key: private key string
              :return: public key fingerprint
              """
              # Get the raw bytes of public key.
              public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
      
              # Get the sha256 hash of the raw bytes.
              sha256hash = hashlib.sha256()
              sha256hash.update(public_key_raw)
      
              # Base64-encode the value and prepend the prefix 'SHA256:'.
              public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8')
              logger.info("Public key fingerprint is %s", public_key_fp)
      
              return public_key_fp
      
      def main():
          logging.basicConfig(stream=sys.stdout, level=logging.INFO)
          cli_parser = argparse.ArgumentParser()
          cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").')
          cli_parser.add_argument('--user', required=True, help='The user name.')
          cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.')
          cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.')
          cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.')
          args = cli_parser.parse_args()
      
          token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token()
          print('JWT:')
          print(token)
      
      if __name__ == "__main__":
          main()
      
      Copy
    2. Salve o código a seguir em access-via-keypair.py.

      from generateJWT import JWTGenerator
      from datetime import timedelta
      import argparse
      import logging
      import sys
      import requests
      logger = logging.getLogger(__name__)
      
      def main():
        args = _parse_args()
        token = _get_token(args)
        snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role,
                        snowflake_account_url=args.snowflake_account_url,
                        snowflake_account=args.account)
        spcs_url=f'https://{args.endpoint}{args.endpoint_path}'
        connect_to_spcs(snowflake_jwt, spcs_url)
      
      def _get_token(args):
        token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime),
                  timedelta(minutes=args.renewal_delay)).get_token()
        logger.info("Key Pair JWT: %s" % token)
        return token
      
      def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account):
        scope_role = f'session:role:{role}' if role is not None else None
        scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint
        data = {
          'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
          'scope': scope,
          'assertion': token,
        }
        logger.info(data)
        url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token'
        if snowflake_account_url:
          url =       f'{snowflake_account_url}/oauth/token'
        logger.info("oauth url: %s" %url)
        response = requests.post(url, data=data)
        logger.info("snowflake jwt : %s" % response.text)
        assert 200 == response.status_code, "unable to get snowflake token"
        return response.text
      
      def connect_to_spcs(token, url):
        # Create a request to the ingress endpoint with authz.
        headers = {'Authorization': f'Snowflake Token="{token}"'}
        response = requests.post(f'{url}', headers=headers)
        logger.info("return code %s" % response.status_code)
        logger.info(response.text)
      
      def _parse_args():
        logging.basicConfig(stream=sys.stdout, level=logging.INFO)
        cli_parser = argparse.ArgumentParser()
        cli_parser.add_argument('--account', required=True,
                    help='The account identifier (for example, "myorganization-myaccount" for '
                      '"myorganization-myaccount.snowflakecomputing.com").')
        cli_parser.add_argument('--user', required=True, help='The user name.')
        cli_parser.add_argument('--private_key_file_path', required=True,
                    help='Path to the private key file used for signing the JWT.')
        cli_parser.add_argument('--lifetime', type=int, default=59,
                    help='The number of minutes that the JWT should be valid for.')
        cli_parser.add_argument('--renewal_delay', type=int, default=54,
                    help='The number of minutes before the JWT generator should produce a new JWT.')
        cli_parser.add_argument('--role',
                    help='The role we want to use to create and maintain a session for. If a role is not provided, '
                      'use the default role.')
        cli_parser.add_argument('--endpoint', required=True,
                    help='The ingress endpoint of the service')
        cli_parser.add_argument('--endpoint-path', default='/',
                    help='The url path for the ingress endpoint of the service')
        cli_parser.add_argument('--snowflake_account_url', default=None,
                    help='The account url of the account for which we want to log in. Type of '
                      'https://myorganization-myaccount.snowflakecomputing.com')
        args = cli_parser.parse_args()
        return args
      
      if __name__ == "__main__":
        main()
      
      Copy

Como enviar de solicitações ao ponto de extremidade do servidor programaticamente

Execute o código Python access-via-keypair.py para fazer a chamada de entrada para o ponto de extremidade público do serviço Echo.

python3 access-via-keypair.py \
  --account <account-identifier> \
  --user <user-name> \
  --role TEST_ROLE \
  --private_key_file_path rsa_key.p8 \
  --endpoint <ingress-hostname> \
  --endpoint-path /ui
Copy

Para obter mais informações sobre account-identifier, consulte Identificadores de conta.

Como funciona a autenticação

O código primeiro converte o par de chaves fornecido em um token JWT. Em seguida, ele envia o token JWT para o Snowflake para obter um token OAuth. Por fim, o código usa o token OAuth para se conectar ao Snowflake e acessar o ponto de extremidade público. Especificamente, o código faz o seguinte:

  1. Chama a função _get_token(args) para gerar um token JWT a partir do par de chaves fornecido. A implementação da função é mostrada:

    def _get_token(args):
        token = JWTGenerator(args.account,
                            args.user,
                            args.private_key_file_path,
                            timedelta(minutes=args.lifetime),
                            timedelta(minutes=args.renewal_delay)).get_token()
        logger.info("Key Pair JWT: %s" % token)
        return token
    
    Copy

    JWTGenerator é uma classe auxiliar fornecida a você. Observe o seguinte sobre os parâmetros que você fornece ao criar este objeto:

    • Parâmetros args.account e args.user: Um token JWT tem vários campos (consulte o formato de token); iss é um dos campos. Este valor de campo inclui o nome da conta Snowflake e um nome de usuário. Portanto, você fornece esses valores como parâmetros.

    • Os dois parâmetros timedelta fornecem as seguintes informações:

      • lifetime especifica o número de minutos durante os quais a chave será válida (60 minutos).

      • renewal_delay especifica o número de minutos a partir de agora após o qual o gerador JWT deve renovar o JWT.

  2. Chama a função token_exchange() para se conectar ao Snowflake e trocar o token JWT por um token OAuth.

    scope_role = f'session:role:{role}' if role is not None else None
    scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint
    
    data = {
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'scope': scope,
        'assertion': token,
    }
    
    Copy

    O código anterior constrói uma configuração JSON do escopo para o token OAuth, o ponto de extremidade público que pode ser acessado usando a função especificada. Este código então faz uma solicitação POST ao Snowflake passando o JSON para trocar o token JWT por um token OAuth (consulte Troca de tokens) conforme mostrado:

    url = f'{snowflake_account_url}/oauth/token'
    response = requests.post(url, data=data)
    assert 200 == response.status_code, "unable to get Snowflake token"
    return response.text
    
    Copy
  3. O código então chama a função connect_to_spcs() para se conectar ao ponto de extremidade público do serviço Echo. Ele fornece o URL (https://<URL de entrada>/ui) do ponto de extremidade e o token OAuth para autenticação.

    headers = {'Authorization': f'Snowflake Token="{token}"'}
    response = requests.post(f'{url}', headers=headers)
    
    Copy

    O url é o spcs_url que você forneceu ao programa, e o token é o token OAuth.

    O serviço Echo neste exemplo atende uma página HTML (conforme explicado na seção anterior). Este código de amostra simplesmente imprime o HTML na resposta.

6: Limpeza

Se você não planeja continuar com o Tutorial 2 ou o Tutorial 3, remova os recursos faturáveis que você criou. Para obter mais informações, consulte a Etapa 5 no Tutorial 3.

7: Análise do código de serviço

Esta seção cobre os seguintes tópicos:

Análise do código do tutorial 1

O arquivo zip baixado na etapa 1 inclui os seguintes arquivos:

  • Dockerfile

  • echo_service.py

  • templates/basic_ui.html

Você também usa a especificação de serviço ao criar o serviço. A seção a seguir explica como esses componentes de código funcionam juntos para criar o serviço.

Arquivo echo_service.py

Este arquivo Python contém o código que implementa um servidor HTTP mínimo que retorna (ecoa) o texto de entrada. O código executa principalmente duas tarefas: manipular solicitações de eco das funções de serviço do Snowflake e fornecer uma interface de usuário da web (UI) para enviar solicitações de eco.

from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys

SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')


def get_logger(logger_name):
  logger = logging.getLogger(logger_name)
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  handler.setFormatter(
    logging.Formatter(
      '%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
  logger.addHandler(handler)
  return logger


logger = get_logger('echo-service')

app = Flask(__name__)


@app.get("/healthcheck")
def readiness_probe():
  return "I'm ready!"


@app.post("/echo")
def echo():
  '''
  Main handler for input data sent by Snowflake.
  '''
  message = request.json
  logger.debug(f'Received request: {message}')

  if message is None or not message['data']:
    logger.info('Received empty message')
    return {}

  # input format:
  #   {"data": [
  #     [row_index, column_1_value, column_2_value, ...],
  #     ...
  #   ]}
  input_rows = message['data']
  logger.info(f'Received {len(input_rows)} rows')

  # output format:
  #   {"data": [
  #     [row_index, column_1_value, column_2_value, ...}],
  #     ...
  #   ]}
  output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
  logger.info(f'Produced {len(output_rows)} rows')

  response = make_response({"data": output_rows})
  response.headers['Content-type'] = 'application/json'
  logger.debug(f'Sending response: {response.json}')
  return response


@app.route("/ui", methods=["GET", "POST"])
def ui():
  '''
  Main handler for providing a web UI.
  '''
  if request.method == "POST":
    # getting input in HTML form
    input_text = request.form.get("input")
    # display input and output
    return render_template("basic_ui.html",
      echo_input=input_text,
      echo_reponse=get_echo_response(input_text))
  return render_template("basic_ui.html")


def get_echo_response(input):
  return f'{CHARACTER_NAME} said {input}'

if __name__ == '__main__':
  app.run(host=SERVICE_HOST, port=SERVER_PORT)
Copy

No código:

  • A função echo permite que uma função de serviço Snowflake se comunique com o serviço. Esta função especifica a decoração @app.post() conforme mostrado:

    @app.post("/echo")
    def echo():
    
    Copy

    Quando o servidor de eco recebe sua solicitação HTTP POST com o caminho /echo, o servidor encaminha a solicitação para esta função. A função é executada e ecoa as cadeias de caracteres do corpo da solicitação na resposta.

    Para oferecer suporte à comunicação de uma função de serviço Snowflake, este servidor implementa as funções externas. Ou seja, a implementação do servidor segue um determinado formato de dados de entrada/saída para servir uma função SQL, e este é o mesmo formato de dados de entrada/saída usado pelas funções externas .

  • A seção de função ui do código exibe um formulário da web e lida com solicitações de eco enviadas do formulário da web. Esta função usa o decorador @app.route() para especificar que as solicitações de /ui serão tratadas por esta função:

    @app.route("/ui", methods=["GET", "POST"])
    def ui():
    
    Copy

    O serviço Echo expõe o ponto de extremidade echoendpoint publicamente (consulte a especificação do serviço), permitindo a comunicação com o serviço pela web. Quando você carrega o URL do ponto de extremidade público com /ui anexado em seu navegador, o navegador envia uma solicitação HTTP GET para esse caminho e o servidor encaminha a solicitação para essa função. A função é executada e retorna um formulário HTML simples para o usuário inserir uma cadeia de caracteres.

    Depois que o usuário insere uma cadeia de caracteres e envia o formulário, o navegador envia uma solicitação HTTP para esse caminho e o servidor encaminha a solicitação para essa mesma função. A função é executada e retorna uma resposta HTTP contendo a cadeia de caracteres original.

  • A função readiness_probe usa o decorador @app.get() para especificar que as solicitações de /healthcheck serão tratadas por esta função:

    @app.get("/healthcheck")
    def readiness_probe():
    
    Copy

    Esta função permite que o Snowflake verifique a prontidão do serviço. Quando o contêiner é iniciado, o Snowflake deseja confirmar se o aplicativo está funcionando e se o serviço está pronto para atender às solicitações. O Snowflake envia uma solicitação HTTP GET com esse caminho (como uma análise de integridade, análise de prontidão) para garantir que apenas contêineres íntegros tenham tráfego. A função pode fazer o que você quiser.

  • A função get_logger ajuda a configurar a geração de registros.

Arquivo Docker

Este arquivo contém todos os comandos para criar uma imagem usando Docker.

ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
Copy

O Dockerfile contém instruções para instalar a biblioteca Flask no contêiner Docker. O código em echo_service.py depende da biblioteca Flask para lidar com solicitações HTTP.

/template/basic_ui.html

O serviço Echo expõe o ponto de extremidade echoendpoint publicamente (consulte a especificação do serviço), permitindo a comunicação com o serviço pela web. Quando você carrega o URL do ponto de extremidade público com /ui anexado em seu navegador, o serviço Echo exibe este formulário. Você pode inserir uma cadeia de caracteres no formulário e enviá-lo, e o serviço retornará a cadeia de caracteres em uma resposta HTTP.

<!DOCTYPE html>
<html lang="en">
  <head>
    <title>Welcome to echo service!</title>
  </head>
  <body>
    <h1>Welcome to echo service!</h1>
    <form action="{{ url_for("ui") }}" method="post">
      <label for="input">Input:<label><br>
      <input type="text" id="input" name="input"><br>
    </form>
    <h2>Input:</h2>
    {{ echo_input }}
    <h2>Output:</h2>
    {{ echo_reponse }}
  </body>
</html>
Copy

Especificação de serviço

Snowflake usa as informações fornecidas nesta especificação para configurar e executar seu serviço.

spec:
  containers:
  - name: echo
    image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
    env:
      SERVER_PORT: 8000
      CHARACTER_NAME: Bob
    readinessProbe:
      port: 8000
      path: /healthcheck
  endpoints:
  - name: echoendpoint
    port: 8000
    public: true
Copy

Na especificação do serviço:

  • O containers.image especifica a imagem do Snowflake para iniciar um contêiner.

  • O campo opcional endpoints especifica o ponto de extremidade que o serviço expõe.

    • O name especifica um nome amigável para a porta de rede TCP na qual o contêiner está escutando. Você usa esse nome de ponto de extremidade amigável para enviar solicitações para a porta correspondente. Observe que env.SERVER_PORT controla esse número de porta.

    • O ponto de extremidade também está configurado como public. Isto permite o tráfego para este ponto de extremidade a partir da web pública.

  • O campo opcional containers.env é adicionado para ilustrar como você pode substituir variáveis de ambiente que o Snowflake passa para todos os processos em seu contêiner. Por exemplo, o código de serviço (echo_service.py) lê as variáveis de ambiente com valores padrão conforme mostrado:

    CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
    SERVER_PORT = os.getenv('SERVER_PORT', 8080)
    
    Copy

    Funciona da seguinte maneira:

    • Quando o serviço Echo recebe uma solicitação HTTP POST com uma cadeia de caracteres (por exemplo, “Olá”) no corpo da solicitação, o serviço retorna “Eu disse Olá” por padrão. O código usa a variável de ambiente CHARACTER_NAME para determinar a palavra antes de «disse». Por padrão, CHARACTER_NAME é definido como “eu».

      Você pode substituir o valor padrão CHARACTER_NAME na especificação de serviço. Por exemplo, se você definir o valor como «Bob»; o serviço Echo retorna uma resposta «Bob disse Olá».

    • Da mesma forma, a especificação do serviço substitui a porta (SERVER_PORT) que o serviço escuta por 8000, substituindo a porta padrão 8080.

  • O campo readinessProbe identifica port e path que o Snowflake pode usar para enviar uma solicitação HTTP GET à análise de prontidão para verificar se o serviço está pronto para lidar com o tráfego.

    O código de serviço (echo_python.py) implementa a sonda de prontidão da seguinte forma:

    @app.get("/healthcheck")
    def readiness_probe():
    
    Copy

    Portanto, o arquivo de especificação inclui o campo container.readinessProbe adequadamente.

Para obter mais informações sobre especificações de serviço, consulte Referência de especificação de serviço.

Como entender a função de serviço

Uma função de serviço é um dos métodos de comunicação com seu serviço (consulte Como usar um serviço). Uma função de serviço é uma função definida pelo usuário (UDF) que você associa a um ponto de extremidade de serviço. Quando a função de serviço é executada, ela envia uma solicitação ao ponto de extremidade de serviço associado e recebe uma resposta.

Você cria a seguinte função de serviço executando o comando CREATE FUNCTION com os seguintes parâmetros:

CREATE FUNCTION my_echo_udf (InputText VARCHAR)
  RETURNS VARCHAR
  SERVICE=echo_service
  ENDPOINT=echoendpoint
  AS '/echo';
Copy

Observe o seguinte:

  • A função my_echo_udf usa uma cadeia de caracteres como entrada e retorna uma cadeia de caracteres.

  • A propriedade SERVICE identifica o serviço (echo_service) e a propriedade ENDPOINT identifica o nome do ponto de extremidade amigável (echoendpoint).

  • O AS “/echo” especifica o caminho para o serviço. Em echo_service.py, o decorador @app.post associa este caminho à função echo.

Esta função se conecta ao ENDPOINT específico do SERVICE especificado. Quando você invoca esta função, o Snowflake envia uma solicitação para o caminho /echo dentro do contêiner de serviço.

Criação e teste de uma imagem localmente

Você pode testar a imagem do Docker localmente antes de carregá-la em um repositório em sua conta Snowflake. Nos testes locais, seu contêiner é executado de forma independente (não é um serviço executado pelo Snowflake).

Para testar a imagem do Docker do Tutorial 1:

  1. Para criar uma imagem do Docker, no Docker CLI, execute o seguinte comando:

    docker build --rm -t my_service:local .
    
    Copy
  2. Para iniciar seu código, execute o seguinte comando:

    docker run --rm -p 8080:8080 my_service:local
    
    Copy
  3. Envie uma solicitação de eco ao serviço usando um dos seguintes métodos:

    • Usando o comando cURL:

      Em outra janela do terminal, usando cURL, envie a seguinte solicitação POST para a porta 8080:

      curl -X POST http://localhost:8080/echo \
        -H "Content-Type: application/json" \
        -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
      
      Copy

      Observe que o corpo da solicitação inclui duas cadeias de caracteres. Este comando cURL envia uma solicitação POST para a porta 8080 na qual o serviço está escutando. O 0 nos dados é o índice da cadeia de caracteres de entrada na lista. O serviço Echo ecoa as cadeias de caracteres de entrada na resposta, conforme mostrado:

      {"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
      
    • Usando um navegador da web:

      1. No seu navegador, no mesmo computador, abra http://localhost:8080/ui.

        Isso envia uma solicitação GET para a porta 8080, na qual o serviço está escutando. O serviço executa a função ui(), que renderiza um formulário HTML conforme mostrado:

        Formulário da web para comunicação com o serviço Echo.
      2. Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.

        Formulário da Web mostrando a resposta do serviço Echo.

Qual é o próximo passo?

Agora você pode testar o Tutorial 2 que executa um trabalho.