>>> from snowflake.snowpark import Session
>>> from snowflake.core import Root
>>> from snowflake.core._common import CreateMode
>>> from snowflake.core.task import Cron
>>> from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
>>> session = Session.builder.create()
>>> test_stage = "mystage"
>>> test_dag = "mydag"
>>> test_db = "mydb"
>>> test_schema = "public"
>>> test_warehouse = "testwh_python"
>>> root = Root(session)
>>> schema = root.databases[test_db].schemas[test_schema]
>>> def task_handler1(session: Session) -> None:
... pass # do something
>>> def task_handler2(session: Session) -> None:
... pass # do something
>>> def task_handler3(session: Session) -> None:
... pass # do something
>>> def task_branch_handler(session: Session) -> str:
... # do something
... return "task3"
>>> try:
... with DAG(
... test_dag,
... schedule=Cron("10 * * * *", "America/Los_Angeles"),
... stage_location=test_stage,
... packages=["snowflake-snowpark-python"],
... warehouse=test_warehouse,
... use_func_return_value=True,
... ) as dag:
... task1 = DAGTask(
... "task1",
... task_handler1,
... )
... task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
... task2 = DAGTask("task2", task_handler2)
... task1 >> task1_branch
... task1_branch >> [task2, task_handler3] # after >> you can use a DAGTask or a function.
... op = DAGOperation(schema)
... op.deploy(dag, mode=CreateMode.or_replace)
>>> finally:
... session.close()