snowflake.core.task.dagv1

High-level, client-side representation of task graphs.

This set of higher-level classes provides a more convenient way to create, deploy, and manage task graphs than the lower-level Task APIs in snowflake.core.task. Task graphs are directed acyclic graphs (DAGs) of tasks.
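
To make the "directed acyclic graph" idea concrete, the following is a minimal standard-library sketch, not part of snowflake.core. The task names and the `run_order` helper are hypothetical. It only shows that a set of dependencies without cycles always admits an order in which every task runs after the tasks it depends on.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "load": set(),
    "transform": {"load"},
    "report": {"transform"},
    "audit": {"load"},
}


def run_order(deps):
    """Return one valid execution order for the given dependency mapping.

    Raises graphlib.CycleError if the dependencies contain a cycle,
    that is, if the graph is not a DAG.
    """
    return list(TopologicalSorter(deps).static_order())


order = run_order(dependencies)
print(order)  # "load" always comes first; the relative order of the rest may vary
```

Several orders can be valid for the same graph, so the sketch checks only relative positions, never one fixed sequence.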

Example 1: Create a task graph that has two tasks.
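>>> from datetime import timedelta
>>> from snowflake.core import Root
>>> from snowflake.core.task import StoredProcedureCall
>>> from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
>>> from snowflake.snowpark import Session
>>> from snowflake.snowpark.functions import sum as sum_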
>>> def dosomething(session: Session) -> None:
...     df = session.table("target")
...     df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
>>> with DAG("my_dag", schedule=timedelta(days=1)) as dag:
...     # Create a task that runs some SQL.
...     dag_task1 = DAGTask(
...         "dagtask1",
...         "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v")
...     # Create a task that runs a Python function.
...     dag_task2 = DAGTask(
...         "dagtask2",
...         StoredProcedureCall(
...             dosomething, stage_location="@mystage",
...             packages=["snowflake-snowpark-python"]
...         ),
...         warehouse="test_warehouse"
...     )
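>>> # The shift operators (>> and <<) define task dependencies: here dag_task1 runs before dag_task2.
>>> dag_task1 >> dag_task2
>>> # `root` is assumed to be a snowflake.core.Root created from an existing Snowpark session.
>>> schema = root.databases["MYDB"].schemas["MYSCHEMA"]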
>>> dag_op = DAGOperation(schema)
>>> dag_op.deploy(dag)
Example 2: Create a task graph that uses a Cron schedule, a branch task, and function return values as task return values.
>>> from snowflake.snowpark import Session
>>> from snowflake.core import CreateMode, Root
>>> from snowflake.core.task import Cron
>>> from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
>>> session = Session.builder.create()
>>> test_stage = "mystage"
>>> test_dag = "mydag"
>>> test_db = "mydb"
>>> test_schema = "public"
>>> test_warehouse = "testwh_python"
>>> root = Root(session)
>>> schema = root.databases[test_db].schemas[test_schema]
>>> def task_handler1(session: Session) -> None:
...     pass  # do something
>>> def task_handler2(session: Session) -> None:
...     pass  # do something
>>> def task_handler3(session: Session) -> None:
...     pass  # do something
>>> def task_branch_handler(session: Session) -> str:
...     # do something
...     return "task3"
>>> try:
...     with DAG(
...         test_dag,
...         schedule=Cron("10 * * * *", "America/Los_Angeles"),  # minute 10 of every hour, Los Angeles time
...         stage_location=test_stage,
...         packages=["snowflake-snowpark-python"],
...         warehouse=test_warehouse,
...         use_func_return_value=True,
...     ) as dag:
...         task1 = DAGTask(
...             "task1",
...             task_handler1,
...         )
...         task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
...         task2 = DAGTask("task2", task_handler2)
...         task1 >> task1_branch
...         task1_branch >> [task2, task_handler3]  # The right-hand side of >> can be a DAGTask, a function, or a list of either.
...     op = DAGOperation(schema)
...     op.deploy(dag, mode=CreateMode.or_replace)
>>> finally:
...     session.close()
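
Both examples wire tasks together with `>>`. As a rough illustration of how such an operator can record dependencies, here is a self-contained toy sketch. `ToyTask` and its helper are hypothetical and are not the snowflake.core implementation. In this toy version a bare function is wrapped in a node named after the function; whether the library names such tasks the same way is an assumption to verify against its reference documentation.

```python
class ToyTask:
    """Hypothetical stand-in for a graph node; not snowflake.core's DAGTask."""

    def __init__(self, name):
        self.name = name
        self.predecessors = []
        self.successors = []

    @staticmethod
    def _as_tasks(other):
        # Accept one item or a list; wrap bare functions as tasks named after them.
        items = other if isinstance(other, (list, tuple)) else [other]
        return [i if isinstance(i, ToyTask) else ToyTask(i.__name__) for i in items]

    def __rshift__(self, other):
        # self >> other: everything in `other` runs after self.
        tasks = self._as_tasks(other)
        for task in tasks:
            self.successors.append(task)
            task.predecessors.append(self)
        return tasks[0] if len(tasks) == 1 else tasks

    def __lshift__(self, other):
        # self << other: everything in `other` runs before self.
        tasks = self._as_tasks(other)
        for task in tasks:
            task >> self
        return tasks[0] if len(tasks) == 1 else tasks


def task_handler3():
    pass  # placeholder body


task1 = ToyTask("task1")
branch = ToyTask("task1_branch")
task2 = ToyTask("task2")

task1 >> branch                   # task1 runs before the branch
branch >> [task2, task_handler3]  # fan out to a task and a wrapped function

successor_names = [t.name for t in branch.successors]
print(successor_names)  # ['task2', 'task_handler3']
```

Storing both `predecessors` and `successors` on each node keeps the toy graph easy to walk in either direction. A real implementation would also need to reject cycles and duplicate task names.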

Classes

DAG(name, *[, schedule, warehouse, ...])

A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.

DAGTask(name, definition, *[, condition, ...])

Represents a child Task of a task graph.

DAGRun()

Contains the history of a task graph run in Snowflake.

DAGOperation(schema)

APIs to manage task graphs and their child tasks.