Gestion des tâches et des graphiques de tâches Snowflake avec Python¶
Vous pouvez utiliser Python pour gérer les tâches Snowflake, avec lesquelles vous pouvez exécuter des instructionsSQL, des appels de procédure et une logique dans Exécution de scripts Snowflake. Pour une vue d’ensemble des tâches, voir Introduction aux tâches.
Les Snowflake Python APIs représentent des tâches avec deux types distincts :
Task: Expose les propriétés d’une tâche telles que sa planification, ses paramètres et ses prédécesseurs.TaskResource: Expose des méthodes que vous pouvez utiliser pour récupérer un objetTaskcorrespondant, exécuter la tâche et modifier la tâche.
Conditions préalables¶
Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.
Par exemple, le code suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.
En utilisant l’objet Session obtenu, le code crée un objet Root pour utiliser les types et les méthodes de l’API. Pour plus d’informations, voir Connexion à Snowflake avec Snowflake Python APIs.
Création d’une tâche¶
Pour créer une tâche, commencez par créer un objet Task. Ensuite, en spécifiant la base de données et le schéma dans lesquels créer la tâche, créez un objet TaskCollection. Via TaskCollection.create, ajoutez la nouvelle tâche à Snowflake.
Le code de l’exemple suivant crée un objet Task représentant une tâche nommée my_task qui exécute une requête SQL spécifiée dans le paramètre definition :
Ce code crée une variable TaskCollection tasks à partir de la base de données my_db et du schéma my_schema. Via TaskCollection.create, il crée une nouvelle tâche dans Snowflake.
Cet exemple de code spécifie également une valeur timedelta d’une heure pour la planification de la tâche. Vous pouvez définir la planification d’une tâche via une valeur timedelta ou une expression Cron.
Vous pouvez également créer une tâche qui exécute une fonction Python ou une procédure stockée. Le code de l’exemple suivant crée une tâche nommée my_task2 qui exécute une fonction représentée par un objet StoredProcedureCall :
Cet objet spécifie une fonction nommée dosomething située dans l’emplacement de zone de préparation @mystage. Vous devez également spécifier un warehouse lorsque vous créez une tâche avec un objet StoredProcedureCall.
Création ou modification d’une tâche¶
Vous pouvez définir les propriétés d’un objet Task et le transmettre à la méthode TaskResource.create_or_alter pour créer une tâche si elle n’existe pas, ou la modifier selon la définition de la tâche si elle existe. Le comportement de create_or_alter est censé être idempotent, ce qui signifie que l’objet de tâche résultant sera le même, que la tâche existe ou non avant l’appel de la méthode.
Note
La méthode create_or_alter utilise des valeurs par défaut pour toute propriété de tâche que vous ne définissez pas explicitement. Par exemple, si vous ne définissez pas schedule, sa valeur par défaut est None même si la tâche existait auparavant avec une valeur différente.
Le code de l’exemple suivant met à jour la définition et la planification de la tâche my_task, puis modifie la tâche sur Snowflake :
Répertorier les tâches¶
Vous pouvez répertorier les tâches via la méthode TaskCollection.iter. La méthode renvoie un itérateur PagedIter d’objets Task.
Le code de l’exemple suivant répertorie les tâches dont le nom commence par my :
Effectuer des opérations de tâche¶
Vous pouvez effectuer des opérations de tâche courantes—comme exécuter, suspendre et reprendre des tâches—avec un objet TaskResource.
Le code de l’exemple suivant exécute, suspend, reprend et supprime la tâche my_task :
Gestion des tâches dans un graphique de tâches¶
Vous pouvez gérer des tâches rassemblées dans un graphique de tâches. Un graphique de tâches est une série de tâches composée d’une seule tâche racine et de tâches supplémentaires, organisées en fonction de leurs dépendances.
Pour en savoir plus sur les tâches dans un graphique de tâches, voir Créer une séquence de tâches à l’aide d’un graphique de tâches.
Création d’un graphique de tâches¶
Pour créer un graphique de tâches, commencez par créer un objet DAG qui spécifie son nom et d’autres propriétés facultatives telles que sa planification. Vous pouvez définir la planification d’un graphique de tâches via une valeur timedelta ou une expression Cron.
Le code de l’exemple suivant définit une fonction Python dosomething, puis spécifie la fonction sous forme d’objet DAGTask nommé dag_task2 dans le graphique de tâches.
Ce code définit également une instruction SQL sous la forme d’un autre objet DAGTask nommé dag_task1, puis spécifie dag_task1 comme prédécesseur de dag_task2. Pour finir, il déploie le graphique de tâches dans Snowflake dans la base de données my_db et le schéma my_schema.
Création d’un graphique de tâches avec une planification cron, des branches de tâches et des valeurs de renvoi de fonctions.¶
Vous pouvez également créer un graphique de tâches avec une planification cron spécifique, des branches de tâches et des valeurs de renvoi de fonctions qui sont utilisées comme valeurs de renvoi de tâches.
Le code de l’exemple suivant crée un objet DAG avec un objet Cron spécifiant sa planification. Il définit un objet DAGTaskBranch nommé task1_branch ainsi que d’autres objets DAGTask et spécifie leurs dépendances les uns par rapport aux autres :
Cet exemple de code définit également les fonctions de gestion des tâches et crée chaque objet DAGTask et DAGTaskBranch avec un gestionnaire de tâches spécifique affecté à la tâche. Le code définit le paramètre use_func_return_value du DAG sur True, qui indique qu’il convient d’utiliser la valeur de renvoi de la fonction Python comme valeur de renvoi de la tâche correspondante. Sinon, la valeur par défaut de use_func_return_value est False.
Définition et obtention de la valeur de retour d’une tâche dans un graphique de tâches¶
Lorsque la définition d’une tâche est un objet StoredProcedureCall, le gestionnaire de la procédure stockée (ou de la fonction) peut explicitement définir la valeur de renvoi de la tâche via un objet TaskContext.
Pour plus d’informations, voir SYSTEM$SET_RETURN_VALUE.
Le code de l’exemple suivant définit une fonction de gestion des tâches qui crée un objet TaskContext nommé context à partir de la session en cours. Il utilise ensuite la méthode TaskContext.set_return_value pour définir explicitement la valeur de renvoi sur une chaîne spécifiée :
Dans un graphique de tâches, une tâche de successeur immédiate qui identifie la tâche précédente comme son prédécesseur peut alors récupérer la valeur de renvoi explicitement définie par la tâche de prédécesseur.
Pour plus d’informations, voir SYSTEM$GET_PREDECESSOR_RETURN_VALUE.
Le code de l’exemple suivant définit une fonction de gestionnaire de tâches qui utilise la méthode TaskContext.get_predecessor_return_value pour obtenir la valeur de renvoi de la tâche de prédécesseur nommée pred_task_name :