Créer une séquence de tâches à l’aide d’un graphique de tâches

Dans Snowflake, vous pouvez gérer plusieurs tâches à l’aide d’un graphique de tâches, également appelé graphe orienté acyclique, ou DAG. Un graphique de tâches est composé d’une tâche racine et de tâches enfants dépendantes. Les dépendances doivent être exécutées du début à la fin, sans boucle. Une tâche finale facultative (ou tâche de finalisation) peut effectuer des opérations de nettoyage une fois que toutes les autres tâches sont terminées.

Vous pouvez créer des graphiques de tâches à comportement dynamique en spécifiant des opérations logiques dans le corps de la tâche à l’aide des valeurs d’exécution, de la configuration au niveau du graphique et des valeurs de retour des tâches parentes.

Vous pouvez créer des tâches et des graphiques de tâches en utilisant des langages et des outils pris en charge comme SQL, JavaScript, Python, Java, Scala ou Snowflake Scripting. Cette rubrique fournit des exemples en SQL. Pour des exemples en Python, voir Gestion des tâches et des graphiques de tâches Snowflake avec Python.

Créer un graphique de tâches

Créez une tâche racine à l’aide de CREATE TASK, puis créez des tâches enfants à l’aide de CREATE TASK .. AFTER pour sélectionner les tâches parentes.

La tâche racine détermine à quel moment le graphique de tâches s’exécute. Les tâches enfants sont exécutées dans l’ordre défini par le graphique de tâches.

Lorsque plusieurs tâches enfants ont le même parent, elles s’exécutent en parallèle.

Lorsqu’une tâche a plusieurs parents, elle attend que toutes les tâches précédentes aient été menées à bien avant de commencer. (La tâche peut également être exécutée lorsque certaines tâches parentes sont ignorées. Pour plus d’informations, voir Sauter ou suspendre une tâche enfant.)

L’exemple suivant crée un graphique de tâches sans serveur qui commence par une tâche racine planifiée pour s’exécuter toutes les minutes. La tâche racine a deux tâches enfants qui s’exécutent en parallèle. (Le diagramme montre un exemple où l’une de ces tâches met plus de temps à s’exécuter que l’autre.) Une fois ces deux tâches terminées, une troisième tâche enfant est exécutée. La tâche de finalisation s’exécute après l’achèvement ou l’échec de toutes les autres tâches :

Diagramme d'une séquence de tâches.
CREATE TASK task_root
  SCHEDULE = '1 MINUTE'
  AS SELECT 1;

CREATE TASK task_a
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_b
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_c
  AFTER task_a, task_b
  AS SELECT 1;
Copy

Considérations :

  • Un graphique de tâches est limité à un maximum de 1 000 tâches.

  • Une tâche unique peut avoir un maximum de 100 tâches parents et 100 tâches enfants.

  • Lorsque des tâches sont exécutées en parallèle sur le même entrepôt géré par l’utilisateur, les ressources de calcul doivent être dimensionnées pour gérer les tâches simultanées.

Tâche de finalisation

Vous pouvez ajouter une tâche de finalisation facultative, qui s’exécute après que toutes les autres tâches du graphique de tâches sont terminées (ou ont échoué). Utilisez celle-ci pour :

  • Effectuer des opérations de nettoyage – par exemple, nettoyer les données intermédiaires qui ne sont plus nécessaires.

  • Envoyer des notifications de réussite ou d’échec pour une tâche.

Une séquence de tâches montre une tâche racine pointant vers deux tâches enfants, qui à leur tour pointent vers une autre tâche. Une tâche de finalisation est affichée en bas, s'exécutant après que toutes les autres tâches se sont achevées ou ont échoué.

Pour créer une tâche de finalisation, utilisez CREATE TASK. .. FINALIZE. .. sur la tâche racine. Exemple :

CREATE TASK task_finalizer
  FINALIZE = task_root
  AS SELECT 1;
Copy

Considérations :

  • Une tâche de finalisation est toujours associée à une tâche racine. Chaque tâche racine ne peut avoir qu’une seule tâche de finalisation, et une tâche de finalisation ne peut être associée qu’à une seule tâche racine.

  • Lorsque la tâche racine d’un graphique de tâches est ignorée (par exemple, en raison d’un chevauchement d’exécutions de graphiques de tâches), la tâche de finalisation n’est pas lancée.

  • Une tâche de finalisation ne peut pas avoir de tâches enfants.

  • Une tâche de finalisation est planifiée pour ne s’exécuter que lorsqu’aucune autre tâche n’est en cours d’exécution ou en file d’attente dans le graphique de tâches en cours.

Pour d’autres exemples, voir Exemple de tâche de finalisation : Envoyer une notification par courrier électronique et Exemple de tâche de finalisation : Corriger les erreurs.

Gérer la propriété du graphique de tâches

Toutes les tâches d’un graphique de tâches doivent avoir le même propriétaire et être stockées dans la même base de données et le même schéma.

Vous pouvez transférer la propriété de toutes les tâches d’un graphique de tâches en utilisant l’une des actions suivantes :

  • Supprimer le propriétaire de toutes les tâches dans le graphique de tâches en utilisant DROP ROLE. Snowflake transfère la propriété au rôle qui exécute la commande DROP ROLE.

  • Transférer la propriété de toutes les tâches du graphique de tâches à l’aide de GRANT OWNERSHIP sur toutes les tâches d’un schéma.

Lorsque vous transférez la propriété des tâches d’un graphique de tâches à l’aide de ces méthodes, les tâches du graphique de tâches conservent leurs relations mutuelles.

Le transfert de propriété d’une tâche unique supprime la dépendance entre la tâche et toutes les tâches parents et enfants. Pour plus d’informations, voir Dissocier les tâches parents et enfants (dans ce chapitre).

Note

La réplication de base de données ne fonctionne pas pour les graphiques de tâches si le graphique appartient à un rôle différent de celui qui effectue la réplication.

Exécuter ou planifier des tâches dans un graphique de tâches

Exécuter un graphique de tâches manuellement

Vous avez la possibilité d’exécuter une seule instance d’un graphique de tâches. Cette fonction est utile pour tester des graphiques de tâches nouveaux ou modifiés avant de les activer en production, ou pour des exécutions ponctuelles en cas de besoin.

Avant de lancer le graphique de tâches, utilisez ALTER TASK. .. RESUME pour chaque tâche enfant (y compris la tâche de finalisation facultative) que vous souhaitez inclure dans l’exécution.

Pour exécuter une seule instance d’un graphique de tâches, utilisez EXECUTE TASK sur la tâche racine. Lorsque vous exécutez la tâche racine, toutes les tâches enfants reprises dans le graphique de tâches sont exécutées dans l’ordre défini par celui-ci.

Exécuter une tâche selon une planification ou en tant que tâche déclenchée

Dans la tâche racine, définissez quand le graphique de tâches s’exécute. Les graphiques de tâches peuvent s’exécuter selon une planification récurrente ou être déclenchés par un événement. Pour plus d’informations, consultez les rubriques suivantes :

Pour lancer le graphique de tâches, vous pouvez effectuer l’une des opérations suivantes :

  • Reprenez chaque tâche enfant (y compris la tâche de finalisation) que vous souhaitez inclure dans l’exécution, puis reprenez la tâche racine en utilisant ALTER TASK. .. RESUME.

  • Reprenez toutes les tâches d’un graphique de tâches en une seule fois en utilisant SYSTEM$TASK_DEPENDENTS_ENABLE (<root_task_name>) sur la tâche racine.

Afficher des tâches dépendantes dans un graphique de tâches

Pour voir les tâches enfants d’une tâche racine, appelez la fonction de table TASK_DEPENDENTS. Pour récupérer toutes les tâches d’un graphique de tâches, il convient de saisir la tâche racine lors de l’appel de la fonction.

Vous pouvez également utiliser Snowsight pour gérer et voir vos graphiques de tâches. Pour plus d’informations, voir Affichage des tâches et des graphiques de tâches dans Snowsight.

Modifier, suspendre ou réessayer des tâches

Modifier une tâche dans un graphique de tâches

Pour modifier une tâche dans un graphique de tâches planifié, suspendez la tâche racine en utilisant ALTER TASK. .. SUSPEND. Si une exécution du graphique de tâches est en cours, celui-ci termine l’exécution actuelle. Toutes les exécutions planifiées à venir de la tâche racine sont annulées.

Lorsque la tâche racine est suspendue, les tâches enfants, y compris la tâche de finalisation, conservent leur état (suspendue, en cours d’exécution ou terminée). Les tâches enfants n’ont pas besoin d’être suspendues individuellement.

Après avoir suspendu la tâche racine, vous pouvez modifier n’importe quelle tâche dans le graphique de tâches.

Pour reprendre le graphique de tâches, vous pouvez effectuer l’une des opérations suivantes :

  • Reprenez la tâche racine en utilisant ALTER TASK. .. RESUME. Il n’est pas nécessaire de reprendre individuellement les tâches enfants qui étaient en cours d’exécution auparavant.

  • Reprenez toutes les tâches d’un graphique de tâches en une seule fois en appelant SYSTEM$TASK_DEPENDENTS_ENABLE avec le nom de la tâche racine en argument.

Sauter ou suspendre une tâche enfant

Pour sauter une tâche enfant dans un graphique de tâches, suspendez-la en utilisant ALTER TASK. .. SUSPEND.

Lorsque vous suspendez une tâche enfant, le graphique de tâches continue à s’exécuter comme si cette tâche enfant avait réussi. Une tâche enfant avec plusieurs prédécesseurs s’exécute tant que au moins un des prédécesseurs est dans un état de reprise, et que tous les prédécesseurs repris s’exécutent avec succès jusqu’à la fin.

Diagramme montrant un graphique de tâches qui comprend une tâche enfant suspendue. La tâche enfant suspendue est ignorée et le graphique de tâches se termine.

Réessayer une tâche qui a échoué

Utilisez EXECUTE TASK. .. RETRY LAST pour tenter d’exécuter le graphique de tâches à partir de la dernière tâche ayant échoué. Si la tâche réussit, toutes les tâches enfants continueront à s’exécuter au fur et à mesure de l’achèvement des tâches précédentes.

Réessais automatiques

Par défaut, si une tâche enfant échoue, le graphique de tâches tout entier est considéré comme ayant échoué.

Plutôt que d’attendre la prochaine exécution du graphique de tâches planifié, vous pouvez donner l’instruction à ce graphique de tâches de réessayer immédiatement en définissant le paramètre TASK_AUTO_RETRY_ATTEMPTS sur la tâche racine. Lorsqu’une tâche enfant échoue, le graphique de tâches tout entier est immédiatement réessayé, jusqu’au nombre de fois spécifié. Si le graphique de tâches n’est toujours pas terminé, il est considéré comme ayant échoué.

Suspendre les graphiques de tâches après l’échec de leur exécution

Par défaut, un graphique de tâches est suspendu après 10 échecs consécutifs. Vous pouvez modifier cette valeur en réglant SUSPEND_TASK_AFTER_NUM_FAILURES sur la tâche racine.

Dans l’exemple suivant, chaque fois qu’une tâche enfant échoue, le graphique de tâches fait immédiatement deux nouvelles tentatives avant d’être considéré comme ayant échoué dans son ensemble. Si le graphique de tâches échoue trois fois de suite, il est suspendu.

CREATE OR REPLACE TASK task_root
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2   --  Failed task graph retries up to 2 times
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3   --  Task graph suspends after 3 consecutive failures
  AS SELECT 1;
Copy

Chevauchement d’exécutions de graphiques de tâches

Par défaut, Snowflake garantit qu’une seule instance d’un graphique de tâches donné est autorisée à s’exécuter à la fois. L’exécution suivante d’une tâche racine n’est programmée qu’à la fin de l’exécution de toutes les tâches du graphique de tâches. Cela signifie que si le temps cumulé nécessaire à l’exécution de toutes les tâches du graphique de tâches dépasse le temps explicitement planifié dans la définition de la tâche racine, au moins une exécution du graphique de tâche est ignorée.

Pour permettre aux tâches enfants de se chevaucher, utilisez CREATE TASK ou ALTER TASK pour la tâche racine, et définissez ALLOW_OVERLAPPING_EXECUTION sur TRUE. (Les tâches racines ne sont jamais en chevauchement.)

Chevauchement d'exécutions de graphiques de tâches

Le chevauchement des exécutions peut être toléré (voire même souhaitable) lorsque les opérations de lecture/écriture SQL exécutées par les exécutions en chevauchement d’un graphique de tâches ne produisent pas de données incorrectes ou dupliquées. Cependant, pour les autres graphiques de tâches, les propriétaires des tâches (c’est-à-dire le rôle ayant le privilège OWNERSHIP sur toutes les tâches du graphique de tâches) doivent définir une planification appropriée sur la tâche racine et sélectionner une taille d’entrepôt appropriée (ou utiliser des ressources de calcul sans serveur) pour garantir qu’une instance du graphique de tâches se termine avant la planification suivante de la tâche racine.

Pour mieux aligner un graphique de tâches sur la planification définie dans la tâche racine :

  1. Si possible, augmentez le temps de planification entre les exécutions de la tâche racine.

  2. Vous pouvez envisager de modifier les tâches intenses en calculs pour utiliser des ressources de calcul sans serveur. Si la tâche repose sur des ressources de calcul gérées par l’utilisateur, augmentez la taille de l’entrepôt qui exécute des instructions SQL ou des procédures stockées volumineuses ou complexes dans le graphique de tâches.

  3. Analysez les instructions SQL ou la procédure stockée exécutées par chaque tâche. Déterminez si le code peut être réécrit pour tirer parti du traitement parallèle.

Si aucune des solutions ci-dessus ne vous aide, voyez s’il est nécessaire d’autoriser des exécutions simultanées du graphique de tâches en définissant ALLOW_OVERLAPPING_EXECUTION = TRUE sur la tâche racine. Ce paramètre peut être défini lors de la création d’une tâche (à l’aide de CREATE TASK) ou après (à l’aide de ALTER TASK ou dans Snowsight).

Gestion des versions

Lorsque la tâche racine d’un graphique de tâches est reprise ou exécutée manuellement, Snowflake définit une version de l’ensemble du graphique de tâches, y compris toutes les propriétés de toutes les tâches du graphique de tâches. Après la suspension et la modification d’une tâche, Snowflake définit une nouvelle version lorsque la tâche racine est reprise ou exécutée manuellement.

Pour modifier ou recréer une tâche dans un graphique de tâches, il convient d’abord de suspendre la tâche racine. Lorsque la tâche racine est suspendue, toutes les futures exécutions planifiées de la tâche racine sont annulées ; cependant, si des tâches sont en cours d’exécution, ces tâches et toutes les tâches descendantes continuent de s’exécuter en utilisant la version actuelle.

Note

Si la définition d’une procédure stockée appelée par une tâche change pendant l’exécution du graphique de tâches, la nouvelle planification peut être exécutée lorsque la procédure stockée est appelée par la tâche lors de l’exécution en cours.

Par exemple, supposons que la tâche racine d’un graphique de tâches soit suspendue, mais qu’une exécution planifiée de cette tâche ait déjà commencé. Le propriétaire de toutes les tâches du graphique de tâches modifie le code SQL appelé par une tâche enfant pendant que la tâche racine est encore en cours d’exécution. La tâche enfant s’exécute et exécute le code SQL dans sa définition en utilisant la version du graphique de tâches qui existait lorsque la tâche racine a commencé son exécution. Lorsque la tâche racine est reprise ou est exécutée manuellement, une nouvelle version du graphique de tâches est définie. Cette nouvelle version inclut les modifications apportées à la tâche enfant.

Pour récupérer l’historique des versions de tâches, interrogez la vue TASK_VERSIONS Account Usage (dans la base de données partagée SNOWFLAKE).

Durée du graphique de tâches

La durée d’un graphique de tâches correspond au temps écoulé entre le moment où le démarrage de la tâche racine est planifié et celui où la dernière tâche enfant se termine. Pour calculer la durée d’un graphique de tâches, interrogez la Vue COMPLETE_TASK_GRAPHS et comparez SCHEDULED_TIME avec COMPLETED_TIME.

Par exemple, le diagramme suivant montre un graphique de tâches planifié pour s’exécuter toutes les minutes. La tâche racine et ses deux tâches enfants sont en file d’attente pendant 5 secondes et s’exécutent pendant 10 secondes, soit une durée d’exécution totale de 45 secondes.

Diagramme d'un graphique de tâches présentant trois tâches avec leurs dépendances. Chaque tâche est en file d'attente pendant 5 secondes et s'exécute pendant 10 secondes, soit une durée d'exécution totale de 45 secondes.

Délais d’expiration du graphique de tâches

Lorsque USER_TASK_TIMEOUT_MS est paramétré dans la tâche racine, le délai d’expiration s’applique à l’ensemble du graphique de tâches.

Lorsque USER_TASK_TIMEOUT_MS est paramétré dans une tâche enfant ou une tâche de finalisation, le délai d’expiration ne s’applique qu’à cette tâche.

Lorsque USER_TASK_TIMEOUT_MS est spécifié dans la tâche racine et dans une tâche enfant, le délai d’expiration de la tâche enfant prévaut sur le délai d’expiration de la tâche racine pour cette tâche enfant.

Considérations

  • Pour les tâches sans serveur, Snowflake dimensionne automatiquement les ressources pour s’assurer que les tâches se terminent dans un intervalle d’achèvement cible, y compris le temps de file d’attente.

  • Pour les tâches gérées par l’utilisateur, des périodes d’attente plus longues sont courantes lorsque les tâches sont planifiées pour être exécutées dans un entrepôt partagé ou occupé.

  • Pour les graphiques de tâches, le temps total peut inclure un temps de file d’attente supplémentaire pour les tâches enfants qui attendent que leurs prédécesseures soient terminés.

Créer un graphique de tâches avec une logique (informations sur l’environnement d’exécution, configuration et valeurs de retour)

Les tâches d’un graphique de tâches peuvent utiliser les valeurs de retour des tâches parentes pour effectuer des opérations logiques dans leur corps de fonction.

Considérations :

  • Certaines commandes logiques, comme SYSTEM$GET_PREDECESSOR_RETURN_VALUE, sont sensibles à la casse. Toutefois, les tâches créées à l’aide de CREATE TASK sans guillemets sont stockées et résolues en majuscules. Pour gérer cela, vous pouvez effectuer l’une des opérations suivantes :

    • Créez des noms de tâche comportant uniquement des lettres majuscules.

    • Utilisez des guillemets pour nommer et appeler les tâches.

    • Pour les noms de tâches définis avec des caractères minuscules, appelez la tâche en utilisant des caractères majuscules. Par exemple : une tâche définie par « CREATE TASK task_c… » peut être appelée sous la forme SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE(“TASK_C”).

Transmettre les informations de configuration au graphique de tâches

Vous pouvez transmettre des informations de configuration à l’aide d’un objet JSON lisible par les autres tâches d’un graphique de tâches. Utilisez la syntaxe CREATE/ALTER TASK. .. CONFIG pour définir, annuler ou modifier les informations de configuration présentes dans la tâche racine. Utilisez la fonction SYSTEM$GET_TASK_GRAPH_CONFIG pour les récupérer. Exemple :

CREATE OR REPLACE TASK "task_root"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS SELECT 1;

CREATE OR REPLACE TASK "task_a"
  USER_TASK_TIMEOUT_MS = 600000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
Copy

Transmettre des valeurs de retour entre les tâches

Vous pouvez transmettre des valeurs de retour entre les tâches d’un graphique de tâches. Utilisez la fonction SYSTEM$SET_RETURN_VALUE pour ajouter une valeur de retour à une tâche, et la fonction SYSTEM$GET_PREDECESSOR_RETURN_VALUE pour la récupérer.

Lorsqu’une tâche a plusieurs prédécesseures, vous devez spécifier quelle tâche a la valeur de retour que vous souhaitez. Dans l’exemple suivant, nous créons une tâche racine dans un graphique de tâches qui ajoute des informations de configuration.

CREATE OR REPLACE TASK "task_c"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
    END;

CREATE OR REPLACE TASK "task_d"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_c"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
    END;
Copy

Obtenir et utiliser des informations sur l’environnement d’exécution

Utilisez la fonction SYSTEM$TASK_RUNTIME_INFO pour obtenir des informations sur l’exécution de la tâche en cours. Cette fonction comporte plusieurs options spécifiques aux graphiques de tâches. Par exemple, utilisez CURRENT_ROOT_TASK_NAME pour obtenir le nom de la tâche racine du graphique de tâches en cours. Les exemples suivants montrent comment ajouter un horodatage à une table en fonction du moment où la tâche racine du graphique de tâches a démarré.

-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO date_time_table VALUES('order_date',:value);
    END;
Copy

Exemples

Exemple : Lancer plusieurs tâches et signaler le statut

Dans l’exemple suivant, la tâche racine lance des tâches de mise à jour de trois tables différentes. Après la mise à jour de ces trois tables, une tâche combine les informations des trois autres tables dans une table de ventes agrégées.

L'organigramme montre une tâche racine qui lance trois tâches enfants, dont chacune met à jour une table. Ces trois tâches précèdent toutes une autre tâche enfant, qui combine les modifications précédentes dans une autre table.
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
    END;
;

-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT customer_id FROM ref_cust_table
        WHERE cust_name = "Jane Doe";);
      INSERT INTO customer_table VALUES('customer_id',:value);
    END;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT product_id FROM ref_item_table
        WHERE PRODUCT_NAME = "widget";);
      INSERT INTO product_table VALUES('product_id',:value);
    END;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO "date_time_table" VALUES('order_date',:value);
    END;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_customer_table, task_product_table, task_date_time_table
  AS
    BEGIN
      LET VALUE := (SELECT sales_order_id FROM ORDERS);
      JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
      INSERT INTO sales_table VALUES('sales_order_id',:value);
    END;
;
Copy

Exemple de tâche de finalisation : Envoyer une notification par courrier électronique

Cet exemple montre comment une tâche de finalisation peut envoyer un courrier électronique résumant l’exécution du graphique de tâches. Cette tâche appelle deux fonctions externes : l’une regroupe les informations sur le statut d’achèvement de la tâche, et l’autre utilise ces informations pour composer un e-mail qui peut être envoyé par l’intermédiaire d’un service de messagerie à distance.

Une séquence de tâches montre une tâche racine pointant vers deux tâches enfants, qui à leur tour pointent vers une autre tâche. Une tâche de finalisation est affichée en bas. Celle-ci s'exécute après que toutes les autres tâches se sont achevées ou ont échoué.
CREATE OR REPLACE TASK notify_finalizer
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_root
AS
  DECLARE
    my_root_task_id STRING;
    my_start_time TIMESTAMP_LTZ;
    summary_json STRING;
    summary_html STRING;
  BEGIN
    --- Get root task ID
    my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
    --- Get root task scheduled time
    my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::timestamp_ltz);
    --- Combine all task run info into one JSON string
    summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
    --- Convert JSON into HTML table
    summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));

    --- Send HTML to email
    CALL SYSTEM$SEND_EMAIL(
        'email_notification',
        'admin@snowflake.com',
        'notification task run summary',
        :summary_html,
        'text/html');
    --- Set return value for finalizer
    CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END

CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
  RETURNS STRING
AS
$$
  (SELECT
    ARRAY_AGG(OBJECT_CONSTRUCT(
      'task_name', name,
      'run_status', state,
      'return_value', return_value,
      'started', query_start_time,
      'duration', duration,
      'error_message', error_message
      )
    ) AS GRAPH_RUN_SUMMARY
  FROM
    (SELECT
      NAME,
      CASE
        WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
        WHEN STATE = 'FAILED' then '🔴 Failed'
        WHEN STATE = 'SKIPPED' then '🔵 Skipped'
        WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
      END AS STATE,
      RETURN_VALUE,
      TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
      CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
        ' s') AS DURATION,
      ERROR_MESSAGE
    FROM
      TABLE(my-database.information_schema.task_history(
        ROOT_TASK_ID => my_root_task_id ::STRING,
        SCHEDULED_TIME_RANGE_START => my_start_time,
        SCHEDULED_TIME_RANGE_END => current_timestamp()
      ))
    ORDER BY
      SCHEDULED_TIME)
  )::STRING
$$
;

CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
  IMPORT JSON

  def GENERATE_HTML_TABLE(JSON_DATA):
    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]

  DATA = json.loads(JSON_DATA)
  HTML = f"""
    <img src="https://example.com/logo.jpg"
      alt="Company logo" height="72">
    <p><strong>Task Graph Run Summary</strong>
      <br>Sign in to Snowsight to see more details.</p>
    <table border="1" style="border-color:#DEE3EA"
      cellpadding="5" cellspacing="0">
      <thead>
        <tr>
        """
        headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
        for i, header in enumerate(headers):
            HTML += f'<th scope="col" style="text-align:left;
            width: {column_widths[i]}">{header.capitalize()}</th>'

        HTML +="""
        </tr>
      </thead>
      <tbody>
        """
        for ROW_DATA in DATA:
          HTML += "<tr>"
          for header in headers:
            key = header.replace(" ", "_").upper()
            CELL_DATA = ROW_DATA.get(key, "")
            HTML += f'<td style="text-align:left;
            width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
          HTML += "</tr>"
        HTML +="""
      </tbody>
    </table>
    """
  return HTML
$$
;
Copy

Exemple de tâche de finalisation : Corriger les erreurs

Cet exemple montre comment une tâche de finalisation peut corriger les erreurs.

À des fins de démonstration, les tâches sont conçues pour échouer lors de leur première exécution. Les tâches de finalisation corrigent le problème et relancent les tâches, qui réussissent lors des exécutions suivantes :

Diagramme représentant une série de tâches. La tâche A est représentée en haut à gauche. Une flèche pointe à droite de la tâche A vers la tâche B, qui pointe vers la tâche C, qui pointe vers la tâche D. En dessous de la tâche A, une flèche pointe vers la tâche de finalisation, la tâche F.
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations.
--    Creates a root task ("task_a"), and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * Each task runs after one minute, with a 60-second timeout.
--    * If a task fails, retry it twice. if it fails twice,
--      the entire task graph is considered as failed.
--    * If the task graph fails consecutively three times, suspend the task.
--    * Other environment values are set.

CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task a successful');
    END;
;

-- 2. Use a runtime reflection variable.
--    Creates a child task ("task_b").
--    By design, this example fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO demo_table VALUES('task b name',:VALUE);
    END;
;

-- 3. Get a task graph configuration value.
--    Creates the child task ("task_c").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_b
  AS
    BEGIN
      CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_c
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
      INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
    END;
;

-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task should automatically retry
--    (see task_a: task_auto_retry_attempts).
--    On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_a
  AS
    BEGIN
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
    END;
;

-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
--    Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');

-- 7. Query the task history
SELECT
    name, state, attempt_number, scheduled_from
  FROM
    TABLE(information_schema.task_history(task_name=> 'task_b'))
  LIMIT 5;
;

-- 8. Suspend the task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    (task_a). Child tasks don’t run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;

-- 9. Check tasks during execution (optional)
--    Run this command to query the demo table during execution
--    to check which tasks have run.
SELECT * FROM demo_table;

-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     After the task graph retries, task_b will succeed.
DROP TABLE demo_table;
Copy