Gestion des tâches et DAGs Snowflake avec Python

Vous pouvez utiliser Python pour gérer les tâches Snowflake, avec lesquelles vous pouvez exécuter des instructionsSQL, des appels de procédure et une logique dans Exécution de scripts Snowflake. Pour une vue d’ensemble des tâches, voir Introduction aux tâches.

L’API Python Snowflake représente les tâches de deux types distincts :

  • Task : Expose les propriétés d’une tâche telles que sa planification, ses paramètres et ses prédécesseurs.

  • TaskResource : Expose des méthodes que vous pouvez utiliser pour récupérer un objet Task correspondant, exécuter la tâche et modifier la tâche.

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser l’API Python Snowflake. Pour plus d’informations, voir Connexion à Snowflake avec l’API Python Snowflake.

Le code de l’exemple suivant utilise les paramètres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake. En utilisant l’objet Session obtenu, le code crée un objet Root pour utiliser les types et les méthodes de l’API.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Créer une tâche

Pour créer une tâche, commencez par créer un objet Task. Ensuite, en spécifiant la base de données et le schéma dans lesquels créer la tâche, créez un objet TaskCollection. Via TaskCollection.create, ajoutez la nouvelle tâche à Snowflake.

Le code de l’exemple suivant crée un objet Task représentant une tâche appelée my_task qui exécute une requête SQL spécifiée dans le paramètre definition. Il crée une variable TaskCollection tasks à partir de la base de données my_db et du schéma my_schema. Via TaskCollection.create, il crée une nouvelle tâche dans Snowflake.

Cet exemple de code spécifie également une valeur timedelta d’une heure pour la planification de la tâche. Vous pouvez définir la planification d’une tâche via une valeur timedelta ou une expression Cron.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

Vous pouvez également créer une tâche qui exécute une fonction Python ou une procédure stockée. Le code de l’exemple suivant crée une tâche appelée my_task2 qui exécute une fonction représentée par un objet StoredProcedureCall. Cet objet spécifie une fonction nommée dosomething située dans l’emplacement de zone de préparation @mystage. Vous devez également spécifier un warehouse lorsque vous créez une tâche avec un objet StoredProcedureCall.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

Créer ou mettre à jour une tâche

Vous pouvez mettre à jour les caractéristiques d’une tâche existante en définissant les propriétés d’un objet Task qui représente la tâche, puis en transmettant l’objet à Snowflake via la méthode TaskResource.create_or_update.

Vous pouvez également transmettre un objet Task décrivant une nouvelle tâche lorsque vous souhaitez créer la tâche.

Le code de l’exemple suivant détermine le nom, la définition et la planification d’une tâche, puis met à jour la tâche sur Snowflake, en la créant, si elle n’existe pas encore.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

Répertorier les tâches

Vous pouvez répertorier les tâches via la méthode TaskCollection.iter. La méthode renvoie un itérateur PagedIter d’objets Task.

Le code de l’exemple suivant répertorie les tâches dont le nom comprend le texte « my », en imprimant le nom de chacune.

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

Effectuer des opérations de tâche

Vous pouvez effectuer des opérations de tâche courantes—comme exécuter, suspendre et reprendre des tâches—avec un objet TaskResource.

Le code de l’exemple suivant exécute, suspend, reprend et supprime la tâche my_task.

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

Gestion des tâches dans un DAG

Vous pouvez gérer des tâches rassemblées dans un graphe orienté acyclique (Directed Acyclic Graph ou DAG). Un DAG est une série de tâches composée d’une seule tâche racine et de tâches supplémentaires, organisées en fonction de leurs dépendances.

Pour en savoir plus sur les tâches dans un DAG, voir DAG de tâches.

Créer un DAG de tâches

Pour créer un DAG de tâches, commencez par créer un objet DAG qui spécifie son nom et d’autres propriétés facultatives telles que sa planification. Vous pouvez définir la planification d’un DAG via une valeur timedelta ou une expression Cron.

Le code de l’exemple suivant définit une fonction Python dosomething, puis spécifie la fonction sous forme d’objet DAGTask nommé dag_task2 dans le DAG. Il définit également une instruction SQL sous la forme d’un autre objet DAGTask nommé dag_task1, puis spécifie dag_task1 comme prédécesseur de dag_task2. Pour finir, il déploie le DAG dans Snowflake dans la base de données my_db et le schéma my_schema.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

Créer un DAG avec une planification cron, des branches de tâches et des valeurs de renvoi de fonctions.

Vous pouvez également créer un DAG avec une planification cron spécifique, des branches de tâches et des valeurs de renvoi de fonctions qui sont utilisées comme valeurs de renvoi de tâches.

Le code de l’exemple suivant crée un objet DAG avec un objet Cron spécifiant sa planification. Il définit un objet DAGTaskBranch nommé task1_branch ainsi que d’autres objets DAGTask, et spécifie leurs dépendances les uns par rapport aux autres.

Cet exemple de code définit également les fonctions de gestion des tâches et crée chaque objet DAGTask et DAGTaskBranch avec un gestionnaire de tâches spécifique affecté à la tâche. Le code définit le paramètre use_func_return_value du DAG sur True, qui indique qu’il convient d’utiliser la valeur de renvoi de la fonction Python comme valeur de renvoi de la tâche correspondante. Sinon, la valeur par défaut de use_func_return_value est False.

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

Définir et obtenir la valeur de renvoi d’une tâche dans un DAG

Lorsque la définition d’une tâche est un objet StoredProcedureCall, le gestionnaire de la procédure stockée (ou de la fonction) peut explicitement définir la valeur de renvoi de la tâche via un objet TaskContext.

Pour plus d’informations, voir SYSTEM$SET_RETURN_VALUE.

Le code de l’exemple suivant définit une fonction de gestion des tâches qui crée un objet TaskContext nommé context à partir de la session en cours. Il utilise ensuite la méthode TaskContext.set_return_value pour définir explicitement la valeur de renvoi sur une chaîne spécifiée.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

Dans un DAG de tâches, une tâche de successeur immédiate qui identifie la tâche précédente comme son prédécesseur peut alors récupérer la valeur de renvoi explicitement définie par la tâche de prédécesseur.

Pour plus d’informations, voir SYSTEM$GET_PREDECESSOR_RETURN_VALUE.

Le code de l’exemple suivant définit une fonction de gestion des tâches qui utilise la méthode TaskContext.get_predecessor_return_value pour obtenir la valeur de renvoi de la tâche de prédécesseur nommée pred_task_name.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy