Tutoriel 2 : Créer et gérer des tâches et des graphiques de tâches (DAGs)¶
Introduction¶
Dans ce tutoriel, vous créez et utilisez des tâches Snowflake pour gérer certaines procédures stockées de base. Vous créez également un graphique de tâches — également appelé graphe orienté acyclique (DAG) — pour orchestrer des tâches avec une API de graphe de tâche supérieur.
Conditions préalables¶
Note
Si vous avez déjà effectué les étapes de Configuration commune pour les tutoriels sur les APIs Snowflake Python et Tutoriel 1 : Créer une base de données, un schéma, une table et un entrepôt, vous pouvez ignorer ces prérequis et passer à la première étape de ce tutoriel.
Avant de commencer ce tutoriel, vous devez suivre les étapes suivantes :
Suivez les instructions de configuration commune, qui comprennent les étapes suivantes :
Configurez votre environnement de développement.
Installez le paquet Snowflake Python APIs.
Configurez votre connexion Snowflake.
Importez tous les modules nécessaires aux tutoriels d’API Python.
Créez un objet d’API
Root
.
Exécutez le code suivant pour créer une base de données nommée
PYTHON_API_DB
et un schéma nomméPYTHON_API_SCHEMA
dans cette base de données.database = root.databases.create( Database( name="PYTHON_API_DB"), mode=CreateMode.or_replace ) schema = database.schemas.create( Schema( name="PYTHON_API_SCHEMA"), mode=CreateMode.or_replace, )
Ce sont les mêmes objets de base de données et de schéma que vous créez dans Tutoriel 1.
Après avoir rempli ces conditions préalables, vous êtes prêt à commencer à utiliser l’API pour la gestion des tâches.
Configurer des objets Snowflake¶
Configurez les procédures stockées que vos tâches appelleront et la zone de préparation qui contiendra les procédures stockées. Vous pouvez utiliser votre objet Snowflake Python APIs root
pour créer une zone de préparation dans la base de données PYTHON_API_DB
et le schéma PYTHON_API_SCHEMA
que vous avez créés précédemment.
Pour créer une zone de préparation nommée
TASKS_STAGE
, dans la cellule suivante de votre notebook, exécutez le code suivant :stages = root.databases[database.name].schemas[schema.name].stages stages.create(Stage(name="TASKS_STAGE"))
Cette zone de préparation contiendra les procédures stockées et toutes les dépendances dont ces procédures ont besoin.
Pour créer deux fonctions Python de base que les tâches exécuteront en tant que procédures stockées, dans votre cellule suivante, exécutez le code suivant :
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str: ( session .table(from_table) .limit(count) .write.save_as_table(to_table) ) return "Truncated table successfully created!" def filter_by_shipmode(session: Session, mode: str) -> str: ( session .table("snowflake_sample_data.tpch_sf100.lineitem") .filter(col("L_SHIPMODE") == mode) .limit(10) .write.save_as_table("filter_table") ) return "Filter table successfully created!"
Ces fonctions effectuent les opérations suivantes :
trunc()
: crée une version tronquée d’une table d’entrée.filter_by_shipmode()
: filtre la tableSNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM
par mode de navire, limite les résultats à 10 lignes et écrit les résultats dans une nouvelle table.Note
Cette fonction interroge les données d’exemple TPC-H dans la base de données SNOWFLAKE_SAMPLE_DATA. Par défaut, Snowflake crée la base de données d’exemple dans les nouveaux comptes. Si la base de données n’a pas été créée dans votre compte, voir Utilisation de la base de données d’échantillon.
Les fonctions sont volontairement basiques et sont destinées à des fins de démonstration.
Créer et gérer des tâches¶
Définissez, créez et gérez deux tâches qui exécuteront vos fonctions Python précédemment créées en tant que procédures stockées.
Pour définir les deux tâches,
task1
ettask2
, dans la cellule suivante de votre notebook, exécutez le code suivant :tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE" task1 = Task( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH", schedule=timedelta(minutes=1) ) task2 = Task( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH" )
Dans ce code, vous spécifiez les paramètres de tâche suivants :
Pour chaque tâche, une définition représentée par un objet StoredProcedureCall qui comprend les attributs suivants :
La fonction appelable à exécuter
L’emplacement de la zone de préparation où le contenu de votre fonction Python et ses dépendances sont téléchargés
Dépendances du paquet de la procédure stockée
Un entrepôt pour exécuter la procédure stockée (requis lors de la création d’une tâche avec un objet
StoredProcedureCall
). Ce tutoriel utilise l’entrepôtCOMPUTE_WH
qui est inclus dans votre compte d’essai.Un calendrier d’exécution pour la tâche racine,
task1
. La planification spécifie l’intervalle auquel exécuter la tâche périodiquement.
Pour plus d’informations sur les procédures stockées, voir Écriture de procédures stockées en Python.
Pour créer les deux tâches, récupérez un objet
TaskCollection
(tasks
) à partir de votre schéma de base de données et appelez.create()
sur votre collection de tâches :# create the task in the Snowflake database tasks = schema.tasks trunc_task = tasks.create(task1, mode=CreateMode.or_replace) task2.predecessors = [trunc_task.name] filter_task = tasks.create(task2, mode=CreateMode.or_replace)
Dans cet exemple de code, vous liez également les tâches en définissant
task1
en tant que prédécesseur detask2
, ce qui crée un graphique de tâches minimal.Pour confirmer que les deux tâches existent désormais, dans votre cellule suivante, exécutez le code suivant :
taskiter = tasks.iter() for t in taskiter: print(t.name)
Lorsque vous créez des tâches, elles sont suspendues par défaut.
Pour démarrer une tâche, appelez
.resume()
sur l’objet ressource de la tâche :trunc_task.resume()
Pour confirmer que la tâche
trunc_task
a été démarrée, dans votre cellule suivante, exécutez le code suivant :taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
Le résultat devrait être similaire à ceci :
Name: TASK_PYTHON_API_FILTER | State: suspended Name: TASK_PYTHON_API_TRUNC | State: started
Vous pouvez répéter cette étape chaque fois que vous souhaitez confirmer le statut des tâches.
Pour nettoyer vos ressources de tâche, vous devez d’abord suspendre la tâche avant de la supprimer.
Dans votre cellule suivante, exécutez le code suivant :
trunc_task.suspend()
Pour confirmer que la tâche est suspendue, répétez l’étape 5.
Facultatif : pour supprimer les deux tâches, dans votre cellule suivante, exécutez le code suivant :
trunc_task.drop() filter_task.drop()
Créer et gérer un graphique de tâches¶
Lorsque vous coordonnez l’exécution d’un grand nombre de tâches, la gestion individuelle de chaque tâche peut être un défi. Snowflake Python APIs fournit des fonctionnalités pour orchestrer des tâches avec une API de graphique de tâches de niveau supérieur.
Un graphique de tâches, ou graphe orienté acyclique (DAG) est une série de tâches composées d’une seule tâche racine et de tâches enfants, organisées par leurs dépendances. Pour plus d’informations, voir Gérer les dépendances des tâches à l’aide de graphiques de tâches.
Pour créer et déployer un graphique de tâches, exécutez le code suivant :
dag_name = "python_api_dag" dag = DAG(name=dag_name, schedule=timedelta(days=1)) with dag: dag_task1 = DAGTask( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task2 = DAGTask( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task1 >> dag_task2 dag_op = DAGOperation(schema) dag_op.deploy(dag, mode=CreateMode.or_replace)
Dans ce code, vous effectuez les opérations suivantes :
Créez un objet de graphique de tâches en appelant le constructeur
DAG
et en spécifiant un nom et un calendrier.Définissez des tâches spécifiques au graphique de tâches à l’aide du constructeur
DAGTask
. Notez que le constructeur accepte les mêmes arguments que ceux que vous avez spécifiés pour la classeStoredProcedureCall
dans une étape précédente.Spécifiez
dag_task1
comme tâche racine et prédécesseur dedag_task2
avec une syntaxe plus pratique.Déployez le graphique des tâches sur le schéma
PYTHON_API_SCHEMA
de la base de donnéesPYTHON_API_DB
.
Pour confirmer la création du graphique de tâches, dans votre cellule suivante, exécutez le code suivant :
taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
Vous pouvez répéter cette étape chaque fois que vous souhaitez confirmer le statut des tâches.
Pour démarrer le graphique des tâches en démarrant la tâche racine, dans votre cellule suivante, exécutez le code suivant :
dag_op.run(dag)
Pour confirmer que la tâche
PYTHON_API_DAG$TASK_PYTHON_API_TRUNC
a démarré, répétez l’étape 2.Note
L’appel de fonction invoqué par le graphique de tâches ne réussira pas car vous ne l’appelez avec aucun de ses arguments requis. Le but de cette étape est uniquement de démontrer comment démarrer par programmation le graphique des tâches.
Pour supprimer le graphique des tâches, dans votre cellule suivante, exécutez le code suivant :
dag_op.drop(dag)
Nettoyez l’objet de base de données que vous avez créé dans ces tutoriels :
database.drop()
Quelle est la prochaine étape ?¶
Félicitations ! Dans ce tutoriel, vous avez appris à créer et à gérer des tâches et des graphiques de tâches à l’aide de Snowflake Python APIs.
Résumé¶
En cours de route, vous avez accompli les étapes suivantes :
Créez une zone de préparation pouvant contenir des procédures stockées et leurs dépendances.
Créer et gérer des tâches.
Créez et gérez un graphique de tâches.
Nettoyez vos objets de ressources Snowflake en les supprimant.
Tutoriel suivant¶
Vous pouvez maintenant procéder à Tutoriel 3 : Créer et gérer les Snowpark Container Services, qui montre comment créer et gérer des composants dans Snowpark Container Services.
Ressources supplémentaires¶
Pour plus d’exemples d’utilisation de l’API pour gérer d’autres types d’objets dans Snowflake, consultez les guides de développement suivants :
Guide |
Description |
---|---|
Gestion des bases de données, schémas, tables et vues Snowflake avec Python |
Utilisez l’API pour créer et gérer des bases de données, des schémas et des tables. |
Gestion des utilisateurs, des rôles et des attributions Snowflake avec Python |
Utilisez l’API pour créer et gérer les utilisateurs, les rôles et les autorisations. |
Gestion des ressources de chargement et de déchargement de données avec Python |
Utilisez l’API pour créer et gérer les ressources de chargement et de déchargement de données, y compris les volumes externes, les canaux et les zones de préparation. |
Gestion de Snowpark Container Services (y compris les fonctions de service) avec Python |
Utilisez l’API pour gérer les composants Snowpark Container Services, notamment les pools de calcul, les référentiels d’images, les services et les fonctions de services. |