Tâches déclenchées

Utilisez les tâches déclenchées pour exécuter des tâches à chaque fois qu’il y a un changement dans un flux. Il n’est donc pas nécessaire d’interroger fréquemment une source lorsque la disponibilité de nouvelles données est imprévisible. Il réduit également le temps de latence car les données sont traitées immédiatement.

Les tâches déclenchées n’utilisent pas de ressources de calcul jusqu’à ce que l’événement soit déclenché.

Considérations

Les tâches déclenchées sont prises en charge avec les éléments suivants :

  • Tables

  • Vues

  • Tables dynamiques

  • Tables Apache Iceberg™ (gérées et non gérées)

  • Partages de données

  • Tables de répertoire Une table de répertoire doit être actualisée avant qu’une tâche déclenchée puisse détecter les modifications. Pour détecter les modifications, vous pouvez effectuer l’une des tâches suivantes :

Les tâches déclenchées ne sont pas prises en charge avec les éléments suivants :

  • Tables hybrides

  • Flux sur des tables externes

Pour que les consommateurs puissent créer des flux sur des tables partagées ou des vues sécurisées, le fournisseur de données doit activer le suivi des modifications sur les tables et les vues destinées au partage dans son compte ; c’est-à-dire ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;. Si le suivi des modifications n’est pas activé, les consommateurs ne peuvent pas créer de flux sur les données partagées. Pour plus d’informations, voir Flux sur les objets partagés.

Créer une tâche déclenchée

Utilisez CREATE TASK en spécifiant les paramètres suivants :

  • Définissez le flux cible à l’aide de la clause WHEN. (Ne pas inclure le paramètre SCHEDULE.)

  • Exigences supplémentaires basées sur les ressources de calcul :

    • Pour créer une tâche qui s’exécute sur un entrepôt géré par l’utilisateur, incluez le paramètre WAREHOUSE et définissez l’entrepôt.

    • Pour créer une tâche sans serveur, vous devez inclure le paramètre TARGET_COMPLETION_INTERVAL. N’incluez pas le paramètre WAREHOUSE. Snowflake estime les ressources nécessaires en fonction de l’intervalle d’achèvement cible et s’adapte pour terminer la tâche dans ce délai.

L’exemple suivant crée une tâche déclenchée sans serveur qui s’exécute chaque fois que des données changent dans un flux.

Diagramme montrant une tâche déclenchée sans serveur
CREATE TASK my_triggered_task
  TARGET_COMPLETION_INTERVAL='15 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Migrer une tâche existante d’une tâche planifiée vers une tâche déclenchée

  1. Suspendez la tâche.

  2. Utilisez ALTER TASK pour mettre à jour la tâche. Désactivez le paramètre SCHEDULE, puis ajoutez la clause WHEN pour définir le flux cible.

  3. Reprenez la tâche.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

Migrer une tâche déclenchée existante gérée par l’utilisateur vers une tâche déclenchée sans serveur

  1. Suspendez la tâche.

  2. Utilisez ALTER TASK pour mettre à jour la tâche. Supprimez le paramètre WAREHOUSE, puis définissez le paramètre TARGET_COMPLETION_INTERVAL.

  3. Reprenez la tâche.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

Pour plus d’informations, voir Tâches sans serveur.

Autoriser l’exécution d’une tâche déclenchée

Lorsque vous créez une tâche déclenchée, elle démarre dans l’état suspendu.

Pour commencer à surveiller le flux :

La tâche s’exécute dans les conditions suivantes :

  • Lorsque vous reprenez pour la première fois une tâche déclenchée, la tâche vérifie si le flux a été modifié après l’exécution de la dernière tâche. S’il y a un changement, la tâche s’exécute. sinon, la tâche est sautée sans utiliser de ressources de calcul.

  • Si une tâche est en cours d’exécution et que le flux contient de nouvelles données, la tâche s’interrompt jusqu’à ce que la tâche en cours soit terminée. Snowflake garantit l’exécution d’une seule instance d’une tâche à la fois.

  • Une fois la tâche terminée, Snowflake vérifie à nouveau le flux pour y détecter des changements. S’il y a des changements, la tâche s’exécute à nouveau ; sinon, elle est ignorée.

  • La tâche s’exécute chaque fois que de nouvelles données sont détectées dans le flux.

  • Si les données de flux sont hébergées sur une table de répertoire, vous détectez les modifications en effectuant l’une des tâches suivantes :

  • Toutes les 12 heures, la tâche effectue un bilan de santé pour éviter que les flux ne deviennent périmés. S’il n’y a pas de changement, Snowflake passe à la tâche sans utiliser de ressources de calcul. Pour les flux, les instructions de la tâche doivent consommer les données du flux avant l’expiration du délai de conservation des données, faute de quoi le flux devient périmé. Pour plus d’informations, voir Éviter l’obsolescence des flux.

  • Par défaut, les tâches déclenchées s’exécutent au maximum toutes les 30 secondes. Vous pouvez modifier le paramètre USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS pour qu’il s’exécute plus fréquemment, jusqu’à toutes les 10 secondes.

  • Lorsqu’une tâche est déclenchée par Streams on Views, toute modification apportée aux tables référencées par la requête Streams on Views déclenche également la tâche, indépendamment des jointures, agrégations ou filtres de la requête.

Diagramme montrant comment les tâches déclenchées gèrent les nouvelles données au fur et à mesure qu'elles arrivent et vérifient également les modifications toutes les 12 heures.

Surveillance des tâches déclenchées

  • Dans la sortie de SHOW TASKS et de DESC TASK, la propriété SCHEDULE affiche NULL pour les tâches déclenchées.

  • Dans la sortie de la vue task_history des schemas information_schema et account_usage, la colonne SCHEDULED_FROM affiche TRIGGER.

Exemples

Exemple 1 : Créer une tâche gérée par l’utilisateur qui s’exécute chaque fois que des données changent dans l’un ou l’autre des deux flux :

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Exemple 2 : Créer une tâche à exécuter chaque fois que des changements de données sont détectés dans deux flux de données différents. Comme la tâche utilise un AND conditionnel, la tâche est ignorée si un seul des deux flux contient de nouvelles données.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

Exemple 3 : Créer une tâche gérée par l’utilisateur qui s’exécute chaque fois que des données changent dans une table de répertoire. Dans l’exemple, un flux — my_directory_table_stream — est hébergé sur une table de répertoire sur une zone de préparation appelée my_test_stage.

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

Pour valider la tâche déclenchée, des données sont ajoutées à la zone de préparation.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

La table de répertoire est ensuite actualisée manuellement, ce qui déclenche la tâche.

ALTER STAGE my_test_stage REFRESH
Copy