Tutoriel 3 : Créer un service et une tâche en utilisant les APIs Snowflake Python¶
Introduction¶
Dans les tutoriels Tutoriel 1 et Tutoriel 2, vous utilisez l’interface SQL pour créer un service et un conteneur Snowpark Container Services. Dans ce tutoriel, vous utilisez le site APIs Snowflake Python pour créer le même service et le même job et ainsi explorer l’utilisation du site Snowflake Python APIs pour gérer les ressources de Snowpark Container Services.
Le tutoriel utilise un notebook Snowflake pour exécuter le code Python, mais le code est indépendant du notebook et vous pouvez l’exécuter dans d’autres environnements.
1 : Configuration initiale¶
Dans cette configuration initiale, vous créez un notebook Snowflake, importez les bibliothèques dont vous avez besoin et définissez les constantes qui seront utilisées par les cellules dans les étapes suivantes.
Créez un notebook Snowflake.
Créez un notebook. Pour obtenir des instructions, voir Créer un nouveau notebook. Notez que l”environnement Python que vous choisissez dans l’UI (Exécuter sur un entrepôt ou Exécuter sur un conteneur) n’a pas d’importance.
Dans le menu déroulant Paquets, choisissez le paquet « Snowflake » et installez la dernière version de la bibliothèque Snowflake Python APIs.
(Facultatif) Supprimez les cellules fournies par défaut dans le notebook. À mesure que vous suivez les étapes de ce tutoriel, vous ajoutez des cellules Python au notebook.
Créez et exécutez la cellule d’importation des bibliothèques Python utilisées par de nombreuses cellules de ce tutoriel.
from snowflake.snowpark.context import get_active_session from snowflake.core import Root from snowflake.core import CreateMode
Créez et exécutez la cellule pour définir les constantes que vous utiliserez dans les cellules suivantes. Les valeurs fournies ci-dessous correspondent aux tutoriels 1 et 2. Vous pouvez éventuellement modifier ces valeurs.
current_user = get_active_session().get_current_user() user_role_name = "test_role" compute_pool_name = "tutorial_compute_pool" warehouse_name = "tutorial_warehouse" database_name = "tutorial_db" schema_name = "data_schema" repo_name = "tutorial_repository" stage_name = "tutorial_stage" service_name = "echo_service" print("configured!")
2 : Créer des objets de type Snowflake¶
Avant de pouvoir créer un service, vous avez besoin d’objets Snowflake, tels qu’une base de données, un utilisateur, un rôle, un pool de calcul et un dépôt d’images. Certains de ces objets sont des objets dans le champ d’application du compte dont la création nécessite des privilèges d’administrateur. Les noms des objets créés sont définis à l’étape précédente.
2.1 : Créer des objets Snowflake dans le champ d’application de votre compte¶
Le code Python suivant crée ces objets :
Rôle (
test_role
). Vous accordez à ce rôle tous les privilèges nécessaires à la création et à l’utilisation du service. Dans le code, vous accordez ce rôle à l’utilisateur actuel pour lui permettre de créer et d’utiliser le service.Base de données (
tutorial_db
). Dans l’étape suivante, vous créez un schéma dans cette base de données.Pool de calcul (
tutorial_compute_pool
). Votre conteneur de service s’exécute dans ce pool de calcul.Entrepôt (
tutorial_warehouse
). Lorsque le service se connecte à Snowflake et exécute des requêtes, cet entrepôt est utilisé pour exécuter les requêtes.
Créez et exécutez la cellule permettant de créer ces objets dans le champ d’application du compte en utilisant le rôle ACCOUNTADMIN. Notez que le script ne crée des ressources que si elles n’existent pas. Les commentaires dans le code indiquent les instructions SQL équivalentes.
from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse
session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)
# CREATE ROLE test_role;
root.roles.create(
Role(name=user_role_name),
mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)
# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
securable=Securables.role(user_role_name),
grantee=Grantees.user(name=current_user),
))
# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
# MIN_NODES = 1 MAX_NODES = 1
# INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
mode=CreateMode.if_not_exists,
compute_pool=ComputePool(
name=compute_pool_name,
instance_family="CPU_X64_XS",
min_nodes=1,
max_nodes=2,
)
)
# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
securable=Securables.compute_pool(compute_pool_name),
grantee=Grantees.role(name=user_role_name)
))
print(f"Created compute pool:", compute_pool_name)
# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
Database(name=database_name),
mode=CreateMode.if_not_exists)
# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.all_privileges],
securable=Securables.database(database_name),
grantee=Grantees.role(name=user_role_name),
))
print("Created database:", database_name)
# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
mode=CreateMode.if_not_exists)
# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
privileges=[Privileges.usage],
grantee=Grantees.role(name=user_role_name),
securable=Securables.warehouse(warehouse_name)
))
print("Created warehouse:", warehouse_name)
# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
privileges=[Privileges.bind_service_endpoint],
securable=Securables.current_account,
grantee=Grantees.role(name=user_role_name)
))
print("Done: GRANT BIND SERVICE ENDPOINT")
Lorsque vous créez des ressources, le code accorde également les privilèges nécessaires au rôle (test_role
) afin que celui-ci puisse utiliser ces ressources. En outre, notez que le service echo que vous créez dans ce tutoriel n’expose qu’un seul point de terminaison public. Ce point de terminaison public permet aux autres utilisateurs de votre compte d’accéder au service à partir du Web public (ingress). Pour créer un service avec un point de terminaison public, le rôle (test_role
) doit avoir le privilège BIND SERVICE ENDPOINT
sur le compte.
2.2 Créer des objets dans le champ d’application du schéma¶
Le code Python de cette section utilise le rôle test_role
pour créer un schéma et des objets dans ce schéma. Vous n’avez pas besoin de privilèges administratifs pour créer ces ressources.
Schéma (
data_schema
). Vous créez un dépôt d’images, un service et un job dans ce schéma.Dépôt d’images (
tutorial_repository
). Vous stockez l’image de votre application dans ce dépôt.Zone de préparation (
tutorial_stage
). La zone de préparation est créée uniquement à titre d’illustration. Bien qu’elles ne soient pas présentées dans ce tutoriel, les zones de préparation peuvent être utilisées pour transmettre des données à vos services ou en collecter.
Notez que le script ne crée des ressources que si elles n’existent pas.
from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable
session = get_active_session()
session.use_role(user_role_name)
root = Root(session)
# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
Schema(name=schema_name),
mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)
# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
ImageRepository(name=repo_name),
mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)
repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")
#CREATE STAGE IF NOT EXISTS tutorial_stage
# DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
Stage(
name=stage_name,
directory_table=StageDirectoryTable(enable=True)),
mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Le code Python imprime également des informations utiles sur le dépôt (l’URL du dépôt) que vous utilisez lorsque vous poussez vos images vers le dépôt.
3 : Construire une image et l’importer¶
Vous téléchargez localement le code comme décrit dans le Tutoriel 1, vous utilisez les commandes Docker pour construire l’image, et vous l’importez dans le dépôt d’images de votre compte.
Créez et exécutez la cellule pour obtenir le nom d’hôte de votre registre d’images et l’URL de votre dépôt d’images.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
Le code Python récupère l”objet ressource du dépôt d’images (
repo
), accède à l’objet modèle et en extrait l’URL du dépôt.Suivez les étapes 1 et 2 du Tutoriel 1 pour importer le code de service, créer une image et la importer dans le dépôt.
Créez et exécutez la cellule pour vérifier que l’image se trouve dans le dépôt.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
Le code énumère les images à partir de la ressource du dépôt d’images (
repo
) et imprime l’adresseimage_path
pour chaque image.
4 : Créer un service¶
Créez un service et une fonction de service pour communiquer avec le service.
Vérifiez que le pool de calcul est prêt. Après avoir créé un pool de calcul, il faut un certain temps à Snowflake pour provisionner tous les nœuds. Assurez-vous que le pool de calcul est prêt avant de créer un service, car les conteneurs de service s’exécutent dans le pool de calcul spécifié.
Créez et exécutez la cellule pour obtenir le statut du pool de calcul :
import time session = get_active_session() session.use_role(user_role_name) root = Root(session) cp = root.compute_pools[compute_pool_name] cpm = cp.fetch() print(cpm.state, cpm.status_message) if cpm.state == 'SUSPENDED': cp.resume() while cpm.state in ['STARTING', 'SUSPENDED']: time.sleep(5) cpm = cp.fetch() print(cpm.state, cpm.status_message)
Le code récupère le modèle de pool de calcul (
cpm
) à partir de la ressource de pool de calcul (cp
) pour récupérer l’état actuel du pool de calcul. Si le pool de calcul est suspendu, le code reprend le pool de calcul. Le code tourne en boucle, en marquant une pause de cinq secondes à chaque fois, jusqu’à ce que le pool de calcul ne soit plus dans l’état STARTING ou SUSPENDED.La dernière ligne de sortie devrait être « IDLE » ou « ACTIVE », ce qui indique que le pool de calcul est prêt à exécuter votre service. Pour plus d’informations, voir Cycle de vie du pool de calcul. Si le pool de calcul n’est pas prêt, vos services ne peuvent pas démarrer.
Créez et exécutez la cellule pour créer le service echo.
from snowflake.core.service import Service, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url specification = f""" spec: containers: - name: echo image: {repo_url}/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true """ echo_service = schema.services.create(Service( name=service_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification), min_instances=1, max_instances=1), mode=CreateMode.if_not_exists) print("created service:", echo_service.name)
Le code récupère l’URL du dépôt, comme cela a été fait à l’étape précédente. Le code crée ensuite le site
echo_service
à l’aide d’une spécification en ligne et de l’image provenant du dépôt d’images spécifié.Comme vous le voyez dans le code Python, il est facile de paramétrer les noms des ressources. Voici la commande SQL équivalente qui crée un service mais n’utilise pas de paramètres.
CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec: containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env: SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe: port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true $$ MIN_INSTANCES=1 MAX_INSTANCES=1;
Exécutez la cellule pour créer une fonction de service (
my_echo_function
). Une fonction de service est l’une des façons d’utiliser le service.from snowflake.core.function import ServiceFunction, FunctionArgument session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # CREATE FUNCTION my_echo_udf (inputtext VARCHAR) # RETURNS VARCHAR # SERVICE=echo_service # ENDPOINT=echoendpoint # AS '/echo'; svcfn = schema.functions.create(mode=CreateMode.or_replace, function=ServiceFunction( name="my_echo_function", arguments=[FunctionArgument(name="inputtext", datatype="TEXT")], returns="TEXT", service=service_name, endpoint="echoendpoint", path="/echo")) print("created service function:", svcfn.name_with_args)
Le code appelle la méthode
create
sur la collectionfunctions
duschema
pour créer la fonction de service (my_echo_function
).
5 : Utiliser le service¶
Dans cette section, vous utilisez le service comme suit :
Invoquez la fonction de service.
Utilisez un navigateur pour interagir avec le point de terminaison public du service.
Invoquez la fonction de service.
svcfn = schema.functions["my_echo_function(TEXT)"] print(svcfn.execute(["hello"]))
Snowflake envoie une requête POST au point de terminaison de service (
echoendpoint
). À la réception de la requête, le service renvoie la chaîne d’entrée dans la réponse.Sortie :
+--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+
Accédez, à partir d’un navigateur, au point de terminaison public que le service expose.
Obtenez l’URL du point de terminaison public.
# helper to check if service is ready and return endpoint url def get_ingress_for_endpoint(svc, endpoint): for _ in range(10): # only try 10 times # Find the target endpoint. target_endpoint = None for ep in svc.get_endpoints(): if ep.is_public and ep.name == endpoint: target_endpoint = ep break; else: print(f"Endpoint {endpoint} not found") return None # Return endpoint URL or wait for it to be provisioned. if target_endpoint.ingress_url.startswith("Endpoints provisioning "): print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.") time.sleep(10) else: return target_endpoint.ingress_url print("Timed out waiting for endpoint to become available") endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint") print(f"https://{endpoint_url}/ui")
Collez l’URL imprimée dans une fenêtre du navigateur. Le service exécute alors la fonction
ui()
(voirecho_service.py
).Notez que la première fois que vous accédez à l’URL du point de terminaison, il vous sera demandé de vous connecter à Snowflake. Pour ce test, utilisez le même utilisateur que celui que vous avez utilisé pour créer le service afin de vous assurer qu’il dispose des privilèges nécessaires.
Saisissez la chaîne « Hello » dans la case Entrée et appuyez sur Retour.
6 : Créer un job¶
Dans le Tutoriel 2, vous utilisez l’interface SQL pour créer un service Snowpark Container Services. Dans cette section, vous créez le même job à l’aide de l’application Snowflake Python APIs.
Créez et exécutez la cellule pour obtenir le nom d’hôte de votre registre d’images et l’URL de votre dépôt d’images.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url print("image registry hostname:", repo_url.split("/")[0]) print("image repository url:", repo_url + "/")
Le code Python récupère l’objet ressource du dépôt d’images (
repo
), accède à l’objet modèle et en extrait l’URL du dépôt.Suivez les étapes 1 et 2 du Tutoriel 2 pour importer le code de service, créer une image et la importer dans le dépôt.
Créez et exécutez la cellule pour vérifier que l’image se trouve dans le dépôt.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] for image in repo.list_images_in_repository(): print(image.image_path)
Le code énumère les images à partir de la ressource du dépôt d’images (
repo
) et imprime l’adresseimage_path
pour chaque image.Créez et exécutez la cellule pour créer le job.
from snowflake.core.service import JobService, ServiceSpec session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] repo = schema.image_repositories[repo_name] repo_url = repo.fetch().repository_url job_name = "test_job" # cleanup previous job if present. schema.services[job_name].drop()(if_exists=True) specification = f""" spec: containers: - name: main image: {repo_url}/my_job_image:latest env: SNOWFLAKE_WAREHOUSE: {warehouse_name} args: - "--query=select current_time() as time,'hello'" - "--result_table=results" """ job = schema.services.execute_job(JobService( name=job_name, compute_pool=compute_pool_name, spec=ServiceSpec(specification))) print("executed job:", job.name, "status:", job.fetch().status) print("job logs:") print(job.get_service_logs(0, "main"))
Le job exécute la requête donnée et stocke les résultats dans une table.
Exécutez la cellule suivante pour examiner le résultat inscrit dans la table. Ce code utilise Python Snowpark pour effectuer des requêtes dans cette table.
session = get_active_session() session.use_role(user_role_name) # show that above job wrote to results table session.sql(f"select * from {database_name}.{schema_name}.results").collect()
7 : Nettoyer¶
Arrêtez le service et abandonnez-le. Après avoir abandonné le service, Snowflake suspend par défaut automatiquement le pool de calcul (en supposant qu’il n’y a pas d’autres services et services de job en cours d’exécution). Pour plus d’informations, voir cycle de vie du pool de calcul.
session = get_active_session() session.use_role(user_role_name) root = Root(session) schema = root.databases[database_name].schemas[schema_name] # now let's clean up schema.functions["my_echo_function(TEXT)"].drop() schema.services[service_name].drop()
Abandonnez le dépôt d’images pour éviter de payer le stockage. Notez que si vous avez d’autres images stockées dans le dépôt, elles seront supprimées.
schema.image_repositories[repo_name].drop()
Détruire le schéma. L’abandon d’un schéma entraîne également l’abandon de tous les objets de ce schéma. Pour ce tutoriel, il s’agit du service, de la fonction, du dépôt d’images et de la zone de préparation que vous avez créée.
root.databases[database_name].schemas[schema_name].drop()
Au lieu d’attendre que Snowflake suspende votre pool de calcul, vous pouvez également suspendre explicitement le pool de calcul. Dans ce casse, Snowflake suspend tous les services en cours d’exécution et attend que tous les jobs en cours se terminent, puis suspend le pool de calcul.
root.compute_pool[compute_pool_name].suspend()
Quelle est la prochaine étape ?¶
Ce tutoriel présente l’utilisation de Snowflake Python APIs pour créer et gérer les services et les jobs Snowpark Container Services. Pour plus d’informations sur Snowflake Python APIs, voir Snowflake Python APIs : gestion des objets Snowflake avec Python.