Tutoriel 2 : créer un service de tâche Snowpark Container Services

Introduction

Après avoir terminé le tutoriel Configuration commune, vous êtes prêt à créer un service de tâche. Dans ce tutoriel, vous créez un service de tâche simple qui se connecte à Snowflake, exécute une requête SQL SELECT et enregistre le résultat dans une table.

Ce tutoriel comporte deux parties :

Partie 1 : créez et testez un service de tâche. Vous téléchargez le code fourni pour ce tutoriel et suivez les instructions étape par étape :

  1. Téléchargez le code de service de tâche pour ce tutoriel.

  2. Créez une image Docker pour Snowpark Container Services, et chargez l’image dans un référentiel de votre compte.

  3. Mettez en zone de préparation le fichier de spécification du service, qui donne à Snowflake les informations de configuration du conteneur. En plus du nom de l’image à utiliser pour démarrer un conteneur, le fichier de spécification spécifie trois arguments : une requête SELECT, un entrepôt virtuel pour exécuter la requête et le nom de la table dans laquelle enregistrer le résultat.

  4. Exécutez le service de tâche. En utilisant la commande EXECUTE JOB SERVICE, vous pouvez exécuter le service de tâche en fournissant le fichier de spécification et le pool de calcul où Snowflake peut exécuter le conteneur. Enfin, vérifiez les résultats du service.

Partie 2 : comprendre le code du service de tâche. Cette section donne une vue d’ensemble du code de service de tâche et met en évidence la façon dont les différentes composantes collaborent.

1 : Téléchargez le code du service de tâche

Un code (une application Python) est fourni pour créer un service de tâche.

  1. Téléchargez SnowparkContainerServices-Tutorials.zip.

  2. Décompressez le contenu, qui comprend un répertoire pour chaque tutoriel. Le répertoire Tutorial-2 contient les fichiers suivants :

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2 : Construire et charger une image

Construisez une image pour la plateforme linux/amd64 prise en charge par Snowpark Container Services, puis chargez l’image dans le référentiel d’images de votre compte (voir Configuration commune).

Vous aurez besoin d’informations sur le référentiel (l’URL du référentiel et le nom d’hôte du registre) avant de pouvoir construire et charger l’image. Pour plus d’informations, voir Registre et référentiels.

Obtenir des informations sur le référentiel

  1. Pour obtenir l’URL du référentiel, exécutez la commande SQL SHOW IMAGE REPOSITORIES.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • La colonne repository_url de la sortie fournit l’URL. En voici un exemple :

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • Le nom d’hôte dans l’URL du référentiel est le nom d’hôte du registre. En voici un exemple :

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Construire l’image et la charger dans le référentiel

  1. Ouvrez une fenêtre de terminal et accédez au répertoire contenant les fichiers que vous avez décompressés.

  2. Pour créer une image Docker, exécutez la commande docker build suivante à l’aide de la CLI Docker. Notez que la commande spécifie le répertoire de travail actuel (.) comme PATH pour les fichiers à utiliser pour la construction de l’image.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Pour image_name, utilisez my_job_image:latest.

    Exemple

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Chargez l’image dans le référentiel de votre compte Snowflake. Pour que Docker puisse charger une image en votre nom dans votre référentiel, vous devez d’abord authentifier Docker avec le registre.

    1. Pour authentifier Docker auprès du registre Snowflake, exécutez la commande suivante.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Pour username, indiquez votre nom d’utilisateur Snowflake. Docker vous demandera votre mot de passe.

    2. Pour charger l’image, exécutez la commande suivante :

      docker push <repository_url>/<image_name>
      
      Copy

      Exemple

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3 : Mettre en zone de préparation le fichier de spécification

  • Pour charger votre fichier de spécification de service (my_job_spec.yaml) sur la zone de préparation, utilisez l’une des options suivantes :

    • L’interface Web Snowsight : Pour les instructions, voir Sélection d’une zone de préparation interne pour les fichiers locaux.

    • La CLI SnowSQL : exécutez la commande PUT suivante :

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Par exemple :

      • Linux ou macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      Vous pouvez également spécifier un chemin relatif.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      La commande définit OVERWRITE=TRUE afin que vous puissiez charger le fichier à nouveau, si nécessaire (par exemple, si vous avez corrigé une erreur dans votre fichier de spécification). Si la commande PUT est exécutée avec succès, les informations relatives au fichier chargé sont affichées.

4 : Exécutez le service de tâche

Vous êtes maintenant prêt à créer une tâche.

  1. Pour démarrer un service de tâche, exécutez la commande EXECUTE JOBSERVICE :

    EXECUTE JOB SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      NAME=tutorial_2_job_service
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    Remarques :

    • FROM et SPEC indiquent le nom de la zone de préparation et le nom du fichier de spécification du service de tâche. Lorsque le service de tâche est exécuté, il exécute l’instruction SQL et enregistre le résultat dans une table comme spécifié dans my_job_spec.yaml.

      L’instruction SQL n’est pas exécutée dans le conteneur Docker. Au lieu de cela, le conteneur en cours d’exécution se connecte à Snowflake et exécute l’instruction SQL dans un entrepôt Snowflake.

    • COMPUTE_POOL fournit les ressources de calcul où Snowflake exécute le service de tâche.

    • EXECUTE JOB SERVICE renvoie une sortie qui inclut le nom de la tâche, comme le montre l’exemple de sortie suivant :

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE.               |
      +------------------------------------------------------------------------------------+
      
  2. Ce service de tâche exécute une requête simple et enregistre les résultats dans la table des résultats. Vous pouvez vérifier que le service de tâche s’est bien déroulé en interrogeant la table des résultats :

    SELECT * FROM results;
    
    Copy

    Exemple de sortie :

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  3. Si vous souhaitez déboguer l’exécution de votre service de tâche, utilisez les fonctions du système. Par exemple, utilisez SYSTEM$GET_SERVICE_STATUS pour déterminer si le service de tâche est toujours en cours d’exécution, si il n’a pas démarré ou, le cas échéant, pourquoi il a échoué. De plus, si votre code produit des journaux utiles sur la sortie standard ou l’erreur standard, vous pouvez accéder aux journaux en utilisant SYSTEM$GET_SERVICE_LOGS.

    1. Pour obtenir le statut d’un service de tâche, appelez la fonction système SYSTEM$GET_SERVICE_STATUS — Obsolète :

      SELECT SYSTEM$GET_SERVICE_STATUS('tutorial_2_job_service');
      
      Copy

      Exemple de sortie :

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"TUTORIAL_2_JOB_SERVICE",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy
    2. Pour obtenir les informations du journal des services de tâches, utilisez la fonction système SYSTEM$GET_SERVICE_LOGS :

      SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5 : Nettoyage

Si vous ne prévoyez pas de poursuivre le tutoriel 3, vous devez supprimer les ressources facturables que vous avez créées. Pour plus d’informations, voir l’étape 5 dans le tutoriel 3.

6 : Révisez le code du service de tâche

Cette section couvre les sujets suivants :

Vérification des fichiers fournis

Le fichier zip que vous avez téléchargé au début du tutoriel comprend les fichiers suivants :

  • main.py

  • Dockerfile

  • my_job_spec.yaml

Cette section donne un aperçu du code.

fichier main.py

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

Dans le code :

  • Le code Python s’exécute à main, qui exécute ensuite la fonction run_job() :

    if __name__ == "__main__":
      run_job()
    
    Copy
  • La fonction run_job() lit les variables d’environnement et les utilise pour définir les valeurs par défaut de divers paramètres. Le conteneur utilise ces paramètres pour se connecter à Snowflake. Remarques importantes :

    • Vous pouvez remplacer les valeurs des paramètres utilisés dans le service en utilisant les champs containers.env et containers.args dans la spécification du service. Pour plus d’informations, voir Référence Spécification de service.

    • Lorsque l’image est exécutée dans Snowflake, Snowflake renseigne automatiquement certains de ces paramètres (voir le code source). Cependant, lorsque vous testez l’image localement, vous devez fournir explicitement ces paramètres (comme indiqué dans la section suivante, Construire et tester une image localement).

Dockerfile

Ce fichier contient toutes les commandes pour construire une image en utilisant Docker.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

fichier my_job_spec.yaml (Spécification de service)

Snowflake utilise les informations que vous fournissez dans cette spécification pour configurer et exécuter votre service de tâche.

spec:
  containers:
  - name: main
    image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
    env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
    args:
    - "--query=select current_time() as time,'hello'"
    - "--result_table=results"
Copy

Outre les champs obligatoires container.name et container.image (voir Référence Spécification de service), la spécification inclut le champ facultatif container.args pour répertorier les arguments :

  • --query fournit la requête à exécuter lors de l’exécution du service.

  • --result_table identifie la table dans laquelle seront enregistrés les résultats de la requête.

Construire et tester une image localement

Vous pouvez tester l’image Docker localement avant de la charger vers un référentiel dans votre compte Snowflake. Dans les tests locaux, votre conteneur fonctionne de manière autonome (il ne s’agit pas d’un service de tâche exécuté par Snowflake).

Suivez les étapes suivantes pour tester l’image Docker du tutoriel 2 :

  1. Pour créer une image Docker, dans la CLI Docker, exécutez la commande docker build :

    docker build --rm -t my_service:local .
    
    Copy
  2. Pour lancer votre code, exécutez la commande docker run, en fournissant <orgname>-<acctname>, <username>, et <password> :

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    Lorsque vous testez l’image localement, notez qu’en plus des trois arguments (une requête, l’entrepôt pour exécuter la requête et une table pour enregistrer le résultat), vous fournissez également les paramètres de connexion pour que le conteneur exécuté localement se connecte à Snowflake.

    Lorsque vous exécutez le conteneur en tant que service, Snowflake fournit ces paramètres au conteneur en tant que variables d’environnement. Pour plus d’informations, voir Configurer le client Snowflake.

    Le service de tâche exécute la requête (select current_time() as time,'hello') et écrit le résultat dans la table (tutorial_db.data_schema.results). Si la table n’existe pas, elle est créée. Si la table existe, le service de tâche ajoute une ligne.

    Exemple de résultat de l’interrogation de la table de résultats :

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

Quelle est la prochaine étape ?

Vous pouvez maintenant tester le tutoriel 3, qui montre comment fonctionne la communication de service à service.