Tutoriel 2 : créer une tâche Snowpark Container Services

Important

La fonction de tâches Snowpark Container Services est actuellement disponible en avant-première privée et est soumise aux conditions d’utilisation de l’avant-première disponibles à https://snowflake.com/legal. Contactez votre représentant Snowflake pour plus d’informations.

Introduction

Après avoir terminé le tutoriel Configuration commune, vous êtes prêt à créer une tâche. Dans ce tutoriel, vous créez une tâche simple qui se connecte à Snowflake, exécute une requête SQL SELECT et enregistre le résultat dans une table.

Ce tutoriel comporte deux parties :

Partie 1 : Créer et tester une tâche. Vous téléchargez le code fourni pour ce tutoriel et suivez les instructions étape par étape :

  1. Téléchargez le code de la tâche pour ce tutoriel.

  2. Créez une image Docker pour Snowpark Container Services, et chargez l’image dans un référentiel de votre compte.

  3. Mettez en zone de préparation le fichier de spécification de tâche, qui donne à Snowflake les informations de configuration du conteneur. En plus du nom de l’image à utiliser pour démarrer un conteneur, le fichier de spécification fournit :

    • Trois arguments : une requête SELECT, un entrepôt virtuel pour exécuter la requête et le nom de la table dans laquelle enregistrer le résultat.

    • L’entrepôt dans lequel l’instruction SELECT doit être exécutée.

  4. Exécutez la tâche. En utilisant la commande EXECUTE SERVICE, vous pouvez exécuter la tâche en fournissant le fichier de spécification et le pool de calcul où Snowflake peut exécuter le conteneur. Enfin, vérifiez les résultats de la tâche.

Partie 2 : comprendre le code de la tâche. Cette section donne une vue d’ensemble du code de la tâche et met en évidence la manière dont les différents éléments collaborent.

1 : Télécharger le code de service

Un code (une application Python) est fourni pour créer une tâche.

  1. Téléchargez le fichier zip dans un répertoire.

  2. Décompressez le contenu, qui comprend un répertoire pour chaque tutoriel. Le répertoire Tutorial-2 contient les fichiers suivants :

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2 : Construire et charger une image

Construisez une image pour la plateforme linux/amd64 prise en charge par Snowpark Container Services, puis chargez l’image dans le référentiel d’images de votre compte (voir Configuration commune).

Vous aurez besoin d’informations sur le référentiel (l’URL du référentiel et le nom d’hôte du registre) avant de pouvoir construire et charger l’image. Pour plus d’informations, voir Registre et référentiels.

Obtenir des informations sur le référentiel

  1. Pour obtenir l’URL du référentiel, exécutez la commande SQL SHOW IMAGE REPOSITORIES.

    SHOW IMAGE REPOSITORIES;
    
    Copy
    • La colonne repository_url de la sortie fournit l’URL. En voici un exemple :

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
      
    • Le nom d’hôte dans l’URL du référentiel est le nom d’hôte du registre. En voici un exemple :

      <orgname>-<acctname>.registry.snowflakecomputing.com
      

Construire l’image et la charger dans le référentiel

  1. Ouvrez une fenêtre de terminal et accédez au répertoire contenant les fichiers que vous avez décompressés.

  2. Pour créer une image Docker, exécutez la commande docker build suivante à l’aide de la CLI Docker. Notez que la commande spécifie le répertoire de travail actuel (.) comme PATH pour les fichiers à utiliser pour la construction de l’image.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
    
    Copy
    • Pour image_name, utilisez my_job_image:latest.

    Exemple

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
    
    Copy
  3. Chargez l’image dans le référentiel de votre compte Snowflake. Pour que Docker puisse charger une image en votre nom dans votre référentiel, vous devez d’abord authentifier Docker avec Snowflake.

    1. Pour authentifier Docker auprès du registre Snowflake, exécutez la commande suivante.

      docker login <registry_hostname> -u <username>
      
      Copy
      • Pour username, indiquez votre nom d’utilisateur Snowflake. Docker vous demandera votre mot de passe.

    2. Pour charger l’image, exécutez la commande suivante :

      docker push <repository_url>/<image_name>
      
      Copy

      Exemple

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
      
      Copy

3 : Mettre en zone de préparation le fichier de spécification

  • Pour charger votre fichier de spécification de tâche (my_job_spec.yaml) sur la zone de préparation, utilisez l’une des options suivantes :

    • L’interface Web Snowsight : Pour les instructions, voir Choix d’une zone de préparation interne pour les fichiers locaux.

    • La CLI SnowSQL : exécutez la commande PUT suivante :

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      Par exemple :

      • Linux ou macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage
          AUTO_COMPRESS=FALSE
          OVERWRITE=TRUE;
        
        Copy

      Vous pouvez également spécifier un chemin relatif.

      PUT file://./my_job_spec.yaml @tutorial_stage
        AUTO_COMPRESS=FALSE
        OVERWRITE=TRUE;
      
      Copy

      La commande définit OVERWRITE=TRUE afin que vous puissiez charger le fichier à nouveau, si nécessaire (par exemple, si vous avez corrigé une erreur dans votre fichier de spécification). Si la commande PUT est exécutée avec succès, les informations relatives au fichier chargé sont affichées.

4 : Exécuter la tâche

Vous êtes maintenant prêt à créer une tâche.

  1. Pour démarrer une tâche, exécutez la commande EXECUTE SERVICE :

    EXECUTE SERVICE
      IN COMPUTE POOL tutorial_compute_pool
      FROM @tutorial_stage
      SPEC='my_job_spec.yaml';
    
    Copy

    Remarques :

    • FROM et SPEC indiquent le nom de la zone de préparation et le nom du fichier de spécification de la tâche. Lorsque la tâche est exécutée, elle exécute l’instruction SQL et enregistre le résultat dans une table comme spécifié dans my_job_spec.yaml.

      L’instruction SQL de votre tâche n’est pas exécutée dans le conteneur Docker. Au lieu de cela, le conteneur en cours d’exécution se connecte à Snowflake et exécute l’instruction SQL dans un entrepôt Snowflake.

    • COMPUTE_POOL fournit les ressources informatiques où Snowflake exécute la tâche.

    • EXECUTE SERVICE renvoie une sortie qui inclut le UUID de la tâche attribué par Snowflake, comme le montre l’exemple de sortie suivant :

      +------------------------------------------------------------------------------------+
      |                      status                                                        |
      -------------------------------------------------------------------------------------+
      | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. |
      +------------------------------------------------------------------------------------+
      
  2. Obtenez l’ID de la requête que vous avez exécutée (EXECUTE SERVICE est une requête).

    SET jobid = LAST_QUERY_ID();
    
    Copy

    Vous utilisez l’ID dans les étapes suivantes pour récupérer le statut du travail et les informations du journal des tâches.

    Note

    Il est important d’appeler LAST_QUERY_ID immédiatement après avoir appelé EXECUTE SERVICE pour s’assurer que l’ID de la tâche renvoyé par la commande est destiné à la commande EXECUTE SERVICE.

    LAST_QUERY_ID ne renvoie l’ID de requête d’une tâche qu’une fois la tâche terminée ; pour les tâches en cours de longue durée, cela ne convient pas pour obtenir des informations sur le statut en temps réel, et vous devriez plutôt utiliser la famille QUERY HISTORY de fonctions de table pour obtenir l’ID de requête de la tâche. Pour plus d’informations, voir Utilisation des tâches.

  3. Cette tâche exécute une requête simple et enregistre les résultats dans la table des résultats. Vous pouvez vérifier que la tâche s’est bien déroulée en interrogeant la table des résultats :

    SELECT * FROM results;
    
    Copy

    Exemple de sortie :

    +----------+-----------+
    | TIME     | TEXT      |
    |----------+-----------|
    | 10:56:52 | hello     |
    +----------+-----------+
    
  4. Si vous souhaitez déboguer l’exécution de votre tâche, utilisez les fonctions du système. Par exemple, utilisez SYSTEM$GET_JOB_STATUS pour déterminer si la tâche est toujours en cours d’exécution, si elle n’a pas démarré ou, le cas échéant, pourquoi elle a échoué. De plus, si votre code produit des journaux utiles sur la sortie standard ou l’erreur standard, vous pouvez accéder aux journaux en utilisant SYSTEM$GET_JOB_LOGS.

    1. Pour obtenir le statut d’une tâche, appelez la fonction système SYSTEM$GET_JOB_STATUS :

      SELECT SYSTEM$GET_JOB_STATUS($jobid);
      
      Copy

      Exemple de sortie :

      [
        {
            "status":"DONE",
            "message":"Completed successfully",
            "containerName":"main",
            "instanceId":"0",
            "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a",
            "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest",
            "restartCount":0,
            "startTime":""
        }
      ]
      
      Copy

      Dans la sortie, comme la tâche n’a pas de nom, le serviceName est le UUID attribué par Snowflake (ID de requête) de la tâche.

    2. Pour obtenir les informations du journal des tâches, utilisez la fonction système SYSTEM$GET_JOB_LOGS :

      SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
      
      Copy
      job-tutorial - INFO - Job started
      job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE"
      job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results]
      job-tutorial - INFO - Job finished
      

5 : Nettoyage

Si vous ne prévoyez pas de poursuivre le tutoriel 3, vous devez supprimer les ressources facturables que vous avez créées. Pour plus d’informations, voir l’étape 5 dans le tutoriel 3.

6 : Vérification du code de tâche

Cette section couvre les sujets suivants :

Vérification des fichiers fournis

Le fichier zip que vous avez téléchargé au début du tutoriel comprend les fichiers suivants :

  • main.py

  • Dockerfile

  • my_job_spec.yaml

Cette section donne un aperçu de la manière dont le code met en œuvre la tâche.

fichier main.py

#!/opt/conda/bin/python3

import argparse
import logging
import os
import sys

from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *

# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")

# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")


def get_arg_parser():
  """
  Input argument list.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument("--query", required=True, help="query text to execute")
  parser.add_argument(
    "--result_table",
    required=True,
    help="name of the table to store result of query specified by flag --query")

  return parser


def get_logger():
  """
  Get a logger for local logging.
  """
  logger = logging.getLogger("job-tutorial")
  logger.setLevel(logging.DEBUG)
  handler = logging.StreamHandler(sys.stdout)
  handler.setLevel(logging.DEBUG)
  formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)
  return logger


def get_login_token():
  """
  Read the login token supplied automatically by Snowflake. These tokens
  are short lived and should always be read right before creating any new connection.
  """
  with open("/snowflake/session/token", "r") as f:
    return f.read()


def get_connection_params():
  """
  Construct Snowflake connection params from environment variables.
  """
  if os.path.exists("/snowflake/session/token"):
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "authenticator": "oauth",
      "token": get_login_token(),
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }
  else:
    return {
      "account": SNOWFLAKE_ACCOUNT,
      "host": SNOWFLAKE_HOST,
      "user": SNOWFLAKE_USER,
      "password": SNOWFLAKE_PASSWORD,
      "role": SNOWFLAKE_ROLE,
      "warehouse": SNOWFLAKE_WAREHOUSE,
      "database": SNOWFLAKE_DATABASE,
      "schema": SNOWFLAKE_SCHEMA
    }

def run_job():
  """
  Main body of this job.
  """
  logger = get_logger()
  logger.info("Job started")

  # Parse input arguments
  args = get_arg_parser().parse_args()
  query = args.query
  result_table = args.result_table

  # Start a Snowflake session, run the query and write results to specified table
  with Session.builder.configs(get_connection_params()).create() as session:
    # Print out current session context information.
    database = session.get_current_database()
    schema = session.get_current_schema()
    warehouse = session.get_current_warehouse()
    role = session.get_current_role()
    logger.info(
      f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
    )

    # Execute query and persist results in a table.
    logger.info(
      f"Executing query [{query}] and writing result to table [{result_table}]"
    )
    res = session.sql(query)
    # If the table already exists, the query result must match the table scheme.
    # If the table does not exist, this will create a new table.
    res.write.mode("append").save_as_table(result_table)

  logger.info("Job finished")


if __name__ == "__main__":
  run_job()
Copy

Dans le code :

  • Le code Python s’exécute à main, qui exécute ensuite la fonction run_job() :

    if __name__ == "__main__":
      run_job()
    
    Copy
  • La fonction run_job() lit les variables d’environnement et les utilise pour définir les valeurs par défaut de divers paramètres. Le conteneur utilise ces paramètres pour se connecter à Snowflake. Remarques importantes :

    • Vous pouvez remplacer ces valeurs de paramètres par défaut. Pour plus d’informations, voir Référence Spécification de service.

    • Lorsque l’image est exécutée dans Snowflake, Snowflake renseigne automatiquement certains de ces paramètres (voir le code source). Cependant, lorsque vous testez l’image localement, vous devez fournir explicitement ces paramètres (comme indiqué dans la section suivante, Construire et tester une image localement).

Dockerfile

Ce fichier contient toutes les commandes pour construire une image en utilisant Docker.

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
  conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Copy

Fichier my_job_spec.yaml (Spécification de la tâche)

Snowflake utilise les informations que vous fournissez dans cette spécification pour configurer et exécuter votre tâche.

spec:
container:
- name: main
   image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
   env:
      SNOWFLAKE_WAREHOUSE: tutorial_warehouse
   args:
   - "--query=select current_time() as time,'hello'"
   - "--result_table=results"
Copy

Outre les champs obligatoires container.name et container.image (voir Référence Spécification de service), la spécification inclut le champ facultatif container.args pour répertorier les arguments :

  • --query fournit la requête à exécuter lors de l’exécution de la tâche.

  • --result_table identifie la table dans laquelle seront enregistrés les résultats de la requête.

Construire et tester une image localement

Vous pouvez tester l’image Docker localement avant de la charger vers un référentiel dans votre compte Snowflake. Dans les tests locaux, votre conteneur fonctionne de manière autonome (il ne s’agit pas d’une tâche exécutée par Snowflake).

Suivez les étapes suivantes pour tester l’image Docker du tutoriel 2 :

  1. Pour créer une image Docker, dans la CLI Docker, exécutez la commande docker build :

    docker build --rm -t my_service:local .
    
    Copy
  2. Pour lancer votre code, exécutez la commande docker run, en fournissant <orgname>-<acctname>, <username>, et <password> :

    docker run --rm \
      -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \
      -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \
      -e SNOWFLAKE_DATABASE=tutorial_db \
      -e SNOWFLAKE_SCHEMA=data_schema \
      -e SNOWFLAKE_ROLE=test_role \
      -e SNOWFLAKE_USER=<username> \
      -e SNOWFLAKE_PASSWORD=<password> \
      -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \
      my_job:local \
      --query="select current_time() as time,'hello'" \
      --result_table=tutorial_db.data_schema.results
    
    Copy

    Lorsque vous testez l’image localement, notez qu’en plus des trois arguments (une requête, l’entrepôt pour exécuter la requête et une table pour enregistrer le résultat), vous fournissez également les paramètres de connexion pour que le conteneur exécuté localement se connecte à Snowflake.

    Lorsque vous exécutez le conteneur en tant que tâche, Snowflake fournit ces paramètres au conteneur en tant que variables d’environnement. Pour plus d’informations, voir Configurer le client Snowflake.

    La tâche exécute la requête (select current_time() as time,'hello') et écrit le résultat dans la table (tutorial_db.data_schema.results). Si la table n’existe pas, elle est créée. Si la table existe, la tâche ajoute une ligne.

    Exemple de résultat de l’interrogation de la table de résultats :

    +----------+----------+
    | TIME     | TEXT     |
    |----------+----------|
    | 10:56:52 | hello    |
    +----------+----------+
    

Quelle est la prochaine étape ?

Vous pouvez maintenant tester le tutoriel 3, qui montre comment fonctionne la communication de service à service.