Créer une séquence de tâches à l’aide d’un graphique de tâches¶
Dans Snowflake, vous pouvez gérer plusieurs tâches à l’aide d’un graphique de tâches, également appelé graphe orienté acyclique, ou DAG. Un graphique de tâches est composé d’une tâche racine et de tâches enfants dépendantes. Les dépendances doivent être exécutées du début à la fin, sans boucle. Une tâche finale facultative (ou tâche de finalisation) peut effectuer des opérations de nettoyage une fois que toutes les autres tâches sont terminées.
Vous pouvez créer des graphiques de tâches à comportement dynamique en spécifiant des opérations logiques dans le corps de la tâche à l’aide des valeurs d’exécution, de la configuration au niveau du graphique et des valeurs de retour des tâches parentes.
Vous pouvez créer des tâches et des graphiques de tâches en utilisant des langages et des outils pris en charge comme SQL, JavaScript, Python, Java, Scala ou Snowflake Scripting. Cette rubrique fournit des exemples en SQL. Pour des exemples en Python, voir Gestion des tâches et des graphiques de tâches Snowflake avec Python.
Créer un graphique de tâches¶
Créez une tâche racine à l’aide de CREATE TASK, puis créez des tâches enfants à l’aide de CREATE TASK .. AFTER pour sélectionner les tâches parentes.
La tâche racine détermine à quel moment le graphique de tâches s’exécute. Les tâches enfants sont exécutées dans l’ordre défini par le graphique de tâches.
Lorsque plusieurs tâches enfants ont le même parent, elles s’exécutent en parallèle.
Lorsqu’une tâche a plusieurs parents, elle attend que toutes les tâches précédentes aient été menées à bien avant de commencer. (La tâche peut également être exécutée lorsque certaines tâches parentes sont ignorées. Pour plus d’informations, voir Sauter ou suspendre une tâche enfant.)
L’exemple suivant crée un graphique de tâches sans serveur qui commence par une tâche racine planifiée pour s’exécuter toutes les minutes. La tâche racine a deux tâches enfants qui s’exécutent en parallèle. (Le diagramme montre un exemple où l’une de ces tâches met plus de temps à s’exécuter que l’autre.) Une fois ces deux tâches terminées, une troisième tâche enfant est exécutée. La tâche de finalisation s’exécute après l’achèvement ou l’échec de toutes les autres tâches :
CREATE TASK task_root
SCHEDULE = '1 MINUTE'
AS SELECT 1;
CREATE TASK task_a
AFTER task_root
AS SELECT 1;
CREATE TASK task_b
AFTER task_root
AS SELECT 1;
CREATE TASK task_c
AFTER task_a, task_b
AS SELECT 1;
Considérations :
Un graphique de tâches est limité à un maximum de 1 000 tâches.
Une tâche unique peut avoir un maximum de 100 tâches parents et 100 tâches enfants.
Lorsque des tâches sont exécutées en parallèle sur le même entrepôt géré par l’utilisateur, les ressources de calcul doivent être dimensionnées pour gérer les tâches simultanées.
Tâche de finalisation¶
Vous pouvez ajouter une tâche de finalisation facultative, qui s’exécute après que toutes les autres tâches du graphique de tâches sont terminées (ou ont échoué). Utilisez celle-ci pour :
Effectuer des opérations de nettoyage – par exemple, nettoyer les données intermédiaires qui ne sont plus nécessaires.
Envoyer des notifications de réussite ou d’échec pour une tâche.
Pour créer une tâche de finalisation, utilisez CREATE TASK. .. FINALIZE. .. sur la tâche racine. Exemple :
CREATE TASK task_finalizer
FINALIZE = task_root
AS SELECT 1;
Considérations :
Une tâche de finalisation est toujours associée à une tâche racine. Chaque tâche racine ne peut avoir qu’une seule tâche de finalisation, et une tâche de finalisation ne peut être associée qu’à une seule tâche racine.
Lorsque la tâche racine d’un graphique de tâches est ignorée (par exemple, en raison d’un chevauchement d’exécutions de graphiques de tâches), la tâche de finalisation n’est pas lancée.
Une tâche de finalisation ne peut pas avoir de tâches enfants.
Une tâche de finalisation est planifiée pour ne s’exécuter que lorsqu’aucune autre tâche n’est en cours d’exécution ou en file d’attente dans le graphique de tâches en cours.
Pour d’autres exemples, voir Exemple de tâche de finalisation : Envoyer une notification par courrier électronique et Exemple de tâche de finalisation : Corriger les erreurs.
Gérer la propriété du graphique de tâches¶
Toutes les tâches d’un graphique de tâches doivent avoir le même propriétaire et être stockées dans la même base de données et le même schéma.
Vous pouvez transférer la propriété de toutes les tâches d’un graphique de tâches en utilisant l’une des actions suivantes :
Supprimer le propriétaire de toutes les tâches dans le graphique de tâches en utilisant DROP ROLE. Snowflake transfère la propriété au rôle qui exécute la commande DROP ROLE.
Transférer la propriété de toutes les tâches du graphique de tâches à l’aide de GRANT OWNERSHIP sur toutes les tâches d’un schéma.
Lorsque vous transférez la propriété des tâches d’un graphique de tâches à l’aide de ces méthodes, les tâches du graphique de tâches conservent leurs relations mutuelles.
Le transfert de propriété d’une tâche unique supprime la dépendance entre la tâche et toutes les tâches parents et enfants. Pour plus d’informations, voir Dissocier les tâches parents et enfants (dans ce chapitre).
Note
La réplication de base de données ne fonctionne pas pour les graphiques de tâches si le graphique appartient à un rôle différent de celui qui effectue la réplication.
Exécuter ou planifier des tâches dans un graphique de tâches¶
Exécuter un graphique de tâches manuellement¶
Vous avez la possibilité d’exécuter une seule instance d’un graphique de tâches. Cette fonction est utile pour tester des graphiques de tâches nouveaux ou modifiés avant de les activer en production, ou pour des exécutions ponctuelles en cas de besoin.
Avant de lancer le graphique de tâches, utilisez ALTER TASK. .. RESUME pour chaque tâche enfant (y compris la tâche de finalisation facultative) que vous souhaitez inclure dans l’exécution.
Pour exécuter une seule instance d’un graphique de tâches, utilisez EXECUTE TASK sur la tâche racine. Lorsque vous exécutez la tâche racine, toutes les tâches enfants reprises dans le graphique de tâches sont exécutées dans l’ordre défini par celui-ci.
Exécuter une tâche selon une planification ou en tant que tâche déclenchée¶
Dans la tâche racine, définissez quand le graphique de tâches s’exécute. Les graphiques de tâches peuvent s’exécuter selon une planification récurrente ou être déclenchés par un événement. Pour plus d’informations, consultez les rubriques suivantes :
Pour lancer le graphique de tâches, vous pouvez effectuer l’une des opérations suivantes :
Reprenez chaque tâche enfant (y compris la tâche de finalisation) que vous souhaitez inclure dans l’exécution, puis reprenez la tâche racine en utilisant ALTER TASK. .. RESUME.
Reprenez toutes les tâches d’un graphique de tâches en une seule fois en utilisant SYSTEM$TASK_DEPENDENTS_ENABLE (<root_task_name>) sur la tâche racine.
Afficher des tâches dépendantes dans un graphique de tâches¶
Pour voir les tâches enfants d’une tâche racine, appelez la fonction de table TASK_DEPENDENTS. Pour récupérer toutes les tâches d’un graphique de tâches, il convient de saisir la tâche racine lors de l’appel de la fonction.
Vous pouvez également utiliser Snowsight pour gérer et voir vos graphiques de tâches. Pour plus d’informations, voir Affichage des tâches et des graphiques de tâches dans Snowsight.
Modifier, suspendre ou réessayer des tâches¶
Modifier une tâche dans un graphique de tâches¶
Pour modifier une tâche dans un graphique de tâches planifié, suspendez la tâche racine en utilisant ALTER TASK. .. SUSPEND. Si une exécution du graphique de tâches est en cours, celui-ci termine l’exécution actuelle. Toutes les exécutions planifiées à venir de la tâche racine sont annulées.
Lorsque la tâche racine est suspendue, les tâches enfants, y compris la tâche de finalisation, conservent leur état (suspendue, en cours d’exécution ou terminée). Les tâches enfants n’ont pas besoin d’être suspendues individuellement.
Après avoir suspendu la tâche racine, vous pouvez modifier n’importe quelle tâche dans le graphique de tâches.
Pour reprendre le graphique de tâches, vous pouvez effectuer l’une des opérations suivantes :
Reprenez la tâche racine en utilisant ALTER TASK. .. RESUME. Il n’est pas nécessaire de reprendre individuellement les tâches enfants qui étaient en cours d’exécution auparavant.
Reprenez toutes les tâches d’un graphique de tâches en une seule fois en appelant SYSTEM$TASK_DEPENDENTS_ENABLE avec le nom de la tâche racine en argument.
Sauter ou suspendre une tâche enfant¶
Pour sauter une tâche enfant dans un graphique de tâches, suspendez-la en utilisant ALTER TASK. .. SUSPEND.
Lorsque vous suspendez une tâche enfant, le graphique de tâches continue à s’exécuter comme si cette tâche enfant avait réussi. Une tâche enfant avec plusieurs prédécesseurs s’exécute tant que au moins un des prédécesseurs est dans un état de reprise, et que tous les prédécesseurs repris s’exécutent avec succès jusqu’à la fin.
Réessayer une tâche qui a échoué¶
Utilisez EXECUTE TASK. .. RETRY LAST pour tenter d’exécuter le graphique de tâches à partir de la dernière tâche ayant échoué. Si la tâche réussit, toutes les tâches enfants continueront à s’exécuter au fur et à mesure de l’achèvement des tâches précédentes.
Réessais automatiques¶
Par défaut, si une tâche enfant échoue, le graphique de tâches tout entier est considéré comme ayant échoué.
Plutôt que d’attendre la prochaine exécution du graphique de tâches planifié, vous pouvez donner l’instruction à ce graphique de tâches de réessayer immédiatement en définissant le paramètre TASK_AUTO_RETRY_ATTEMPTS
sur la tâche racine. Lorsqu’une tâche enfant échoue, le graphique de tâches tout entier est immédiatement réessayé, jusqu’au nombre de fois spécifié. Si le graphique de tâches n’est toujours pas terminé, il est considéré comme ayant échoué.
Suspendre les graphiques de tâches après l’échec de leur exécution¶
Par défaut, un graphique de tâches est suspendu après 10 échecs consécutifs. Vous pouvez modifier cette valeur en réglant SUSPEND_TASK_AFTER_NUM_FAILURES
sur la tâche racine.
Dans l’exemple suivant, chaque fois qu’une tâche enfant échoue, le graphique de tâches fait immédiatement deux nouvelles tentatives avant d’être considéré comme ayant échoué dans son ensemble. Si le graphique de tâches échoue trois fois de suite, il est suspendu.
CREATE OR REPLACE TASK task_root
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2 -- Failed task graph retries up to 2 times
SUSPEND_TASK_AFTER_NUM_FAILURES = 3 -- Task graph suspends after 3 consecutive failures
AS SELECT 1;
Dissocier les tâches parents et enfants¶
Les dépendances entre les tâches d’un graphique de tâches peuvent être rompues à la suite des actions suivantes :
ALTER TASK … REMOVE AFTER and ALTER TASK … UNSET FINALIZE suppriment le lien entre la tâche cible et les tâches parents spécifiées ou la tâche racine finalisée.
DROP TASK et GRANT OWNERSHIP rompent tous les liens de la tâche cible. Par exemple, la tâche racine A a une tâche enfant B, et la tâche B a une tâche enfant C. Si vous supprimez la tâche B, le lien entre les tâches A et B est rompu, de même que le lien entre les tâches B et C.
Si une combinaison des actions ci-dessus rompt la relation entre la tâche enfant et toutes les tâches parents, la tâche enfant devient soit une tâche autonome, soit une tâche racine.
Note
Si vous accordez la propriété d’une tâche à son propriétaire actuel, les liens de dépendance peuvent ne pas être rompus.
Chevauchement d’exécutions de graphiques de tâches¶
Par défaut, Snowflake garantit qu’une seule instance d’un graphique de tâches donné est autorisée à s’exécuter à la fois. L’exécution suivante d’une tâche racine n’est programmée qu’à la fin de l’exécution de toutes les tâches du graphique de tâches. Cela signifie que si le temps cumulé nécessaire à l’exécution de toutes les tâches du graphique de tâches dépasse le temps explicitement planifié dans la définition de la tâche racine, au moins une exécution du graphique de tâche est ignorée.
Pour permettre aux tâches enfants de se chevaucher, utilisez CREATE TASK ou ALTER TASK pour la tâche racine, et définissez ALLOW_OVERLAPPING_EXECUTION sur TRUE. (Les tâches racines ne sont jamais en chevauchement.)
Le chevauchement des exécutions peut être toléré (voire même souhaitable) lorsque les opérations de lecture/écriture SQL exécutées par les exécutions en chevauchement d’un graphique de tâches ne produisent pas de données incorrectes ou dupliquées. Cependant, pour les autres graphiques de tâches, les propriétaires des tâches (c’est-à-dire le rôle ayant le privilège OWNERSHIP sur toutes les tâches du graphique de tâches) doivent définir une planification appropriée sur la tâche racine et sélectionner une taille d’entrepôt appropriée (ou utiliser des ressources de calcul sans serveur) pour garantir qu’une instance du graphique de tâches se termine avant la planification suivante de la tâche racine.
Pour mieux aligner un graphique de tâches sur la planification définie dans la tâche racine :
Si possible, augmentez le temps de planification entre les exécutions de la tâche racine.
Vous pouvez envisager de modifier les tâches intenses en calculs pour utiliser des ressources de calcul sans serveur. Si la tâche repose sur des ressources de calcul gérées par l’utilisateur, augmentez la taille de l’entrepôt qui exécute des instructions SQL ou des procédures stockées volumineuses ou complexes dans le graphique de tâches.
Analysez les instructions SQL ou la procédure stockée exécutées par chaque tâche. Déterminez si le code peut être réécrit pour tirer parti du traitement parallèle.
Si aucune des solutions ci-dessus ne vous aide, voyez s’il est nécessaire d’autoriser des exécutions simultanées du graphique de tâches en définissant ALLOW_OVERLAPPING_EXECUTION = TRUE sur la tâche racine. Ce paramètre peut être défini lors de la création d’une tâche (à l’aide de CREATE TASK) ou après (à l’aide de ALTER TASK ou dans Snowsight).
Gestion des versions¶
Lorsque la tâche racine d’un graphique de tâches est reprise ou exécutée manuellement, Snowflake définit une version de l’ensemble du graphique de tâches, y compris toutes les propriétés de toutes les tâches du graphique de tâches. Après la suspension et la modification d’une tâche, Snowflake définit une nouvelle version lorsque la tâche racine est reprise ou exécutée manuellement.
Pour modifier ou recréer une tâche dans un graphique de tâches, il convient d’abord de suspendre la tâche racine. Lorsque la tâche racine est suspendue, toutes les futures exécutions planifiées de la tâche racine sont annulées ; cependant, si des tâches sont en cours d’exécution, ces tâches et toutes les tâches descendantes continuent de s’exécuter en utilisant la version actuelle.
Note
Si la définition d’une procédure stockée appelée par une tâche change pendant l’exécution du graphique de tâches, la nouvelle planification peut être exécutée lorsque la procédure stockée est appelée par la tâche lors de l’exécution en cours.
Par exemple, supposons que la tâche racine d’un graphique de tâches soit suspendue, mais qu’une exécution planifiée de cette tâche ait déjà commencé. Le propriétaire de toutes les tâches du graphique de tâches modifie le code SQL appelé par une tâche enfant pendant que la tâche racine est encore en cours d’exécution. La tâche enfant s’exécute et exécute le code SQL dans sa définition en utilisant la version du graphique de tâches qui existait lorsque la tâche racine a commencé son exécution. Lorsque la tâche racine est reprise ou est exécutée manuellement, une nouvelle version du graphique de tâches est définie. Cette nouvelle version inclut les modifications apportées à la tâche enfant.
Pour récupérer l’historique des versions de tâches, interrogez la vue TASK_VERSIONS Account Usage (dans la base de données partagée SNOWFLAKE).
Durée du graphique de tâches¶
La durée d’un graphique de tâches correspond au temps écoulé entre le moment où le démarrage de la tâche racine est planifié et celui où la dernière tâche enfant se termine. Pour calculer la durée d’un graphique de tâches, interrogez la Vue COMPLETE_TASK_GRAPHS et comparez SCHEDULED_TIME avec COMPLETED_TIME.
Par exemple, le diagramme suivant montre un graphique de tâches planifié pour s’exécuter toutes les minutes. La tâche racine et ses deux tâches enfants sont en file d’attente pendant 5 secondes et s’exécutent pendant 10 secondes, soit une durée d’exécution totale de 45 secondes.
Délais d’expiration du graphique de tâches¶
Lorsque USER_TASK_TIMEOUT_MS est paramétré dans la tâche racine, le délai d’expiration s’applique à l’ensemble du graphique de tâches.
Lorsque USER_TASK_TIMEOUT_MS est paramétré dans une tâche enfant ou une tâche de finalisation, le délai d’expiration ne s’applique qu’à cette tâche.
Lorsque USER_TASK_TIMEOUT_MS est spécifié dans la tâche racine et dans une tâche enfant, le délai d’expiration de la tâche enfant prévaut sur le délai d’expiration de la tâche racine pour cette tâche enfant.
Considérations¶
Pour les tâches sans serveur, Snowflake dimensionne automatiquement les ressources pour s’assurer que les tâches se terminent dans un intervalle d’achèvement cible, y compris le temps de file d’attente.
Pour les tâches gérées par l’utilisateur, des périodes d’attente plus longues sont courantes lorsque les tâches sont planifiées pour être exécutées dans un entrepôt partagé ou occupé.
Pour les graphiques de tâches, le temps total peut inclure un temps de file d’attente supplémentaire pour les tâches enfants qui attendent que leurs prédécesseures soient terminés.
Créer un graphique de tâches avec une logique (informations sur l’environnement d’exécution, configuration et valeurs de retour)¶
Les tâches d’un graphique de tâches peuvent utiliser les valeurs de retour des tâches parentes pour effectuer des opérations logiques dans leur corps de fonction.
Considérations :
Certaines commandes logiques, comme SYSTEM$GET_PREDECESSOR_RETURN_VALUE, sont sensibles à la casse. Toutefois, les tâches créées à l’aide de CREATE TASK sans guillemets sont stockées et résolues en majuscules. Pour gérer cela, vous pouvez effectuer l’une des opérations suivantes :
Créez des noms de tâche comportant uniquement des lettres majuscules.
Utilisez des guillemets pour nommer et appeler les tâches.
Pour les noms de tâches définis avec des caractères minuscules, appelez la tâche en utilisant des caractères majuscules. Par exemple : une tâche définie par « CREATE TASK task_c… » peut être appelée sous la forme SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE(“TASK_C”).
Transmettre les informations de configuration au graphique de tâches¶
Vous pouvez transmettre des informations de configuration à l’aide d’un objet JSON lisible par les autres tâches d’un graphique de tâches. Utilisez la syntaxe CREATE/ALTER TASK. .. CONFIG pour définir, annuler ou modifier les informations de configuration présentes dans la tâche racine. Utilisez la fonction SYSTEM$GET_TASK_GRAPH_CONFIG pour les récupérer. Exemple :
CREATE OR REPLACE TASK "task_root"
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
AS SELECT 1;
CREATE OR REPLACE TASK "task_a"
USER_TASK_TIMEOUT_MS = 600000
AFTER "task_root"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
INSERT INTO demo_table VALUES('task c path',:value);
END;
Transmettre des valeurs de retour entre les tâches¶
Vous pouvez transmettre des valeurs de retour entre les tâches d’un graphique de tâches. Utilisez la fonction SYSTEM$SET_RETURN_VALUE pour ajouter une valeur de retour à une tâche, et la fonction SYSTEM$GET_PREDECESSOR_RETURN_VALUE pour la récupérer.
Lorsqu’une tâche a plusieurs prédécesseures, vous devez spécifier quelle tâche a la valeur de retour que vous souhaitez. Dans l’exemple suivant, nous créons une tâche racine dans un graphique de tâches qui ajoute des informations de configuration.
CREATE OR REPLACE TASK "task_c"
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
END;
CREATE OR REPLACE TASK "task_d"
USER_TASK_TIMEOUT_MS = 60000
AFTER "task_c"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
END;
Obtenir et utiliser des informations sur l’environnement d’exécution¶
Utilisez la fonction SYSTEM$TASK_RUNTIME_INFO pour obtenir des informations sur l’exécution de la tâche en cours. Cette fonction comporte plusieurs options spécifiques aux graphiques de tâches. Par exemple, utilisez CURRENT_ROOT_TASK_NAME pour obtenir le nom de la tâche racine du graphique de tâches en cours. Les exemples suivants montrent comment ajouter un horodatage à une table en fonction du moment où la tâche racine du graphique de tâches a démarré.
-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
USER_TASK_TIMEOUT_MS = 60000
AFTER "task_root"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
INSERT INTO date_time_table VALUES('order_date',:value);
END;
Exemples¶
Exemple : Lancer plusieurs tâches et signaler le statut¶
Dans l’exemple suivant, la tâche racine lance des tâches de mise à jour de trois tables différentes. Après la mise à jour de ces trois tables, une tâche combine les informations des trois autres tables dans une table de ventes agrégées.
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
USER_TASK_TIMEOUT_MS = 60000
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
END;
;
-- task_customer_table: Updates the customer table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT customer_id FROM ref_cust_table
WHERE cust_name = "Jane Doe";);
INSERT INTO customer_table VALUES('customer_id',:value);
END;
;
-- task_product_table: Updates the product table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT product_id FROM ref_item_table
WHERE PRODUCT_NAME = "widget";);
INSERT INTO product_table VALUES('product_id',:value);
END;
;
-- task_date_time_table: Updates the date/time table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
INSERT INTO "date_time_table" VALUES('order_date',:value);
END;
;
-- task_sales_table: Aggregates changes from other tables.
-- Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_customer_table, task_product_table, task_date_time_table
AS
BEGIN
LET VALUE := (SELECT sales_order_id FROM ORDERS);
JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
INSERT INTO sales_table VALUES('sales_order_id',:value);
END;
;
Exemple de tâche de finalisation : Envoyer une notification par courrier électronique¶
Cet exemple montre comment une tâche de finalisation peut envoyer un courrier électronique résumant l’exécution du graphique de tâches. Cette tâche appelle deux fonctions externes : l’une regroupe les informations sur le statut d’achèvement de la tâche, et l’autre utilise ces informations pour composer un e-mail qui peut être envoyé par l’intermédiaire d’un service de messagerie à distance.
CREATE OR REPLACE TASK notify_finalizer
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_root
AS
DECLARE
my_root_task_id STRING;
my_start_time TIMESTAMP_LTZ;
summary_json STRING;
summary_html STRING;
BEGIN
--- Get root task ID
my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
--- Get root task scheduled time
my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::timestamp_ltz);
--- Combine all task run info into one JSON string
summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
--- Convert JSON into HTML table
summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));
--- Send HTML to email
CALL SYSTEM$SEND_EMAIL(
'email_notification',
'admin@snowflake.com',
'notification task run summary',
:summary_html,
'text/html');
--- Set return value for finalizer
CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
END
CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
RETURNS STRING
AS
$$
(SELECT
ARRAY_AGG(OBJECT_CONSTRUCT(
'task_name', name,
'run_status', state,
'return_value', return_value,
'started', query_start_time,
'duration', duration,
'error_message', error_message
)
) AS GRAPH_RUN_SUMMARY
FROM
(SELECT
NAME,
CASE
WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
WHEN STATE = 'FAILED' then '🔴 Failed'
WHEN STATE = 'SKIPPED' then '🔵 Skipped'
WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
END AS STATE,
RETURN_VALUE,
TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
' s') AS DURATION,
ERROR_MESSAGE
FROM
TABLE(my-database.information_schema.task_history(
ROOT_TASK_ID => my_root_task_id ::STRING,
SCHEDULED_TIME_RANGE_START => my_start_time,
SCHEDULED_TIME_RANGE_END => current_timestamp()
))
ORDER BY
SCHEDULED_TIME)
)::STRING
$$
;
CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
IMPORT JSON
def GENERATE_HTML_TABLE(JSON_DATA):
column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]
DATA = json.loads(JSON_DATA)
HTML = f"""
<img src="https://example.com/logo.jpg"
alt="Company logo" height="72">
<p><strong>Task Graph Run Summary</strong>
<br>Sign in to Snowsight to see more details.</p>
<table border="1" style="border-color:#DEE3EA"
cellpadding="5" cellspacing="0">
<thead>
<tr>
"""
headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
for i, header in enumerate(headers):
HTML += f'<th scope="col" style="text-align:left;
width: {column_widths[i]}">{header.capitalize()}</th>'
HTML +="""
</tr>
</thead>
<tbody>
"""
for ROW_DATA in DATA:
HTML += "<tr>"
for header in headers:
key = header.replace(" ", "_").upper()
CELL_DATA = ROW_DATA.get(key, "")
HTML += f'<td style="text-align:left;
width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
HTML += "</tr>"
HTML +="""
</tbody>
</table>
"""
return HTML
$$
;
Exemple de tâche de finalisation : Corriger les erreurs¶
Cet exemple montre comment une tâche de finalisation peut corriger les erreurs.
À des fins de démonstration, les tâches sont conçues pour échouer lors de leur première exécution. Les tâches de finalisation corrigent le problème et relancent les tâches, qui réussissent lors des exécutions suivantes :
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- 1. Set the default configurations.
-- Creates a root task ("task_a"), and sets the default configurations
-- used throughout the task graph.
-- Configurations include:
-- * Each task runs after one minute, with a 60-second timeout.
-- * If a task fails, retry it twice. if it fails twice,
-- the entire task graph is considered as failed.
-- * If the task graph fails consecutively three times, suspend the task.
-- * Other environment values are set.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task a successful');
END;
;
-- 2. Use a runtime reflection variable.
-- Creates a child task ("task_b").
-- By design, this example fails the first time it runs, because
-- it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
INSERT INTO demo_table VALUES('task b name',:VALUE);
END;
;
-- 3. Get a task graph configuration value.
-- Creates the child task ("task_c").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
USER_TASK_TIMEOUT_MS = 60000
AFTER task_b
AS
BEGIN
CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
INSERT INTO demo_table VALUES('task c path',:value);
END;
;
-- 4. Get a value from a predecessor.
-- Creates the child task ("task_d").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
USER_TASK_TIMEOUT_MS = 60000
AFTER task_c
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
END;
;
-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
-- After the finalizer completes, the task should automatically retry
-- (see task_a: task_auto_retry_attempts).
-- On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_a
AS
BEGIN
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
END;
;
-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
-- Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');
-- 7. Query the task history
SELECT
name, state, attempt_number, scheduled_from
FROM
TABLE(information_schema.task_history(task_name=> 'task_b'))
LIMIT 5;
;
-- 8. Suspend the task graph to stop incurring costs
-- Note: To stop the task graph, you only need to suspend the root task
-- (task_a). Child tasks don’t run unless the root task is run.
-- If any child tasks are running, they have a limited duration
-- and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;
-- 9. Check tasks during execution (optional)
-- Run this command to query the demo table during execution
-- to check which tasks have run.
SELECT * FROM demo_table;
-- 10. Demo reset (optional)
-- Run this command to remove the demo table.
-- This causes task_b to fail during its first run.
-- After the task graph retries, task_b will succeed.
DROP TABLE demo_table;