snowflake.core.task.dagv1.DAG

class snowflake.core.task.dagv1.DAG(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: Dict[str, Any] | None = None, session_parameters: Dict[str, Any] | None = None, stage_location: str | None = None, imports: List[str | Tuple[str, str]] | None = None, packages: List[str | ModuleType] | None = None, use_func_return_value: bool = False)

Bases: object

A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.

Snowflake doesn’t have a first-class task graph entity, so this is a client-side object representation which manages Task relationship. A root Task and its successors logically form a task graph or DAG (Directed Acyclic Graph). Refer to Task graphs.

When a task graph is deployed, all child tasks are created in Snowflake. A dummy Task is created as the root. A task’s predecessor is the dummy task if it’s added to the task graph with no other predecessors.

Example

>>> dag = DAG("TEST_DAG",
...     schedule=timedelta(minutes=10),
...     use_func_return_value=True,
...     warehouse="TESTWH_DAG",
...     packages=["snowflake-snowpark-python"],
...     stage_location="@TESTDB_DAG.TESTSCHEMA_DAG.TEST_STAGE_DAG"
... )
>>> def task1(session: Session) -> None:
...     session.sql("select 'task1'").collect()
>>> def task2(session: Session) -> None:
...     session.sql("select 'task2'").collect()
>>> def cond(session: Session) -> str:
...     return 'TASK1'
>>> with dag:
...     task1 = DAGTask("TASK1", definition=task1, warehouse="TESTWH_DAG")
...     task2 = DAGTask("TASK2", definition=task2, warehouse="TESTWH_DAG")
...     condition = DAGTaskBranch("COND", definition=cond, warehouse="TESTWH_DAG")
...     condition >> [task1, task2]
>>> dag_op = DAGOperation(schema)
>>> dag_op.deploy(dag, mode="orReplace")
>>> dag_op.run(dag)
Note:
    When defining a task branch handler, simply return the task name you want to jump to. The task name is
    case-sensitive, and it has to match the name property in DAGTask. For exmaple, in above sample code, return
    'TASK1' instead of 'TEST_DAG$TASK1', 'task1' or 'Task1' will not be considered as a exact match.
Copy

Refer to snowflake.core.task.Task for the details of each property.

Attributes

tasks

Returns a list of tasks this task graph has.

name

Name of the task graph and the dummy root task.

warehouse

Refer to snowflake.core.task.Task.warehouse.

user_task_managed_initial_warehouse_size

Refer to snowflake.core.task.Task.user_task_managed_initial_warehouse_size.

comment

comment of the task graph.

schedule

Schedule of the task graph. Refer to snowflake.core.task.Task.schedule.

error_integration

Refer to snowflake.core.task.Task.error_integration.

allow_overlapping_execution

Refer to snowflake.core.task.Task.allow_overlapping_execution.

user_task_timeout_ms

Refer to snowflake.core.task.Task.user_task_timeout_ms.

suspend_task_after_num_failures

Refer to snowflake.core.task.Task.suspend_task_after_num_failures.

config

Refer to snowflake.core.task.Task.config.

session_parameters

Refer to snowflake.core.task.Task.session_parameters.

stage_location

The default stage location where this task graph’s tasks code will be stored if creating the tasks from Python functions.

imports

The default imports for all tasks of this task graph if creating the tasks from Python functions.

packages

The default packages for the tasks of this task graph if creating the tasks from Python functions.

use_func_return_value

Use the Python function’s return value as Task return value if use_func_return_value is True. Default False.

Methods

__init__(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: Dict[str, Any] | None = None, session_parameters: Dict[str, Any] | None = None, stage_location: str | None = None, imports: List[str | Tuple[str, str]] | None = None, packages: List[str | ModuleType] | None = None, use_func_return_value: bool = False) None
add_task(task: DAGTask) None

Add a child task to this task graph.

get_task(task_name: str) DAGTask | None

Get a child task from this task graph based on task name.