Tutoriel 2 : Créer et gérer des tâches et des graphiques de tâches (DAGs)

Introduction

Dans ce tutoriel, vous créez et utilisez des tâches Snowflake pour gérer certaines procédures stockées de base. Vous créez également un graphique de tâches — également appelé graphe orienté acyclique (DAG) — pour orchestrer des tâches avec une API de graphe de tâche supérieur.

Conditions préalables

Note

Si vous avez déjà effectué les étapes de Configuration commune pour les tutoriels sur les APIs Snowflake Python et Tutoriel 1 : Créer une base de données, un schéma, une table et un entrepôt, vous pouvez ignorer ces prérequis et passer à la première étape de ce tutoriel.

Avant de commencer ce tutoriel, vous devez suivre les étapes suivantes :

  1. Suivez les instructions de configuration commune, qui comprennent les étapes suivantes :

    • Configurez votre environnement de développement.

    • Installez le paquet Snowflake Python APIs.

    • Configurez votre connexion Snowflake.

    • Importez tous les modules nécessaires aux tutoriels d’API Python.

    • Créez un objet d’API Root.

  2. Exécutez le code suivant pour créer une base de données nommée PYTHON_API_DB et un schéma nommé PYTHON_API_SCHEMA dans cette base de données.

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    Ce sont les mêmes objets de base de données et de schéma que vous créez dans Tutoriel 1.

Après avoir rempli ces conditions préalables, vous êtes prêt à commencer à utiliser l’API pour la gestion des tâches.

Configurer des objets Snowflake

Configurez les procédures stockées que vos tâches appelleront et la zone de préparation qui contiendra les procédures stockées. Vous pouvez utiliser votre objet Snowflake Python APIs root pour créer une zone de préparation dans la base de données PYTHON_API_DB et le schéma PYTHON_API_SCHEMA que vous avez créés précédemment.

  1. Pour créer une zone de préparation nommée TASKS_STAGE, dans la cellule suivante de votre notebook, exécutez le code suivant :

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    Cette zone de préparation contiendra les procédures stockées et toutes les dépendances dont ces procédures ont besoin.

  2. Pour créer deux fonctions Python de base que les tâches exécuteront en tant que procédures stockées, dans votre cellule suivante, exécutez le code suivant :

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    Ces fonctions effectuent les opérations suivantes :

    • trunc() : crée une version tronquée d’une table d’entrée.

    • filter_by_shipmode() : filtre la table SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM par mode de navire, limite les résultats à 10 lignes et écrit les résultats dans une nouvelle table.

      Note

      Cette fonction interroge les données d’exemple TPC-H dans la base de données SNOWFLAKE_SAMPLE_DATA. Par défaut, Snowflake crée la base de données d’exemple dans les nouveaux comptes. Si la base de données n’a pas été créée dans votre compte, voir Utilisation de la base de données d’échantillon.

    Les fonctions sont volontairement basiques et sont destinées à des fins de démonstration.

Créer et gérer des tâches

Définissez, créez et gérez deux tâches qui exécuteront vos fonctions Python précédemment créées en tant que procédures stockées.

  1. Pour définir les deux tâches, task1 et task2, dans la cellule suivante de votre notebook, exécutez le code suivant :

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    Dans ce code, vous spécifiez les paramètres de tâche suivants :

    • Pour chaque tâche, une définition représentée par un objet StoredProcedureCall qui comprend les attributs suivants :

      • La fonction appelable à exécuter

      • L’emplacement de la zone de préparation où le contenu de votre fonction Python et ses dépendances sont téléchargés

      • Dépendances du paquet de la procédure stockée

    • Un entrepôt pour exécuter la procédure stockée (requis lors de la création d’une tâche avec un objet StoredProcedureCall). Ce tutoriel utilise l’entrepôt COMPUTE_WH qui est inclus dans votre compte d’essai.

    • Un calendrier d’exécution pour la tâche racine, task1. La planification spécifie l’intervalle auquel exécuter la tâche périodiquement.

    Pour plus d’informations sur les procédures stockées, voir Écriture de procédures stockées en Python.

  2. Pour créer les deux tâches, récupérez un objet TaskCollection (tasks) à partir de votre schéma de base de données et appelez .create() sur votre collection de tâches :

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    Dans cet exemple de code, vous liez également les tâches en définissant task1 en tant que prédécesseur de task2, ce qui crée un graphique de tâches minimal.

  3. Pour confirmer que les deux tâches existent désormais, dans votre cellule suivante, exécutez le code suivant :

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. Lorsque vous créez des tâches, elles sont suspendues par défaut.

    Pour démarrer une tâche, appelez .resume() sur l’objet ressource de la tâche :

    trunc_task.resume()
    
    Copy
  5. Pour confirmer que la tâche trunc_task a été démarrée, dans votre cellule suivante, exécutez le code suivant :

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    Le résultat devrait être similaire à ceci :

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    Vous pouvez répéter cette étape chaque fois que vous souhaitez confirmer le statut des tâches.

  6. Pour nettoyer vos ressources de tâche, vous devez d’abord suspendre la tâche avant de la supprimer.

    Dans votre cellule suivante, exécutez le code suivant :

    trunc_task.suspend()
    
    Copy
  7. Pour confirmer que la tâche est suspendue, répétez l’étape 5.

  8. Facultatif : pour supprimer les deux tâches, dans votre cellule suivante, exécutez le code suivant :

    trunc_task.drop()
    filter_task.drop()
    
    Copy

Créer et gérer un graphique de tâches

Lorsque vous coordonnez l’exécution d’un grand nombre de tâches, la gestion individuelle de chaque tâche peut être un défi. Snowflake Python APIs fournit des fonctionnalités pour orchestrer des tâches avec une API de graphique de tâches de niveau supérieur.

Un graphique de tâches, ou graphe orienté acyclique (DAG) est une série de tâches composées d’une seule tâche racine et de tâches enfants, organisées par leurs dépendances. Pour plus d’informations, voir Gérer les dépendances des tâches à l’aide de graphiques de tâches.

  1. Pour créer et déployer un graphique de tâches, exécutez le code suivant :

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    Dans ce code, vous effectuez les opérations suivantes :

    • Créez un objet de graphique de tâches en appelant le constructeur DAG et en spécifiant un nom et un calendrier.

    • Définissez des tâches spécifiques au graphique de tâches à l’aide du constructeur DAGTask. Notez que le constructeur accepte les mêmes arguments que ceux que vous avez spécifiés pour la classe StoredProcedureCall dans une étape précédente.

    • Spécifiez dag_task1 comme tâche racine et prédécesseur de dag_task2 avec une syntaxe plus pratique.

    • Déployez le graphique des tâches sur le schéma PYTHON_API_SCHEMA de la base de données PYTHON_API_DB.

  2. Pour confirmer la création du graphique de tâches, dans votre cellule suivante, exécutez le code suivant :

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    Vous pouvez répéter cette étape chaque fois que vous souhaitez confirmer le statut des tâches.

  3. Pour démarrer le graphique des tâches en démarrant la tâche racine, dans votre cellule suivante, exécutez le code suivant :

    dag_op.run(dag)
    
    Copy
  4. Pour confirmer que la tâche PYTHON_API_DAG$TASK_PYTHON_API_TRUNC a démarré, répétez l’étape 2.

    Note

    L’appel de fonction invoqué par le graphique de tâches ne réussira pas car vous ne l’appelez avec aucun de ses arguments requis. Le but de cette étape est uniquement de démontrer comment démarrer par programmation le graphique des tâches.

  5. Pour supprimer le graphique des tâches, dans votre cellule suivante, exécutez le code suivant :

    dag_op.drop(dag)
    
    Copy
  6. Nettoyez l’objet de base de données que vous avez créé dans ces tutoriels :

    database.drop()
    
    Copy

Quelle est la prochaine étape ?

Félicitations ! Dans ce tutoriel, vous avez appris à créer et à gérer des tâches et des graphiques de tâches à l’aide de Snowflake Python APIs.

Résumé

En cours de route, vous avez accompli les étapes suivantes :

  • Créez une zone de préparation pouvant contenir des procédures stockées et leurs dépendances.

  • Créer et gérer des tâches.

  • Créez et gérez un graphique de tâches.

  • Nettoyez vos objets de ressources Snowflake en les supprimant.

Tutoriel suivant

Vous pouvez maintenant procéder à Tutoriel 3 : Créer et gérer les Snowpark Container Services, qui montre comment créer et gérer des composants dans Snowpark Container Services.

Ressources supplémentaires

Pour plus d’exemples d’utilisation de l’API pour gérer d’autres types d’objets dans Snowflake, consultez les guides de développement suivants :

Guide

Description

Gestion des bases de données, schémas, tables et vues Snowflake avec Python

Utilisez l’API pour créer et gérer des bases de données, des schémas et des tables.

Gestion des utilisateurs, des rôles et des attributions Snowflake avec Python

Utilisez l’API pour créer et gérer les utilisateurs, les rôles et les autorisations.

Gestion des ressources de chargement et de déchargement de données avec Python

Utilisez l’API pour créer et gérer les ressources de chargement et de déchargement de données, y compris les volumes externes, les canaux et les zones de préparation.

Gestion de Snowpark Container Services (y compris les fonctions de service) avec Python

Utilisez l’API pour gérer les composants Snowpark Container Services, notamment les pools de calcul, les référentiels d’images, les services et les fonctions de services.