Tutoriel 1 : créer un service Snowpark Container Services¶
Introduction¶
Après avoir terminé l’installation commune, vous êtes prêt à créer un service. Dans ce tutoriel, vous créez un service (nommé echo_service
) qui renvoie simplement un texte que vous fournissez en entrée. Par exemple, si la chaîne d’entrée est « Hello World », le service renvoie « I said, Hello World ».
Ce tutoriel comporte deux parties :
Partie 1 : créez et testez un service. Vous téléchargez le code fourni pour ce tutoriel et suivez les instructions étape par étape :
Téléchargez le code de service pour ce tutoriel.
Créez une image Docker pour Snowpark Container Services, et chargez l’image dans un référentiel de votre compte.
Créez un service, en fournissant le fichier de spécification de service et le pool de calcul dans lequel le service sera exécuté.
Créez une fonction de service pour communiquer avec le service.
Utilisez le service. Vous envoyez des demandes echo au service et vérifiez la réponse.
Partie 2 : comprendre le service. Cette section donne une vue d’ensemble du code de service et met en évidence la façon dont les différentes composantes collaborent.
1 : Télécharger le code de service¶
Un code (une application Python) est fourni pour créer le service echo.
Téléchargez
SnowparkContainerServices-Tutorials.zip
.Décompressez le contenu, qui comprend un répertoire pour chaque tutoriel. Le répertoire
Tutorial-1
contient les fichiers suivants :Dockerfile
echo_service.py
templates/basic_ui.html
2 : Construire une image et la charger¶
Construisez une image pour la plateforme linux/amd64 prise en charge par Snowpark Container Services, puis chargez l’image dans le référentiel d’images de votre compte (voir Configuration commune).
Vous aurez besoin d’informations sur le référentiel (l’URL du référentiel et le nom d’hôte du registre) avant de pouvoir construire et charger l’image. Pour plus d’informations, voir Registre et référentiels.
Obtenir des informations sur le référentiel
Pour obtenir l’URL du référentiel, exécutez la commande SQL SHOW IMAGE REPOSITORIES.
SHOW IMAGE REPOSITORIES;
La colonne
repository_url
de la sortie fournit l’URL. En voici un exemple :<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
Le nom d’hôte dans l’URL du référentiel est le nom d’hôte du registre. En voici un exemple :
<orgname>-<acctname>.registry.snowflakecomputing.com
Construire l’image et la charger dans le référentiel
Ouvrez une fenêtre de terminal et accédez au répertoire contenant les fichiers que vous avez décompressés.
Pour créer une image Docker, exécutez la commande
docker build
suivante à l’aide de la CLI Docker. Notez que la commande spécifie le répertoire de travail actuel (.
) commePATH
pour les fichiers à utiliser pour la construction de l’image.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
Pour
image_name
, utilisezmy_echo_service_image:latest
.
Exemple
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest .
Chargez l’image dans le référentiel de votre compte Snowflake. Pour que Docker puisse charger une image en votre nom dans votre référentiel, vous devez d’abord authentifier Docker avec le registre.
Pour authentifier Docker auprès du registre d’images Snowflake, exécutez la commande suivante.
docker login <registry_hostname> -u <username>
Pour
username
, indiquez votre nom d’utilisateur Snowflake. Docker vous demandera votre mot de passe.
Pour charger l’image, exécutez la commande suivante :
docker push <repository_url>/<image_name>
Exemple
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
3 : Créer un service¶
Dans cette section, vous créez un service et une fonction de service pour communiquer avec le service.
Pour créer un service, vous devez disposer des éléments suivants :
Un pool de calcul. Snowflake exécute votre service dans le pool de calcul spécifié. Vous avez créé un pool de calcul dans le cadre de la configuration commune.
Une spécification de service. Cette spécification fournit à Snowflake les informations nécessaires pour configurer et exécuter votre service. Pour plus d’informations, voir Snowpark Container Services : utilisation des tâches. Dans ce tutoriel, vous fournissez la spécification en ligne, dans la commande CREATE SERVICE. Vous pouvez également enregistrer la spécification dans un fichier de votre zone de préparation Snowflake et fournir des informations sur le fichier dans la commande CREATE SERVICE, comme le montre le tutoriel 2.
Une fonction de service est l’une des méthodes disponibles pour communiquer avec votre service. Une fonction de service est une fonction définie par l’utilisateur (UDF) que vous associez au point de terminaison de service. Lorsque la fonction de service est exécutée, elle envoie une requête au point de terminaison de service et reçoit une réponse.
Vérifiez que le pool de calcul est prêt et que vous êtes dans le bon contexte pour créer le service.
Auparavant, vous avez défini le contexte dans l’étape Configuration commune. Pour vous assurer que vous êtes dans le bon contexte pour les instructions SQL dans cette étape, exécutez ce qui suit :
USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse;
Pour s’assurer que le pool de calcul que vous avez créé dans la configuration commune est prêt, exécutez
DESCRIBE COMPUTE POOL
, et vérifiez questate
estACTIVE
ouIDLE
. Si lestate
estSTARTING
, vous devez attendre que lestate
devienneACTIVE
ouIDLE
.
DESCRIBE COMPUTE POOL tutorial_compute_pool;
Pour créer le service, exécutez la commande suivante en utilisant
test_role
:CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
Note
Si un service portant ce nom existe déjà, utilisez la commande DROP SERVICE pour supprimer le service précédemment créé, puis créez ce service.
Exécutez les commandes SQL suivantes pour obtenir des informations détaillées sur le service que vous venez de créer. Pour plus d’informations, voir Snowpark Container Services : utilisation des tâches.
Pour dresser la liste des services de votre compte, exécutez la commande SHOW SERVICES :
SHOW SERVICES;
Pour connaître le statut de votre service, appelez la fonction système SYSTEM$GET_SERVICE_STATUS :
SELECT SYSTEM$GET_SERVICE_STATUS('echo_service');
Pour obtenir des informations sur votre service, exécutez la commande DESCRIBE SERVICE :
DESCRIBE SERVICE echo_service;
Pour créer une fonction de service, exécutez la commande suivante :
CREATE FUNCTION my_echo_udf (InputText varchar) RETURNS varchar SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo';
Remarques :
La propriété SERVICE associe l’UDF au service
echo_service
.La propriété ENDPOINT associe l’UDF au point de terminaison
echoendpoint
au sein du service.AS “/echo” indique le chemin d’accès HTTP au serveur echo. Vous trouverez ce chemin dans le code de service (
echo_service.py
).
4 : Utiliser le service¶
Tout d’abord, pour définir le contexte des instructions SQL de cette section, exécutez ce qui suit :
USE ROLE test_role;
USE DATABASE tutorial_db;
USE SCHEMA data_schema;
USE WAREHOUSE tutorial_warehouse;
Vous pouvez maintenant communiquer avec le service echo.
Utilisation d’une fonction de service : vous pouvez appeler la fonction de service dans une requête. La fonction de service de l’exemple (
my_echo_udf
) peut prendre en entrée une seule chaîne ou une liste de chaînes.Exemple 1.1 : transmettre une seule chaîne
Pour appeler la fonction de service
my_echo_udf
, exécutez l’instruction SELECT suivante, en transmettant une chaîne d’entrée ('hello'
) :SELECT my_echo_udf('hello!');
Snowflake envoie une requête POST au point de terminaison de service (
echoendpoint
). À la réception de la requête, le service répercute la chaîne d’entrée dans la réponse.+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
Exemple 1.2 : transmettre une liste de chaînes
Lorsque vous transmettez une liste de chaînes à la fonction de service, Snowflake regroupe ces chaînes d’entrée et envoie une série de requêtes POST au service. Une fois que le service a traité toutes les chaînes, Snowflake combine les résultats et les renvoie.
L’exemple suivant transmet une colonne de table à la fonction de service.
Créez une table avec plusieurs chaînes :
CREATE TABLE messages (message_text VARCHAR) AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World')));
Pour vérifier que la table a été créée :
SELECT * FROM messages;
Pour appeler la fonction de service, exécutez l’instruction SELECT suivante, en transmettant les lignes de la table en entrée :
SELECT my_echo_udf(message_text) FROM messages;
Sortie :
+---------------------------+ | MY_ECHO_UDF(MESSAGE_TEXT) | |---------------------------| | Bob said Thank you | | Bob said Hello | | Bob said Hello World | +---------------------------+
En utilisant un navigateur Web : le service expose le point de terminaison publiquement (voir la spécification en ligne fournie dans la commande CREATE SERVICE). Par conséquent, vous pouvez vous connecter à une UI Web que le service expose à Internet, puis envoyer des requêtes au service à partir d’un navigateur Web.
Trouvez l’URL du point de terminaison public auquel le service est exposé :
SHOW ENDPOINTS IN SERVICE echo_service;
La colonne
ingress_url
de la réponse fournit l’URL.Exemple
p6bye-myorg-myacct.snowflakecomputing.app
Ajoutez
/ui
à l’URL du point de terminaison et collez-la dans le navigateur Web. Le service exécute alors la fonctionui()
(voirecho_service.py
).Notez que la première fois que vous accédez à l’URL du point de terminaison, il vous sera demandé de vous connecter à Snowflake. Pour ce test, utilisez le même utilisateur que celui que vous avez utilisé pour créer le service afin de vous assurer qu’il dispose des privilèges nécessaires.
Saisissez la chaîne « Hello » dans la case Entrée et appuyez sur Retour.
Note
Vous pouvez accéder au point de terminaison public de manière programmatique. Pour un exemple de code, voir Accès aux points de terminaison publics depuis l’extérieur de Snowflake et authentification. Notez que vous devez ajouter
/ui
à l’URL du point de terminaison dans le code afin que Snowflake puisse acheminer la demande vers la fonctionui()
dans le code de service.
5 : (Facultatif) Accéder au point de terminaison public de manière programmatique¶
Dans la section précédente, vous avez testé le service Echo à l’aide d’un navigateur Web. Dans le navigateur, vous avez accédé au point de terminaison public (point de terminaison d’entrée) et envoyé des requêtes via l’UI Web que le service a exposée. Dans cette section, vous testez le même point de terminaison public par programmation.
L’exemple utilise l”authentification par paire de clés. À l’aide de la paire de clés que vous fournissez, l’exemple de code génère d’abord un jeton Web JSON (JWT) puis échange le jeton avec Snowflake contre un jeton OAuth. Le code utilise ensuite le jeton OAuth pour l’authentification lors de la communication avec le point de terminaison public du service Echo.
Conditions préalables¶
Assurez-vous d’avoir les informations suivantes :
URL d’entrée du point de terminaison public. Exécutez la commande SHOW ENDPOINTS IN SERVICE pour obtenir l’URL :
SHOW ENDPOINTS IN SERVICE echo_service;
Votre nom de compte Snowflake. Pour plus d’informations, consultez Configuration commune : vérifiez que vous êtes prêt à continuer.
Votre URL de compte Snowflake : c’est le
<nom de compte>.snowflakecomputing.com
.Nom d’utilisateur dans le compte Snowflake. Il s’agit de l’utilisateur que vous avez choisi dans Configuration courante : créer des objets Snowflake. Vous vous connectez à Snowflake en tant qu’utilisateur et testez l’accès programmatique.
Nom du rôle : vous avez créé un rôle (
test_role
) dans le cadre de la configuration commune. L’utilisateur assume ce rôle pour effectuer des actions.
Configuration¶
Suivez les étapes pour communiquer avec le service Echo par programmation. À l’aide du code Python fourni, vous envoyez des requêtes au point de terminaison public exposé par le service Echo.
À l’invite de commande, créez un répertoire et accédez-y.
Configurez l’authentification par paire de clés pour l’utilisateur.
Générez une paire de clés :
Générez une clé privée. Pour simplifier les étapes de l’exercice, vous générez une clé privée non chiffrée. Vous pouvez également utiliser une clé privée chiffrée, mais vous devrez saisir le mot de passe.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
Générez une clé publique (
rsa_key.pub
) en référençant la clé privée que vous avez créée.openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Vérifiez que vous disposez de la clé privée et de la clé publique générées dans le répertoire.
Attribuez la clé publique à l’utilisateur que vous utilisez pour tester l’accès programmatique. Cela permet à l’utilisateur de spécifier la clé pour l’authentification.
ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
Enregistrez l’exemple de code fourni dans des fichiers Python.
Enregistrez le code suivant dans
generateJWT.py
.# To run this on the command line, enter: # python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file> from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat from cryptography.hazmat.backends import default_backend from datetime import timedelta, timezone, datetime import argparse import base64 from getpass import getpass import hashlib import logging import sys # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/). import jwt logger = logging.getLogger(__name__) try: from typing import Text except ImportError: logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True) from typing_extensions import Text ISSUER = "iss" EXPIRE_TIME = "exp" ISSUE_TIME = "iat" SUBJECT = "sub" # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. As an example, this function # prompts the user for the passphrase. def get_private_key_passphrase(): return getpass('Passphrase for private key: ') class JWTGenerator(object): """ Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the generated token and only regenerates the token if a specified period of time has passed. """ LIFETIME = timedelta(minutes=59) # The tokens will have a 59-minute lifetime RENEWAL_DELTA = timedelta(minutes=54) # Tokens will be renewed after 54 minutes ALGORITHM = "RS256" # Tokens will be generated using RSA with SHA256 def __init__(self, account: Text, user: Text, private_key_file_path: Text, lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA): """ __init__ creates an object that generates JWTs for the specified user, account identifier, and private key. :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator. :param user: The Snowflake username. :param private_key_file_path: Path to the private key file used for signing the JWTs. :param lifetime: The number of minutes (as a timedelta) during which the key will be valid. :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT. """ logger.info( """Creating JWTGenerator with arguments account : %s, user : %s, lifetime : %s, renewal_delay : %s""", account, user, lifetime, renewal_delay) # Construct the fully qualified name of the user in uppercase. self.account = self.prepare_account_name_for_jwt(account) self.user = user.upper() self.qualified_username = self.account + "." + self.user self.lifetime = lifetime self.renewal_delay = renewal_delay self.private_key_file_path = private_key_file_path self.renew_time = datetime.now(timezone.utc) self.token = None # Load the private key from the specified file. with open(self.private_key_file_path, 'rb') as pem_in: pemlines = pem_in.read() try: # Try to access the private key without a passphrase. self.private_key = load_pem_private_key(pemlines, None, default_backend()) except TypeError: # If that fails, provide the passphrase returned from get_private_key_passphrase(). self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) def prepare_account_name_for_jwt(self, raw_account: Text) -> Text: """ Prepare the account identifier for use in the JWT. For the JWT, the account identifier must not include the subdomain or any region or cloud provider information. :param raw_account: The specified account identifier. :return: The account identifier in a form that can be used to generate the JWT. """ account = raw_account if not '.global' in account: # Handle the general case. idx = account.find('.') if idx > 0: account = account[0:idx] else: # Handle the replication case. idx = account.find('-') if idx > 0: account = account[0:idx] # Use uppercase for the account identifier. return account.upper() def get_token(self) -> Text: """ Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the specified renewal time has passed. :return: the new token """ now = datetime.now(timezone.utc) # Fetch the current time # If the token has expired or doesn't exist, regenerate the token. if self.token is None or self.renew_time <= now: logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)", now, self.renew_time) # Calculate the next time we need to renew the token. self.renew_time = now + self.renewal_delay # Prepare the fields for the payload. # Generate the public key fingerprint for the issuer in the payload. public_key_fp = self.calculate_public_key_fingerprint(self.private_key) # Create our payload payload = { # Set the issuer to the fully qualified username concatenated with the public key fingerprint. ISSUER: self.qualified_username + '.' + public_key_fp, # Set the subject to the fully qualified username. SUBJECT: self.qualified_username, # Set the issue time to now. ISSUE_TIME: now, # Set the expiration time, based on the lifetime specified for this object. EXPIRE_TIME: now + self.lifetime } # Regenerate the actual token token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM) # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string. # If the token is a byte string, convert it to a string. if isinstance(token, bytes): token = token.decode('utf-8') self.token = token logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM])) return self.token def calculate_public_key_fingerprint(self, private_key: Text) -> Text: """ Given a private key in PEM format, return the public key fingerprint. :param private_key: private key string :return: public key fingerprint """ # Get the raw bytes of public key. public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) # Get the sha256 hash of the raw bytes. sha256hash = hashlib.sha256() sha256hash.update(public_key_raw) # Base64-encode the value and prepend the prefix 'SHA256:'. public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8') logger.info("Public key fingerprint is %s", public_key_fp) return public_key_fp def main(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') args = cli_parser.parse_args() token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() print('JWT:') print(token) if __name__ == "__main__": main()
Enregistrez le code suivant dans
access-via-keypair.py
.from generateJWT import JWTGenerator from datetime import timedelta import argparse import logging import sys import requests logger = logging.getLogger(__name__) def main(): args = _parse_args() token = _get_token(args) snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role, snowflake_account_url=args.snowflake_account_url, snowflake_account=args.account) spcs_url=f'https://{args.endpoint}{args.endpoint_path}' connect_to_spcs(snowflake_jwt, spcs_url) def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account): scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } logger.info(data) url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token' if snowflake_account_url: url = f'{snowflake_account_url}/oauth/token' logger.info("oauth url: %s" %url) response = requests.post(url, data=data) logger.info("snowflake jwt : %s" % response.text) assert 200 == response.status_code, "unable to get snowflake token" return response.text def connect_to_spcs(token, url): # Create a request to the ingress endpoint with authz. headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) logger.info("return code %s" % response.status_code) logger.info(response.text) def _parse_args(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (for example, "myorganization-myaccount" for ' '"myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') cli_parser.add_argument('--role', help='The role we want to use to create and maintain a session for. If a role is not provided, ' 'use the default role.') cli_parser.add_argument('--endpoint', required=True, help='The ingress endpoint of the service') cli_parser.add_argument('--endpoint-path', default='/', help='The url path for the ingress endpoint of the service') cli_parser.add_argument('--snowflake_account_url', default=None, help='The account url of the account for which we want to log in. Type of ' 'https://myorganization-myaccount.snowflakecomputing.com') args = cli_parser.parse_args() return args if __name__ == "__main__": main()
Envoyer des requêtes au point de terminaison de service par programmation¶
Exécutez le code Python access-via-keypair.py
pour effectuer l’appel d’entrée vers le point de terminaison public du service Echo.
python3 access-via-keypair.py \ --account <account-identifier> \ --user <user-name> \ --role TEST_ROLE \ --private_key_file_path rsa_key.p8 \ --endpoint <ingress-hostname> \ --endpoint-path /ui
Pour plus d’informations sur account-identifier
, voir Identificateurs de compte.
Comment fonctionne l’authentification¶
Le code convertit d’abord la paire de clés fournie en un jeton JWT. Il envoie ensuite le jeton JWT à Snowflake pour obtenir un jeton OAuth. Enfin, le code utilise le jeton OAuth pour se connecter à Snowflake et accéder au point de terminaison public. Plus précisément, le code effectue les opérations suivantes :
Appelle la fonction
_get_token(args)
pour générer un jeton JWT de la paire de clés que vous fournissez. L’implémentation de la fonction est illustrée :def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token
JWTGenerator
est une classe d’aide qui vous est fournie. Notez les points suivants concernant les paramètres que vous fournissez lors de la création de cet objet :Paramètres
args.account
etargs.user
: un jeton JWT comporte plusieurs champs (voir format de jeton),iss
est l’un des domaines. Cette valeur de champ inclut le nom du compte Snowflake et un nom d’utilisateur. Par conséquent, vous fournissez ces valeurs comme paramètres.Les deux paramètres
timedelta
fournissent les informations suivantes :lifetime
précise le nombre de minutes pendant lesquelles la clé sera valide (60 minutes).renewal_delay
spécifie le nombre de minutes à partir de maintenant après lesquelles le générateur JWT doit renouveler le JWT.
Appelle la fonction
token_exchange()
pour se connecter à Snowflake et échanger le jeton JWT contre un jeton OAuth.scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, }
Le code précédent construit un JSON définissant la portée du jeton OAuth, le point de terminaison public accessible à l’aide du rôle spécifié. Ce code fait ensuite une demande POST à Snowflake de transmettre le JSON pour échanger le jeton JWT contre un jeton OAuth (voir Échange de jetons) comme indiqué :
url = f'{snowflake_account_url}/oauth/token' response = requests.post(url, data=data) assert 200 == response.status_code, "unable to get Snowflake token" return response.text
Le code appelle ensuite la fonction
connect_to_spcs()
permettant de se connecter au point de terminaison public du service Echo. Il fournit l’URL (https://<ingress-URL>/ui
) du point de terminaison et du jeton OAuth pour l’authentification.headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers)
L”
url
est l”spcs_url
que vous avez fournie au programme et letoken
est le jeton OAuth.Le service Echo dans cet exemple sert une page HTML (comme expliqué dans la section précédente). Cet exemple de code imprime simplement le code HTML dans la réponse.
6 : nettoyage¶
Si vous ne prévoyez pas de poursuivre le tutoriel 2 ou le tutoriel 3, vous devez supprimer les ressources facturables que vous avez créées. Pour plus d’informations, voir l’étape 5 dans le tutoriel 3.
7 : vérification du code de service¶
Cette section couvre les sujets suivants :
Examen du code du tutoriel 1 : examinez les fichiers de code qui mettent en œuvre le service echo.
Comprendre la fonction de service : cette section explique comment la fonction de service de ce tutoriel est liée au service.
Construire et tester une image localement. Cette section explique comment vous pouvez tester localement l’image Docker avant de la charger vers un référentiel dans votre compte Snowflake.
Examen du code du tutoriel 1¶
Le fichier zip que vous avez chargé à l’étape 1 comprend les fichiers suivants :
Dockerfile
echo_service.py
templates/basic_ui.html
Vous utilisez également la spécification de service lors de la création du service. La section suivante explique comment ces composants de code fonctionnent ensemble pour créer le service.
Fichier echo_service.py¶
Ce fichier Python contient le code qui met en œuvre un serveur HTTP minimal qui renvoie (retour echo) le texte saisi. Le code effectue principalement deux tâches : traiter les requêtes echo des fonctions de service Snowflake et fournir une interface utilisateur Web (UI) pour soumettre des requêtes echo.
from flask import Flask
from flask import request
from flask import make_response
from flask import render_template
import logging
import os
import sys
SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0')
SERVER_PORT = os.getenv('SERVER_PORT', 8080)
CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I')
def get_logger(logger_name):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter(
'%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
logger.addHandler(handler)
return logger
logger = get_logger('echo-service')
app = Flask(__name__)
@app.get("/healthcheck")
def readiness_probe():
return "I'm ready!"
@app.post("/echo")
def echo():
'''
Main handler for input data sent by Snowflake.
'''
message = request.json
logger.debug(f'Received request: {message}')
if message is None or not message['data']:
logger.info('Received empty message')
return {}
# input format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...],
# ...
# ]}
input_rows = message['data']
logger.info(f'Received {len(input_rows)} rows')
# output format:
# {"data": [
# [row_index, column_1_value, column_2_value, ...}],
# ...
# ]}
output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows]
logger.info(f'Produced {len(output_rows)} rows')
response = make_response({"data": output_rows})
response.headers['Content-type'] = 'application/json'
logger.debug(f'Sending response: {response.json}')
return response
@app.route("/ui", methods=["GET", "POST"])
def ui():
'''
Main handler for providing a web UI.
'''
if request.method == "POST":
# getting input in HTML form
input_text = request.form.get("input")
# display input and output
return render_template("basic_ui.html",
echo_input=input_text,
echo_reponse=get_echo_response(input_text))
return render_template("basic_ui.html")
def get_echo_response(input):
return f'{CHARACTER_NAME} said {input}'
if __name__ == '__main__':
app.run(host=SERVICE_HOST, port=SERVER_PORT)
Dans le code :
La fonction
echo
permet à une fonction de service Snowflake de communiquer avec le service. Cette fonction spécifie la décoration@app.post()
comme indiqué :@app.post("/echo") def echo():
Lorsque le serveur echo reçoit votre requête HTTP POST avec le chemin
/echo
, il achemine la requête vers cette fonction. La fonction s’exécute et renvoie les chaînes du corps de la requête dans la réponse.Pour prendre en charge la communication à partir d’une fonction de service Snowflake, ce serveur met en œuvre les fonctions externes. En d’autres termes, la mise en œuvre du serveur suit un certain format de données d’entrée/sortie afin de servir une fonction SQL, et il s’agit du même format de données d’entrée/de sortie utilisé par les fonctions externes.
La section de fonction
ui
du code affiche un formulaire Web et traite les requêtes echo soumises à partir du formulaire Web. Cette fonction utilise le décorateur@app.route()
pour spécifier que les requêtes pour/ui
sont traitées par cette fonction :@app.route("/ui", methods=["GET", "POST"]) def ui():
Le service echo expose publiquement le point de terminaison
echoendpoint
(voir la spécification du service), ce qui permet de communiquer avec le service sur le Web. Lorsque vous chargez l’URL du point de terminaison public avec /ui ajouté dans votre navigateur, celui-ci envoie une requête HTTP GET pour ce chemin, et le serveur achemine la requête vers cette fonction. La fonction s’exécute et renvoie un formulaire HTML simple dans lequel l’utilisateur peut saisir une chaîne.Une fois que l’utilisateur a saisi une chaîne et soumis le formulaire, le navigateur envoie une requête HTTP post pour ce chemin, et le serveur achemine la requête vers cette même fonction. La fonction s’exécute et renvoie une réponse HTTP contenant la chaîne d’origine.
La fonction
readiness_probe
utilise le décorateur@app.get()
pour spécifier que les requêtes pour/healthcheck
sont traitées par cette fonction :@app.get("/healthcheck") def readiness_probe():
Cette fonction permet à Snowflake de vérifier l’état de préparation du service. Lorsque le conteneur démarre, Snowflake veut confirmer que l’application fonctionne et que le service est prêt à répondre aux requêtes. Snowflake envoie une requête HTTP GET avec ce chemin (en tant que sonde de santé, probe readiness) pour s’assurer que seuls les conteneurs sains servent le trafic. La fonction peut faire ce que vous voulez.
La fonction
get_logger
permet de mettre en place la journalisation.
Dockerfile¶
Ce fichier contient toutes les commandes pour construire une image en utilisant Docker.
ARG BASE_IMAGE=python:3.10-slim-buster
FROM $BASE_IMAGE
COPY echo_service.py ./
COPY templates/ ./templates/
RUN pip install --upgrade pip && \\
pip install flask
CMD ["python", "echo_service.py"]
Le Dockerfile contient des instructions pour installer la bibliothèque Flask dans le conteneur Docker. Le code de echo_service.py
s’appuie sur la bibliothèque Flask pour traiter les requêtes HTTP.
/template/basic_ui.html¶
Le service echo expose publiquement le point de terminaison echoendpoint
(voir la spécification du service), ce qui permet de communiquer avec le service sur le Web. Lorsque vous chargez l’URL du point de terminaison public avec /ui
en annexe dans votre navigateur, le service echo affiche ce formulaire. Vous pouvez saisir une chaîne dans le formulaire et soumettre le formulaire, et le service renvoie la chaîne dans une réponse HTTP.
<!DOCTYPE html>
<html lang="en">
<head>
<title>Welcome to echo service!</title>
</head>
<body>
<h1>Welcome to echo service!</h1>
<form action="{{ url_for("ui") }}" method="post">
<label for="input">Input:<label><br>
<input type="text" id="input" name="input"><br>
</form>
<h2>Input:</h2>
{{ echo_input }}
<h2>Output:</h2>
{{ echo_reponse }}
</body>
</html>
Spécification du service¶
Snowflake utilise les informations que vous fournissez dans cette spécification pour configurer et faire fonctionner votre service.
spec:
containers:
- name: echo
image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
env:
SERVER_PORT: 8000
CHARACTER_NAME: Bob
readinessProbe:
port: 8000
path: /healthcheck
endpoints:
- name: echoendpoint
port: 8000
public: true
Dans la spécification de service :
Le
containers.image
spécifie l’image pour que Snowflake démarre un conteneur.Le champ facultatif
endpoints
spécifie le point de terminaison auquel le service est exposé.Le
name
spécifie un nom convivial pour le port réseau TCP sur lequel le conteneur est à l’écoute. Vous utilisez ce nom convivial de point de terminaison pour envoyer des requêtes au port correspondant. Notez que leenv.SERVER_PORT
contrôle ce numéro de port.Le point de terminaison est également configuré comme
public
. Cela permet au trafic d’accéder à ce point de terminaison depuis le Web public.
Le champ facultatif
containers.env
est ajouté pour illustrer la façon dont vous pouvez remplacer les variables d’environnement que Snowflake transmet à tous les processus de votre conteneur. Par exemple, le code de service (echo_service.py
) lit les variables d’environnement avec les valeurs par défaut comme indiqué :CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') SERVER_PORT = os.getenv('SERVER_PORT', 8080)
Il fonctionne de la manière suivante :
Lorsque le service echo reçoit une requête HTTP POST avec une chaîne (par exemple, « Hello ») dans le corps de la requête, le service renvoie « I said Hello » par défaut. Le code utilise la variable d’environnement
CHARACTER_NAME
pour déterminer le mot qui précède « said ». Par défaut,CHARACTER_NAME
est défini sur « I ».Vous pouvez remplacer la valeur par défaut CHARACTER_NAME dans la spécification de service. Par exemple, si vous définissez la valeur sur « Bob », le service echo renvoie une réponse « Bob said Hello ».
De la même manière, la spécification du service remplace le port par défaut (SERVER_PORT) sur lequel le service écoute, qui est 8080, par 8000.
Le champ
readinessProbe
identifie lesport
etpath
que Snowflake peut utiliser pour envoyer une requête HTTP GET à la probe readiness afin de vérifier que le service est prêt à gérer le trafic.Le code de service (
echo_python.py
) met en œuvre la probe readiness comme suit :@app.get("/healthcheck") def readiness_probe():
Le fichier de spécification inclut donc le champ
container.readinessProbe
en conséquence.
Pour plus d’informations sur les spécifications de service, voir Référence Spécification de service.
Comprendre la fonction de service¶
Une fonction de service est l’une des méthodes utilisées pour communiquer avec votre service (voir Utilisation d’un service). Une fonction de service est une fonction définie par l’utilisateur (UDF) que vous associez à un point de terminaison de service. Lorsque la fonction de service est exécutée, elle envoie une requête au point de terminaison de service associé et reçoit une réponse.
Vous créez la fonction de service suivante en exécutant la commande CREATE FUNCTION avec les paramètres suivants :
CREATE FUNCTION my_echo_udf (InputText VARCHAR)
RETURNS VARCHAR
SERVICE=echo_service
ENDPOINT=echoendpoint
AS '/echo';
Remarques :
La fonction
my_echo_udf
prend une chaîne en entrée et renvoie une chaîne.La propriété SERVICE identifie le service (
echo_service
) et la propriété ENDPOINT identifie le nom convivial du point de terminaison (echoendpoint
).Le AS “/echo” spécifie le chemin d’accès au service. Dans
echo_service.py
, le décorateur@app.post
associe ce chemin à la fonctionecho
.
Cette fonction se connecte à l’élément ENDPOINT spécifique de l’élément SERVICE spécifié. Lorsque vous appelez cette fonction, Snowflake envoie une requête au chemin /echo
dans le conteneur de service.
Construire et tester une image localement¶
Vous pouvez tester l’image Docker localement avant de la charger vers un référentiel dans votre compte Snowflake. Dans les tests locaux, votre conteneur fonctionne de manière autonome (il ne s’agit pas d’un service exécuté par Snowflake).
Pour tester l’image Docker du tutoriel 1 :
Pour créer une image Docker, dans la CLI Docker, exécutez la commande suivante :
docker build --rm -t my_service:local .
Pour lancer votre code, exécutez la commande suivante :
docker run --rm -p 8080:8080 my_service:local
Envoyez une requête echo au service en utilisant l’une des méthodes suivantes :
Utilisation de la commande cURL :
Dans une autre fenêtre de terminal, en utilisant cURL, envoyez la requête POST suivante au port 8080 :
curl -X POST http://localhost:8080/echo \ -H "Content-Type: application/json" \ -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}'
Notez que le corps de la requête comprend deux chaînes. Cette commande cURL envoie une requête POST au port 8080 sur lequel le service est à l’écoute. Le 0 dans les données est l’indice de la chaîne d’entrée dans la liste. Le service echo répercute les chaînes saisies en réponse, comme indiqué :
{"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]}
En utilisant un navigateur Web :
Dans votre navigateur, sur le même ordinateur, ouvrez
http://localhost:8080/ui
.Cette opération envoie une requête GET au port 8080, sur lequel le service est à l’écoute. Le service exécute la fonction
ui()
, qui rend un formulaire HTML comme indiqué :Saisissez la chaîne « Hello » dans la case Entrée et appuyez sur Retour.
Quelle est la prochaine étape ?¶
Vous pouvez maintenant tester le tutoriel 2 qui exécute une tâche.