Tâches déclenchées

Utilisez les tâches déclenchées pour exécuter des tâches à chaque fois qu’il y a un changement dans un flux. Il n’est donc pas nécessaire d’interroger fréquemment une source lorsque la disponibilité de nouvelles données est imprévisible. Il réduit également le temps de latence car les données sont traitées immédiatement.

Les tâches déclenchées n’utilisent pas de ressources de calcul jusqu’à ce que l’événement soit déclenché.

Considérations

  • Pour les flux hébergés sur une table de répertoire, cette table doit être actualisée avant qu’une tâche déclenchée puisse détecter les changements. Pour détecter les changements, vous pouvez procéder de l’une des manières suivantes :

  • Les flux sur les tables externes et les tables hybrides ne sont pas pris en charge.

Créer une tâche déclenchée

Utilisez CREATE TASK en spécifiant les paramètres suivants :

  • Définissez le flux cible à l’aide de la clause WHEN. (Ne pas inclure le paramètre SCHEDULE.)

    Lorsque vous travaillez avec plusieurs flux de données, vous pouvez utiliser les paramètres conditionnels WHEN ... AND et WHEN ... OR.

  • Exigences supplémentaires basées sur les ressources de calcul :

    • Pour créer une tâche sans serveur, vous devez inclure le paramètre TARGET_COMPLETION_INTERVAL. N’incluez pas le paramètre WAREHOUSE. Snowflake estime les ressources nécessaires en fonction de l’intervalle d’achèvement cible et s’adapte pour terminer la tâche dans ce délai.

    Diagramme montrant le fonctionnement des tâches déclenchées sans serveur dans Snowflake.
    • Pour créer une tâche qui s’exécute sur un entrepôt géré par l’utilisateur, incluez le paramètre WAREHOUSE et définissez l’entrepôt.

Migrer une tâche existante d’une tâche planifiée vers une tâche déclenchée

  1. Suspendez la tâche.

  2. Annulez le paramètre SCHEDULE et ajoutez la clause WHEN pour définir le flux cible.

  3. Reprenez la tâche.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

Migrer une tâche déclenchée existante gérée par l’utilisateur vers une tâche déclenchée sans serveur

  1. Suspendez la tâche.

  2. Supprimez le paramètre WAREHOUSE et spécifiez le paramètre TARGET_COMPLETION_INTERVAL.

  3. Reprenez la tâche.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

Pour plus d’informations, voir Tâches sans serveur.

Autoriser l’exécution de la tâche déclenchée

Lorsque vous créez une tâche déclenchée, elle démarre dans l’état suspendu.

Pour commencer à surveiller le flux :

La tâche s’exécute dans les conditions suivantes :

  • Lorsque vous reprenez une tâche déclenchée pour la première fois, celle-ci vérifie que le flux n’a pas changé depuis sa dernière exécution. En cas de changement, la tâche est exécutée. Dans le cas contraire, la tâche est ignorée sans utiliser de ressources de calcul.

  • Si une tâche est en cours d’exécution et que le flux contient de nouvelles données, la tâche déclenchée attend que cette tâche en cours soit terminée. Snowflake veille à ce qu’une seule instance d’une tâche soit exécutée à la fois.

  • Une fois la tâche terminée, Snowflake vérifie à nouveau le flux pour y détecter des changements. S’il y a des changements, la tâche s’exécute à nouveau ; sinon, elle est ignorée.

  • La tâche s’exécute chaque fois que de nouvelles données sont détectées dans le flux.

  • Si les données du flux sont hébergées dans une table de répertoire, vous pouvez procéder de l’une des manières suivantes pour détecter les changements :

  • Toutes les 12 heures, la tâche effectue un bilan de santé pour éviter que les flux ne deviennent périmés. S’il n’y a pas de changement, Snowflake passe à la tâche sans utiliser de ressources de calcul. Pour les flux, les instructions de la tâche doivent consommer les données du flux avant l’expiration du délai de conservation des données, faute de quoi le flux devient périmé. Pour plus d’informations, voir Éviter l’obsolescence des flux.

  • Par défaut, les tâches déclenchées s’exécutent au maximum toutes les 30 secondes. Vous pouvez modifier le paramètre USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS pour qu’il s’exécute plus fréquemment, jusqu’à toutes les 10 secondes.

  • Lorsqu’une tâche est déclenchée par Streams on Views, toute modification apportée aux tables référencées par la requête Streams on Views déclenche également la tâche, indépendamment des jointures, agrégations ou filtres de la requête.

Diagramme montrant comment les tâches déclenchées gèrent les nouvelles données au fur et à mesure qu'elles arrivent et vérifient également les modifications toutes les 12 heures.

Surveillance des tâches déclenchées

  • Dans la sortie de SHOW TASKS et de DESC TASK, la propriété SCHEDULE affiche NULL pour les tâches déclenchées.

  • Dans la sortie de la vue task_history des schemas information_schema et account_usage, la colonne SCHEDULED_FROM affiche TRIGGER.

Exemples

Exemple 1 : Créer une tâche sans serveur qui s’exécute à chaque fois que les données sont modifiées dans un flux.

La tâche étant sans serveur, le paramètre TARGET_COMPLETION_INTERVAL est nécessaire pour permettre à Snowflake d’estimer les ressources de calcul requises.

CREATE TASK my_task
  TARGET_COMPLETION_INTERVAL='120 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS SELECT 1;
Copy

Exemple 2 : Créer une tâche gérée par l’utilisateur qui s’exécute chaque fois que des données sont modifiées dans l’un ou l’autre des deux flux.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Exemple 3 : Créer une tâche gérée par l’utilisateur à exécuter chaque fois que des changements de données sont détectés dans deux flux de données différents. Comme la tâche utilise un AND conditionnel, la tâche est ignorée si un seul des deux flux contient de nouvelles données.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

Exemple 4 : Créer une tâche gérée par l’utilisateur qui s’exécute chaque fois que des données sont modifiées dans une table de répertoire. Dans cet exemple, un flux (my_directory_table_stream) est hébergé dans une table répertoire au sein d’une zone de préparation (my_test_stage).

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

Pour valider la tâche déclenchée, des données sont ajoutées à la zone de préparation.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

La table de répertoire est ensuite actualisée manuellement, ce qui déclenche la tâche.

ALTER STAGE my_test_stage REFRESH
Copy