Tâches déclenchées¶
Utilisez les tâches déclenchées pour exécuter des tâches à chaque fois qu’il y a un changement dans un flux. Il n’est donc pas nécessaire d’interroger fréquemment une source lorsque la disponibilité de nouvelles données est imprévisible. Il réduit également le temps de latence car les données sont traitées immédiatement.
Les tâches déclenchées n’utilisent pas de ressources de calcul jusqu’à ce que l’événement soit déclenché.
Considérations¶
Pour les flux hébergés sur une table de répertoire, cette table doit être actualisée avant qu’une tâche déclenchée puisse détecter les changements. Pour détecter les changements, vous pouvez procéder de l’une des manières suivantes :
Paramétrez la table de répertoire pour qu’elle s’actualise automatiquement.
Actualisez manuellement la table du répertoire à l’aide de la commande ALTER STAGE nom REFRESH.
Les flux sur les tables externes et les tables hybrides ne sont pas pris en charge.
Créer une tâche déclenchée¶
Utilisez CREATE TASK en spécifiant les paramètres suivants :
Définissez le flux cible à l’aide de la clause
WHEN
. (Ne pas inclure le paramètreSCHEDULE
.)Lorsque vous travaillez avec plusieurs flux de données, vous pouvez utiliser les paramètres conditionnels
WHEN ... AND
etWHEN ... OR
.Exigences supplémentaires basées sur les ressources de calcul :
Pour créer une tâche sans serveur, vous devez inclure le paramètre
TARGET_COMPLETION_INTERVAL
. N’incluez pas le paramètreWAREHOUSE
. Snowflake estime les ressources nécessaires en fonction de l’intervalle d’achèvement cible et s’adapte pour terminer la tâche dans ce délai.
Pour créer une tâche qui s’exécute sur un entrepôt géré par l’utilisateur, incluez le paramètre
WAREHOUSE
et définissez l’entrepôt.
Migrer une tâche existante d’une tâche planifiée vers une tâche déclenchée¶
Suspendez la tâche.
Annulez le paramètre
SCHEDULE
et ajoutez la clauseWHEN
pour définir le flux cible.Reprenez la tâche.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Migrer une tâche déclenchée existante gérée par l’utilisateur vers une tâche déclenchée sans serveur¶
Suspendez la tâche.
Supprimez le paramètre
WAREHOUSE
et spécifiez le paramètreTARGET_COMPLETION_INTERVAL
.Reprenez la tâche.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Pour plus d’informations, voir Tâches sans serveur.
Autoriser l’exécution de la tâche déclenchée¶
Lorsque vous créez une tâche déclenchée, elle démarre dans l’état suspendu.
Pour commencer à surveiller le flux :
Reprenez la tâche en utilisant ALTER TASK. .. RESUME.
La tâche s’exécute dans les conditions suivantes :
Lorsque vous reprenez une tâche déclenchée pour la première fois, celle-ci vérifie que le flux n’a pas changé depuis sa dernière exécution. En cas de changement, la tâche est exécutée. Dans le cas contraire, la tâche est ignorée sans utiliser de ressources de calcul.
Si une tâche est en cours d’exécution et que le flux contient de nouvelles données, la tâche déclenchée attend que cette tâche en cours soit terminée. Snowflake veille à ce qu’une seule instance d’une tâche soit exécutée à la fois.
Une fois la tâche terminée, Snowflake vérifie à nouveau le flux pour y détecter des changements. S’il y a des changements, la tâche s’exécute à nouveau ; sinon, elle est ignorée.
La tâche s’exécute chaque fois que de nouvelles données sont détectées dans le flux.
Si les données du flux sont hébergées dans une table de répertoire, vous pouvez procéder de l’une des manières suivantes pour détecter les changements :
Toutes les 12 heures, la tâche effectue un bilan de santé pour éviter que les flux ne deviennent périmés. S’il n’y a pas de changement, Snowflake passe à la tâche sans utiliser de ressources de calcul. Pour les flux, les instructions de la tâche doivent consommer les données du flux avant l’expiration du délai de conservation des données, faute de quoi le flux devient périmé. Pour plus d’informations, voir Éviter l’obsolescence des flux.
Par défaut, les tâches déclenchées s’exécutent au maximum toutes les 30 secondes. Vous pouvez modifier le paramètre USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS pour qu’il s’exécute plus fréquemment, jusqu’à toutes les 10 secondes.
Lorsqu’une tâche est déclenchée par Streams on Views, toute modification apportée aux tables référencées par la requête Streams on Views déclenche également la tâche, indépendamment des jointures, agrégations ou filtres de la requête.
Surveillance des tâches déclenchées¶
Dans la sortie de
SHOW TASKS
et deDESC TASK
, la propriétéSCHEDULE
afficheNULL
pour les tâches déclenchées.Dans la sortie de la vue task_history des schemas information_schema et account_usage, la colonne SCHEDULED_FROM affiche TRIGGER.
Exemples¶
Exemple 1 : Créer une tâche sans serveur qui s’exécute à chaque fois que les données sont modifiées dans un flux.
La tâche étant sans serveur, le paramètre TARGET_COMPLETION_INTERVAL
est nécessaire pour permettre à Snowflake d’estimer les ressources de calcul requises.
CREATE TASK my_task
TARGET_COMPLETION_INTERVAL='120 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS SELECT 1;
Exemple 2 : Créer une tâche gérée par l’utilisateur qui s’exécute chaque fois que des données sont modifiées dans l’un ou l’autre des deux flux.
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
Exemple 3 : Créer une tâche gérée par l’utilisateur à exécuter chaque fois que des changements de données sont détectés dans deux flux de données différents. Comme la tâche utilise un AND conditionnel, la tâche est ignorée si un seul des deux flux contient de nouvelles données.
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
Exemple 4 : Créer une tâche gérée par l’utilisateur qui s’exécute chaque fois que des données sont modifiées dans une table de répertoire. Dans cet exemple, un flux (my_directory_table_stream) est hébergé dans une table répertoire au sein d’une zone de préparation (my_test_stage).
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
Pour valider la tâche déclenchée, des données sont ajoutées à la zone de préparation.
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
La table de répertoire est ensuite actualisée manuellement, ce qui déclenche la tâche.
ALTER STAGE my_test_stage REFRESH