Tutoriel 3 : Créer un service et une tâche en utilisant les APIs Snowflake Python

Introduction

Dans les tutoriels Tutoriel 1 et Tutoriel 2, vous utilisez l’interface SQL pour créer un service et un conteneur Snowpark Container Services. Dans ce tutoriel, vous utilisez le site APIs Snowflake Python pour créer le même service et le même job et ainsi explorer l’utilisation du site Snowflake Python APIs pour gérer les ressources de Snowpark Container Services.

Le tutoriel utilise un notebook Snowflake pour exécuter le code Python, mais le code est indépendant du notebook et vous pouvez l’exécuter dans d’autres environnements.

1 : Configuration initiale

Dans cette configuration initiale, vous créez un notebook Snowflake, importez les bibliothèques dont vous avez besoin et définissez les constantes qui seront utilisées par les cellules dans les étapes suivantes.

  1. Créez un notebook Snowflake.

    1. Créez un notebook. Pour obtenir des instructions, voir Créer un nouveau notebook. Notez que l”environnement Python que vous choisissez dans l’UI (Exécuter sur un entrepôt ou Exécuter sur un conteneur) n’a pas d’importance.

    2. Dans le menu déroulant Paquets, choisissez le paquet « Snowflake » et installez la dernière version de la bibliothèque Snowflake Python APIs.

    3. (Facultatif) Supprimez les cellules fournies par défaut dans le notebook. À mesure que vous suivez les étapes de ce tutoriel, vous ajoutez des cellules Python au notebook.

  2. Créez et exécutez la cellule d’importation des bibliothèques Python utilisées par de nombreuses cellules de ce tutoriel.

    from snowflake.snowpark.context import get_active_session
    from snowflake.core import Root
    from snowflake.core import CreateMode
    
    Copy
  3. Créez et exécutez la cellule pour définir les constantes que vous utiliserez dans les cellules suivantes. Les valeurs fournies ci-dessous correspondent aux tutoriels 1 et 2. Vous pouvez éventuellement modifier ces valeurs.

    current_user = get_active_session().get_current_user()
    user_role_name = "test_role"
    compute_pool_name = "tutorial_compute_pool"
    warehouse_name = "tutorial_warehouse"
    database_name = "tutorial_db"
    schema_name = "data_schema"
    repo_name = "tutorial_repository"
    stage_name = "tutorial_stage"
    service_name = "echo_service"
    print("configured!")
    
    Copy

2 : Créer des objets de type Snowflake

Avant de pouvoir créer un service, vous avez besoin d’objets Snowflake, tels qu’une base de données, un utilisateur, un rôle, un pool de calcul et un dépôt d’images. Certains de ces objets sont des objets dans le champ d’application du compte dont la création nécessite des privilèges d’administrateur. Les noms des objets créés sont définis à l’étape précédente.

2.1 : Créer des objets Snowflake dans le champ d’application de votre compte

Le code Python suivant crée ces objets :

  • Rôle (test_role). Vous accordez à ce rôle tous les privilèges nécessaires à la création et à l’utilisation du service. Dans le code, vous accordez ce rôle à l’utilisateur actuel pour lui permettre de créer et d’utiliser le service.

  • Base de données (tutorial_db). Dans l’étape suivante, vous créez un schéma dans cette base de données.

  • Pool de calcul (tutorial_compute_pool). Votre conteneur de service s’exécute dans ce pool de calcul.

  • Entrepôt (tutorial_warehouse). Lorsque le service se connecte à Snowflake et exécute des requêtes, cet entrepôt est utilisé pour exécuter les requêtes.

Créez et exécutez la cellule permettant de créer ces objets dans le champ d’application du compte en utilisant le rôle ACCOUNTADMIN. Notez que le script ne crée des ressources que si elles n’existent pas. Les commentaires dans le code indiquent les instructions SQL équivalentes.

from snowflake.core.compute_pool import ComputePool
from snowflake.core.database import Database
from snowflake.core.grant import Grant, Grantees, Privileges, Securable, Securables
from snowflake.core.role import Role
from snowflake.core.warehouse import Warehouse

session = get_active_session()
session.use_role("ACCOUNTADMIN")
root = Root(session)

# CREATE ROLE test_role;
root.roles.create(
    Role(name=user_role_name),
    mode=CreateMode.if_not_exists)
print(f"Created role:", user_role_name)

# GRANT ROLE test_role TO USER <user_name>
root.grants.grant(Grant(
    securable=Securables.role(user_role_name),
    grantee=Grantees.user(name=current_user),
    ))

# CREATE COMPUTE POOL IF NOT EXISTS tutorial_compute_pool
#   MIN_NODES = 1 MAX_NODES = 1
#   INSTANCE_FAMILY = CPU_X64_XS
root.compute_pools.create(
    mode=CreateMode.if_not_exists,
    compute_pool=ComputePool(
        name=compute_pool_name,
        instance_family="CPU_X64_XS",
        min_nodes=1,
        max_nodes=2,
    )
)

# GRANT USAGE, OPERATE, MONITOR ON COMPUTE POOL tutorial_compute_pool TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.usage, Privileges.operate, Privileges.monitor],
    securable=Securables.compute_pool(compute_pool_name),
    grantee=Grantees.role(name=user_role_name)
    ))

print(f"Created compute pool:", compute_pool_name)

# CREATE DATABASE IF NOT EXISTS tutorial_db;
root.databases.create(
    Database(name=database_name),
    mode=CreateMode.if_not_exists)

# GRANT ALL ON DATABASE tutorial_db TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.all_privileges],
    securable=Securables.database(database_name),
    grantee=Grantees.role(name=user_role_name),
    ))

print("Created database:", database_name)

# CREATE OR REPLACE WAREHOUSE tutorial_warehouse WITH WAREHOUSE_SIZE='X-SMALL';
root.warehouses.create(
    Warehouse(name=warehouse_name, warehouse_size="X-SMALL"),
    mode=CreateMode.if_not_exists)

# GRANT USAGE ON WAREHOUSE tutorial_warehouse TO ROLE test_role;
root.grants.grant(Grant(
    privileges=[Privileges.usage],
    grantee=Grantees.role(name=user_role_name),
    securable=Securables.warehouse(warehouse_name)
    ))

print("Created warehouse:", warehouse_name)

# GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE test_role
root.grants.grant(Grant(
    privileges=[Privileges.bind_service_endpoint],
    securable=Securables.current_account,
    grantee=Grantees.role(name=user_role_name)
    ))

print("Done: GRANT BIND SERVICE ENDPOINT")
Copy

Lorsque vous créez des ressources, le code accorde également les privilèges nécessaires au rôle (test_role) afin que celui-ci puisse utiliser ces ressources. En outre, notez que le service echo que vous créez dans ce tutoriel n’expose qu’un seul point de terminaison public. Ce point de terminaison public permet aux autres utilisateurs de votre compte d’accéder au service à partir du Web public (ingress). Pour créer un service avec un point de terminaison public, le rôle (test_role) doit avoir le privilège BIND SERVICE ENDPOINT sur le compte.

2.2 Créer des objets dans le champ d’application du schéma

Le code Python de cette section utilise le rôle test_role pour créer un schéma et des objets dans ce schéma. Vous n’avez pas besoin de privilèges administratifs pour créer ces ressources.

  • Schéma (data_schema). Vous créez un dépôt d’images, un service et un job dans ce schéma.

  • Dépôt d’images (tutorial_repository). Vous stockez l’image de votre application dans ce dépôt.

  • Zone de préparation (tutorial_stage). La zone de préparation est créée uniquement à titre d’illustration. Bien qu’elles ne soient pas présentées dans ce tutoriel, les zones de préparation peuvent être utilisées pour transmettre des données à vos services ou en collecter.

Notez que le script ne crée des ressources que si elles n’existent pas.

from snowflake.core.image_repository import ImageRepository
from snowflake.core.schema import Schema
from snowflake.core.stage import Stage, StageDirectoryTable

session = get_active_session()
session.use_role(user_role_name)
root = Root(session)

# CREATE SCHEMA IF NOT EXISTS {schema_name}
schema = root.databases[database_name].schemas.create(
    Schema(name=schema_name),
    mode=CreateMode.if_not_exists)
print("Created schema:", schema.name)

# CREATE IMAGE REPOSITORY IF NOT EXISTS {repo}
repo = schema.image_repositories.create(
    ImageRepository(name=repo_name),
    mode=CreateMode.if_not_exists)
print("Create image repository:", repo.fully_qualified_name)

repo_url = repo.fetch().repository_url
print("image registry hostname:", repo_url.split("/")[0])
print("image repository url:", repo_url + "/")


#CREATE STAGE IF NOT EXISTS tutorial_stage
#  DIRECTORY = ( ENABLE = true );
stage = schema.stages.create(
    Stage(
        name=stage_name,
        directory_table=StageDirectoryTable(enable=True)),
    mode=CreateMode.if_not_exists)
print("Created stage:", stage.fully_qualified_name)
Copy

Le code Python imprime également des informations utiles sur le dépôt (l’URL du dépôt) que vous utilisez lorsque vous poussez vos images vers le dépôt.

3 : Construire une image et l’importer

Vous téléchargez localement le code comme décrit dans le Tutoriel 1, vous utilisez les commandes Docker pour construire l’image, et vous l’importez dans le dépôt d’images de votre compte.

  1. Créez et exécutez la cellule pour obtenir le nom d’hôte de votre registre d’images et l’URL de votre dépôt d’images.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Le code Python récupère l”objet ressource du dépôt d’images (repo), accède à l’objet modèle et en extrait l’URL du dépôt.

  2. Suivez les étapes 1 et 2 du Tutoriel 1 pour importer le code de service, créer une image et la importer dans le dépôt.

  3. Créez et exécutez la cellule pour vérifier que l’image se trouve dans le dépôt.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    Le code énumère les images à partir de la ressource du dépôt d’images (repo) et imprime l’adresse image_path pour chaque image.

4 : Créer un service

Créez un service et une fonction de service pour communiquer avec le service.

  1. Vérifiez que le pool de calcul est prêt. Après avoir créé un pool de calcul, il faut un certain temps à Snowflake pour provisionner tous les nœuds. Assurez-vous que le pool de calcul est prêt avant de créer un service, car les conteneurs de service s’exécutent dans le pool de calcul spécifié.

    Créez et exécutez la cellule pour obtenir le statut du pool de calcul :

    import time
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    cp = root.compute_pools[compute_pool_name]
    
    cpm = cp.fetch()
    print(cpm.state, cpm.status_message)
    if cpm.state == 'SUSPENDED':
        cp.resume()
    while cpm.state in ['STARTING', 'SUSPENDED']:
        time.sleep(5)
        cpm = cp.fetch()
        print(cpm.state, cpm.status_message)
    
    Copy

    Le code récupère le modèle de pool de calcul (cpm) à partir de la ressource de pool de calcul (cp) pour récupérer l’état actuel du pool de calcul. Si le pool de calcul est suspendu, le code reprend le pool de calcul. Le code tourne en boucle, en marquant une pause de cinq secondes à chaque fois, jusqu’à ce que le pool de calcul ne soit plus dans l’état STARTING ou SUSPENDED.

    La dernière ligne de sortie devrait être « IDLE » ou « ACTIVE », ce qui indique que le pool de calcul est prêt à exécuter votre service. Pour plus d’informations, voir Cycle de vie du pool de calcul. Si le pool de calcul n’est pas prêt, vos services ne peuvent pas démarrer.

  2. Créez et exécutez la cellule pour créer le service echo.

    from snowflake.core.service import Service, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    specification = f"""
        spec:
          containers:
          - name: echo
            image: {repo_url}/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
    
        """
    echo_service = schema.services.create(Service(
        name=service_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification),
        min_instances=1,
        max_instances=1),
        mode=CreateMode.if_not_exists)
    print("created service:", echo_service.name)
    
    Copy

    Le code récupère l’URL du dépôt, comme cela a été fait à l’étape précédente. Le code crée ensuite le site echo_service à l’aide d’une spécification en ligne et de l’image provenant du dépôt d’images spécifié.

    Comme vous le voyez dans le code Python, il est facile de paramétrer les noms des ressources. Voici la commande SQL équivalente qui crée un service mais n’utilise pas de paramètres.

    CREATE SERVICE echo_service
      IN COMPUTE POOL tutorial_compute_pool
      FROM SPECIFICATION $$
        spec:
          containers:
          - name: echo
            image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest
            env:
              SERVER_PORT: 8000
              CHARACTER_NAME: Bob
            readinessProbe:
              port: 8000
              path: /healthcheck
          endpoints:
          - name: echoendpoint
            port: 8000
            public: true
      $$
      MIN_INSTANCES=1
      MAX_INSTANCES=1;
    
    Copy
  3. Exécutez la cellule pour créer une fonction de service (my_echo_function). Une fonction de service est l’une des façons d’utiliser le service.

    from snowflake.core.function import ServiceFunction, FunctionArgument
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # CREATE FUNCTION my_echo_udf (inputtext VARCHAR)
    #  RETURNS VARCHAR
    #  SERVICE=echo_service
    #  ENDPOINT=echoendpoint
    #  AS '/echo';
    svcfn = schema.functions.create(mode=CreateMode.or_replace,
        function=ServiceFunction(
            name="my_echo_function",
            arguments=[FunctionArgument(name="inputtext", datatype="TEXT")],
            returns="TEXT",
            service=service_name,
            endpoint="echoendpoint",
            path="/echo"))
    print("created service function:", svcfn.name_with_args)
    
    Copy

    Le code appelle la méthode create sur la collection functions du schema pour créer la fonction de service (my_echo_function).

5 : Utiliser le service

Dans cette section, vous utilisez le service comme suit :

  • Invoquez la fonction de service.

  • Utilisez un navigateur pour interagir avec le point de terminaison public du service.

  1. Invoquez la fonction de service.

    svcfn = schema.functions["my_echo_function(TEXT)"]
    print(svcfn.execute(["hello"]))
    
    Copy

    Snowflake envoie une requête POST au point de terminaison de service (echoendpoint). À la réception de la requête, le service renvoie la chaîne d’entrée dans la réponse.

    Sortie :

    +--------------------------+
    | **MY_ECHO_UDF('HELLO!')**|
    |------------------------- |
    | Bob said hello!          |
    +--------------------------+
    
  2. Accédez, à partir d’un navigateur, au point de terminaison public que le service expose.

    1. Obtenez l’URL du point de terminaison public.

      # helper to check if service is ready and return endpoint url
      def get_ingress_for_endpoint(svc, endpoint):
          for _ in range(10): # only try 10 times
              # Find the target endpoint.
              target_endpoint = None
              for ep in svc.get_endpoints():
                  if ep.is_public and ep.name == endpoint:
                      target_endpoint = ep
                      break;
              else:
                  print(f"Endpoint {endpoint} not found")
                  return None
      
              # Return endpoint URL or wait for it to be provisioned.
              if target_endpoint.ingress_url.startswith("Endpoints provisioning "):
                  print(f"{target_endpoint.ingress_url} is still in provisioning. Wait for 10 seconds.")
                  time.sleep(10)
              else:
                  return target_endpoint.ingress_url
          print("Timed out waiting for endpoint to become available")
      
      endpoint_url = get_ingress_for_endpoint(echo_service, "echoendpoint")
      print(f"https://{endpoint_url}/ui")
      
      Copy
    2. Collez l’URL imprimée dans une fenêtre du navigateur. Le service exécute alors la fonction ui() (voir echo_service.py).

      Notez que la première fois que vous accédez à l’URL du point de terminaison, il vous sera demandé de vous connecter à Snowflake. Pour ce test, utilisez le même utilisateur que celui que vous avez utilisé pour créer le service afin de vous assurer qu’il dispose des privilèges nécessaires.

      Formulaire Web pour communiquer avec le service echo.
    3. Saisissez la chaîne « Hello » dans la case Entrée et appuyez sur Retour.

      Formulaire Web affichant la réponse du service echo.

6 : Créer un job

Dans le Tutoriel 2, vous utilisez l’interface SQL pour créer un service Snowpark Container Services. Dans cette section, vous créez le même job à l’aide de l’application Snowflake Python APIs.

  1. Créez et exécutez la cellule pour obtenir le nom d’hôte de votre registre d’images et l’URL de votre dépôt d’images.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    repo =  schema.image_repositories[repo_name]
    
    repo_url = repo.fetch().repository_url
    print("image registry hostname:", repo_url.split("/")[0])
    print("image repository url:", repo_url + "/")
    
    Copy

    Le code Python récupère l’objet ressource du dépôt d’images (repo), accède à l’objet modèle et en extrait l’URL du dépôt.

  2. Suivez les étapes 1 et 2 du Tutoriel 2 pour importer le code de service, créer une image et la importer dans le dépôt.

  3. Créez et exécutez la cellule pour vérifier que l’image se trouve dans le dépôt.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    for image in repo.list_images_in_repository():
        print(image.image_path)
    
    Copy

    Le code énumère les images à partir de la ressource du dépôt d’images (repo) et imprime l’adresse image_path pour chaque image.

  4. Créez et exécutez la cellule pour créer le job.

    from snowflake.core.service import JobService, ServiceSpec
    
    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    repo = schema.image_repositories[repo_name]
    repo_url = repo.fetch().repository_url
    
    job_name = "test_job"
    
    # cleanup previous job if present.
    schema.services[job_name].drop()(if_exists=True)
    
    specification = f"""
        spec:
          containers:
          - name: main
            image: {repo_url}/my_job_image:latest
            env:
              SNOWFLAKE_WAREHOUSE: {warehouse_name}
            args:
            - "--query=select current_time() as time,'hello'"
            - "--result_table=results"
        """
    job = schema.services.execute_job(JobService(
        name=job_name,
        compute_pool=compute_pool_name,
        spec=ServiceSpec(specification)))
    print("executed job:", job.name, "status:", job.fetch().status)
    
    print("job logs:")
    print(job.get_service_logs(0, "main"))
    
    Copy

    Le job exécute la requête donnée et stocke les résultats dans une table.

  5. Exécutez la cellule suivante pour examiner le résultat inscrit dans la table. Ce code utilise Python Snowpark pour effectuer des requêtes dans cette table.

    session = get_active_session()
    session.use_role(user_role_name)
    # show that above job wrote to results table
    session.sql(f"select * from {database_name}.{schema_name}.results").collect()
    
    Copy

7 : Nettoyer

  1. Arrêtez le service et abandonnez-le. Après avoir abandonné le service, Snowflake suspend par défaut automatiquement le pool de calcul (en supposant qu’il n’y a pas d’autres services et services de job en cours d’exécution). Pour plus d’informations, voir cycle de vie du pool de calcul.

    session = get_active_session()
    session.use_role(user_role_name)
    root = Root(session)
    
    schema = root.databases[database_name].schemas[schema_name]
    
    # now let's clean up
    
    schema.functions["my_echo_function(TEXT)"].drop()
    schema.services[service_name].drop()
    
    Copy
  2. Abandonnez le dépôt d’images pour éviter de payer le stockage. Notez que si vous avez d’autres images stockées dans le dépôt, elles seront supprimées.

    schema.image_repositories[repo_name].drop()
    
    Copy
  3. Détruire le schéma. L’abandon d’un schéma entraîne également l’abandon de tous les objets de ce schéma. Pour ce tutoriel, il s’agit du service, de la fonction, du dépôt d’images et de la zone de préparation que vous avez créée.

    root.databases[database_name].schemas[schema_name].drop()
    
    Copy
  4. Au lieu d’attendre que Snowflake suspende votre pool de calcul, vous pouvez également suspendre explicitement le pool de calcul. Dans ce casse, Snowflake suspend tous les services en cours d’exécution et attend que tous les jobs en cours se terminent, puis suspend le pool de calcul.

    root.compute_pool[compute_pool_name].suspend()
    
    Copy

Quelle est la prochaine étape ?

Ce tutoriel présente l’utilisation de Snowflake Python APIs pour créer et gérer les services et les jobs Snowpark Container Services. Pour plus d’informations sur Snowflake Python APIs, voir Snowflake Python APIs : gestion des objets Snowflake avec Python.