Tutoriel 2 : créer une tâche Snowpark Container Services¶
Important
La fonction de tâches Snowpark Container Services est actuellement disponible en avant-première privée et est soumise aux conditions d’utilisation de l’avant-première disponibles à https://snowflake.com/legal. Contactez votre représentant Snowflake pour plus d’informations.
Introduction¶
Après avoir terminé le tutoriel Configuration commune, vous êtes prêt à créer une tâche. Dans ce tutoriel, vous créez une tâche simple qui se connecte à Snowflake, exécute une requête SQL SELECT et enregistre le résultat dans une table.
Ce tutoriel comporte deux parties :
Partie 1 : Créer et tester une tâche. Vous téléchargez le code fourni pour ce tutoriel et suivez les instructions étape par étape :
Téléchargez le code de la tâche pour ce tutoriel.
Créez une image Docker pour Snowpark Container Services, et chargez l’image dans un référentiel de votre compte.
Mettez en zone de préparation le fichier de spécification de tâche, qui donne à Snowflake les informations de configuration du conteneur. En plus du nom de l’image à utiliser pour démarrer un conteneur, le fichier de spécification fournit :
Trois arguments : une requête SELECT, un entrepôt virtuel pour exécuter la requête et le nom de la table dans laquelle enregistrer le résultat.
L’entrepôt dans lequel l’instruction SELECT doit être exécutée.
Exécutez la tâche. En utilisant la commande EXECUTE SERVICE, vous pouvez exécuter la tâche en fournissant le fichier de spécification et le pool de calcul où Snowflake peut exécuter le conteneur. Enfin, vérifiez les résultats de la tâche.
Partie 2 : comprendre le code de la tâche. Cette section donne une vue d’ensemble du code de la tâche et met en évidence la manière dont les différents éléments collaborent.
1 : Télécharger le code de service¶
Un code (une application Python) est fourni pour créer une tâche.
Téléchargez le fichier zip dans un répertoire.
Décompressez le contenu, qui comprend un répertoire pour chaque tutoriel. Le répertoire
Tutorial-2
contient les fichiers suivants :main.py
Dockerfile
my_job_spec.yaml
2 : Construire et charger une image¶
Construisez une image pour la plateforme linux/amd64 prise en charge par Snowpark Container Services, puis chargez l’image dans le référentiel d’images de votre compte (voir Configuration commune).
Vous aurez besoin d’informations sur le référentiel (l’URL du référentiel et le nom d’hôte du registre) avant de pouvoir construire et charger l’image. Pour plus d’informations, voir Registre et référentiels.
Obtenir des informations sur le référentiel
Pour obtenir l’URL du référentiel, exécutez la commande SQL SHOW IMAGE REPOSITORIES.
SHOW IMAGE REPOSITORIES;
La colonne
repository_url
de la sortie fournit l’URL. En voici un exemple :<orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository
Le nom d’hôte dans l’URL du référentiel est le nom d’hôte du registre. En voici un exemple :
<orgname>-<acctname>.registry.snowflakecomputing.com
Construire l’image et la charger dans le référentiel
Ouvrez une fenêtre de terminal et accédez au répertoire contenant les fichiers que vous avez décompressés.
Pour créer une image Docker, exécutez la commande
docker build
suivante à l’aide de la CLI Docker. Notez que la commande spécifie le répertoire de travail actuel (.) commePATH
pour les fichiers à utiliser pour la construction de l’image.docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> .
Pour
image_name
, utilisezmy_job_image:latest
.
Exemple
docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest .
Chargez l’image dans le référentiel de votre compte Snowflake. Pour que Docker puisse charger une image en votre nom dans votre référentiel, vous devez d’abord authentifier Docker avec Snowflake.
Pour authentifier Docker auprès du registre Snowflake, exécutez la commande suivante.
docker login <registry_hostname> -u <username>
Pour
username
, indiquez votre nom d’utilisateur Snowflake. Docker vous demandera votre mot de passe.
Pour charger l’image, exécutez la commande suivante :
docker push <repository_url>/<image_name>
Exemple
docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest
3 : Mettre en zone de préparation le fichier de spécification¶
Pour charger votre fichier de spécification de tâche (my_job_spec.yaml) sur la zone de préparation, utilisez l’une des options suivantes :
L’interface Web Snowsight : Pour les instructions, voir Sélection d’une zone de préparation interne pour les fichiers locaux.
La CLI SnowSQL : exécutez la commande PUT suivante :
PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Par exemple :
Linux ou macOS
PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Windows
PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
Vous pouvez également spécifier un chemin relatif.
PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE;
La commande définit OVERWRITE=TRUE afin que vous puissiez charger le fichier à nouveau, si nécessaire (par exemple, si vous avez corrigé une erreur dans votre fichier de spécification). Si la commande PUT est exécutée avec succès, les informations relatives au fichier chargé sont affichées.
4 : Exécuter la tâche¶
Vous êtes maintenant prêt à créer une tâche.
Pour démarrer une tâche, exécutez la commande EXECUTE SERVICE :
EXECUTE SERVICE IN COMPUTE POOL tutorial_compute_pool FROM @tutorial_stage SPEC='my_job_spec.yaml';
Remarques :
FROM et SPEC indiquent le nom de la zone de préparation et le nom du fichier de spécification de la tâche. Lorsque la tâche est exécutée, elle exécute l’instruction SQL et enregistre le résultat dans une table comme spécifié dans
my_job_spec.yaml
.L’instruction SQL de votre tâche n’est pas exécutée dans le conteneur Docker. Au lieu de cela, le conteneur en cours d’exécution se connecte à Snowflake et exécute l’instruction SQL dans un entrepôt Snowflake.
COMPUTE_POOL fournit les ressources informatiques où Snowflake exécute la tâche.
EXECUTE SERVICE renvoie une sortie qui inclut le UUID de la tâche attribué par Snowflake, comme le montre l’exemple de sortie suivant :
+------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job 01af7ee6-0001-cb52-0020-c5870077223a completed successfully with status: DONE. | +------------------------------------------------------------------------------------+
Obtenez l’ID de la requête que vous avez exécutée (EXECUTE SERVICE est une requête).
SET jobid = LAST_QUERY_ID();
Vous utilisez l’ID dans les étapes suivantes pour récupérer le statut du travail et les informations du journal des tâches.
Note
Il est important d’appeler LAST_QUERY_ID immédiatement après avoir appelé EXECUTE SERVICE pour s’assurer que l’ID de la tâche renvoyé par la commande est destiné à la commande EXECUTE SERVICE.
LAST_QUERY_ID ne renvoie l’ID de requête d’une tâche qu’une fois la tâche terminée ; pour les tâches en cours de longue durée, cela ne convient pas pour obtenir des informations sur le statut en temps réel, et vous devriez plutôt utiliser la famille QUERY HISTORY de fonctions de table pour obtenir l’ID de requête de la tâche. Pour plus d’informations, voir Utilisation des tâches.
Cette tâche exécute une requête simple et enregistre les résultats dans la table des résultats. Vous pouvez vérifier que la tâche s’est bien déroulée en interrogeant la table des résultats :
SELECT * FROM results;
Exemple de sortie :
+----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+
Si vous souhaitez déboguer l’exécution de votre tâche, utilisez les fonctions du système. Par exemple, utilisez SYSTEM$GET_JOB_STATUS pour déterminer si la tâche est toujours en cours d’exécution, si elle n’a pas démarré ou, le cas échéant, pourquoi elle a échoué. De plus, si votre code produit des journaux utiles sur la sortie standard ou l’erreur standard, vous pouvez accéder aux journaux en utilisant SYSTEM$GET_JOB_LOGS.
Pour obtenir le statut d’une tâche, appelez la fonction système SYSTEM$GET_JOB_STATUS :
SELECT SYSTEM$GET_JOB_STATUS($jobid);
Exemple de sortie :
[ { "status":"DONE", "message":"Completed successfully", "containerName":"main", "instanceId":"0", "serviceName":"01af7ee6-0001-cb52-0020-c5870077223a", "image":"orgname-acctname.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest", "restartCount":0, "startTime":"" } ]
Dans la sortie, comme la tâche n’a pas de nom, le
serviceName
est le UUID attribué par Snowflake (ID de requête) de la tâche.Pour obtenir les informations du journal des tâches, utilisez la fonction système SYSTEM$GET_JOB_LOGS :
SELECT SYSTEM$GET_JOB_LOGS($jobid, 'main')
job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished
5 : Nettoyage¶
Si vous ne prévoyez pas de poursuivre le tutoriel 3, vous devez supprimer les ressources facturables que vous avez créées. Pour plus d’informations, voir l’étape 5 dans le tutoriel 3.
6 : Vérification du code de tâche¶
Cette section couvre les sujets suivants :
Vérification des fichiers fournis : examinez les différents fichiers de code qui mettent en œuvre la tâche.
Construire et tester une image localement. Cette section explique comment vous pouvez tester localement l’image Docker avant de la charger vers un référentiel dans votre compte Snowflake.
Vérification des fichiers fournis¶
Le fichier zip que vous avez téléchargé au début du tutoriel comprend les fichiers suivants :
main.py
Dockerfile
my_job_spec.yaml
Cette section donne un aperçu de la manière dont le code met en œuvre la tâche.
fichier main.py¶
#!/opt/conda/bin/python3
import argparse
import logging
import os
import sys
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import *
# Environment variables below will be automatically populated by Snowflake.
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA")
# Custom environment variables
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
def get_arg_parser():
"""
Input argument list.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--query", required=True, help="query text to execute")
parser.add_argument(
"--result_table",
required=True,
help="name of the table to store result of query specified by flag --query")
return parser
def get_logger():
"""
Get a logger for local logging.
"""
logger = logging.getLogger("job-tutorial")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_login_token():
"""
Read the login token supplied automatically by Snowflake. These tokens
are short lived and should always be read right before creating any new connection.
"""
with open("/snowflake/session/token", "r") as f:
return f.read()
def get_connection_params():
"""
Construct Snowflake connection params from environment variables.
"""
if os.path.exists("/snowflake/session/token"):
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"authenticator": "oauth",
"token": get_login_token(),
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
else:
return {
"account": SNOWFLAKE_ACCOUNT,
"host": SNOWFLAKE_HOST,
"user": SNOWFLAKE_USER,
"password": SNOWFLAKE_PASSWORD,
"role": SNOWFLAKE_ROLE,
"warehouse": SNOWFLAKE_WAREHOUSE,
"database": SNOWFLAKE_DATABASE,
"schema": SNOWFLAKE_SCHEMA
}
def run_job():
"""
Main body of this job.
"""
logger = get_logger()
logger.info("Job started")
# Parse input arguments
args = get_arg_parser().parse_args()
query = args.query
result_table = args.result_table
# Start a Snowflake session, run the query and write results to specified table
with Session.builder.configs(get_connection_params()).create() as session:
# Print out current session context information.
database = session.get_current_database()
schema = session.get_current_schema()
warehouse = session.get_current_warehouse()
role = session.get_current_role()
logger.info(
f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}"
)
# Execute query and persist results in a table.
logger.info(
f"Executing query [{query}] and writing result to table [{result_table}]"
)
res = session.sql(query)
# If the table already exists, the query result must match the table scheme.
# If the table does not exist, this will create a new table.
res.write.mode("append").save_as_table(result_table)
logger.info("Job finished")
if __name__ == "__main__":
run_job()
Dans le code :
Le code Python s’exécute à
main
, qui exécute ensuite la fonctionrun_job()
:if __name__ == "__main__": run_job()
La fonction
run_job()
lit les variables d’environnement et les utilise pour définir les valeurs par défaut de divers paramètres. Le conteneur utilise ces paramètres pour se connecter à Snowflake. Remarques importantes :Vous pouvez remplacer ces valeurs de paramètres par défaut. Pour plus d’informations, voir Référence Spécification de service.
Lorsque l’image est exécutée dans Snowflake, Snowflake renseigne automatiquement certains de ces paramètres (voir le code source). Cependant, lorsque vous testez l’image localement, vous devez fournir explicitement ces paramètres (comme indiqué dans la section suivante, Construire et tester une image localement).
Dockerfile¶
Ce fichier contient toutes les commandes pour construire une image en utilisant Docker.
ARG BASE_IMAGE=continuumio/miniconda3:4.12.0
FROM $BASE_IMAGE
RUN conda install python=3.8 && \
conda install snowflake-snowpark-python
COPY main.py ./
ENTRYPOINT ["python3", "main.py"]
Fichier my_job_spec.yaml (Spécification de la tâche)¶
Snowflake utilise les informations que vous fournissez dans cette spécification pour configurer et exécuter votre tâche.
spec:
container:
- name: main
image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest
env:
SNOWFLAKE_WAREHOUSE: tutorial_warehouse
args:
- "--query=select current_time() as time,'hello'"
- "--result_table=results"
Outre les champs obligatoires container.name
et container.image
(voir Référence Spécification de service), la spécification inclut le champ facultatif container.args
pour répertorier les arguments :
--query
fournit la requête à exécuter lors de l’exécution de la tâche.--result_table
identifie la table dans laquelle seront enregistrés les résultats de la requête.
Construire et tester une image localement¶
Vous pouvez tester l’image Docker localement avant de la charger vers un référentiel dans votre compte Snowflake. Dans les tests locaux, votre conteneur fonctionne de manière autonome (il ne s’agit pas d’une tâche exécutée par Snowflake).
Suivez les étapes suivantes pour tester l’image Docker du tutoriel 2 :
Pour créer une image Docker, dans la CLI Docker, exécutez la commande
docker build
:docker build --rm -t my_service:local .
Pour lancer votre code, exécutez la commande
docker run
, en fournissant<orgname>-<acctname>
,<username>
, et<password>
:docker run --rm \ -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \ -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \ -e SNOWFLAKE_DATABASE=tutorial_db \ -e SNOWFLAKE_SCHEMA=data_schema \ -e SNOWFLAKE_ROLE=test_role \ -e SNOWFLAKE_USER=<username> \ -e SNOWFLAKE_PASSWORD=<password> \ -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \ my_job:local \ --query="select current_time() as time,'hello'" \ --result_table=tutorial_db.data_schema.results
Lorsque vous testez l’image localement, notez qu’en plus des trois arguments (une requête, l’entrepôt pour exécuter la requête et une table pour enregistrer le résultat), vous fournissez également les paramètres de connexion pour que le conteneur exécuté localement se connecte à Snowflake.
Lorsque vous exécutez le conteneur en tant que tâche, Snowflake fournit ces paramètres au conteneur en tant que variables d’environnement. Pour plus d’informations, voir Configurer le client Snowflake.
La tâche exécute la requête (
select current_time() as time,'hello'
) et écrit le résultat dans la table (tutorial_db.data_schema.results
). Si la table n’existe pas, elle est créée. Si la table existe, la tâche ajoute une ligne.Exemple de résultat de l’interrogation de la table de résultats :
+----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+
Quelle est la prochaine étape ?¶
Vous pouvez maintenant tester le tutoriel 3, qui montre comment fonctionne la communication de service à service.