snowflake.core.task.dagv1.DAG
- class snowflake.core.task.dagv1.DAG(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: Dict[str, Any] | None = None, session_parameters: Dict[str, Any] | None = None, stage_location: str | None = None, imports: List[str | Tuple[str, str]] | None = None, packages: List[str | module] | None = None, use_func_return_value: bool = False)
Bases: object
A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.
Snowflake doesn’t have a first-class task graph entity, so this is a client-side object representation that manages Task relationships. A root Task and its successors logically form a task graph, or DAG (Directed Acyclic Graph). Refer to Task graphs.
When a task graph is deployed, all child tasks are created in Snowflake, and a dummy Task is created as the root. A task added to the task graph with no other predecessors has the dummy task as its predecessor.
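The root-attachment rule described above can be illustrated with a small, library-independent sketch. It is not the DAG class's implementation; the helper name `attach_to_root` and the dictionary layout are hypothetical and chosen only to show which predecessor each task ends up with.

```python
from typing import Dict, List

# Hypothetical graph name; per the description, the dummy root task shares it.
ROOT = "TEST_DAG"


def attach_to_root(predecessors: Dict[str, List[str]], root: str) -> Dict[str, List[str]]:
    """Return a copy in which every task without predecessors depends on the root."""
    return {
        task: list(preds) if preds else [root]
        for task, preds in predecessors.items()
    }


# Tasks as declared by the user: COND has no predecessor,
# while TASK1 and TASK2 both follow COND.
declared = {"COND": [], "TASK1": ["COND"], "TASK2": ["COND"]}
deployed = attach_to_root(declared, ROOT)
print(deployed)
```

Only COND is re-parented to the dummy root here, because TASK1 and TASK2 already have a predecessor.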
Example
>>> from datetime import timedelta
>>> from snowflake.snowpark import Session
>>> from snowflake.core.task.dagv1 import DAG, DAGOperation, DAGTask, DAGTaskBranch
>>> dag = DAG("TEST_DAG",
...     schedule=timedelta(minutes=10),
...     use_func_return_value=True,
...     warehouse="TESTWH_DAG",
...     packages=["snowflake-snowpark-python"],
...     stage_location="@TESTDB_DAG.TESTSCHEMA_DAG.TEST_STAGE_DAG"
... )
>>> def task1(session: Session) -> None:
...     session.sql("select 'task1'").collect()
>>> def task2(session: Session) -> None:
...     session.sql("select 'task2'").collect()
>>> def cond(session: Session) -> str:
...     return 'TASK1'
>>> with dag:
...     task1 = DAGTask("TASK1", definition=task1, warehouse="TESTWH_DAG")
...     task2 = DAGTask("TASK2", definition=task2, warehouse="TESTWH_DAG")
...     condition = DAGTaskBranch("COND", definition=cond, warehouse="TESTWH_DAG")
...     condition >> [task1, task2]
>>> # schema: an existing schema resource for the target schema (not defined in this example)
>>> dag_op = DAGOperation(schema)
>>> dag_op.deploy(dag, mode="orReplace")
>>> dag_op.run(dag)
Note: When defining a task branch handler, simply return the name of the task you want to jump to. The task name is case-sensitive and must match the name property of the DAGTask. For example, in the sample code above, return 'TASK1'; 'TEST_DAG$TASK1', 'task1', and 'Task1' are not considered exact matches.
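The exact-match rule in the note can be sketched without the library. This only illustrates the case-sensitive comparison against DAGTask names; the function `match_branch_target` is hypothetical, and how Snowflake actually resolves the returned name is not shown here.

```python
from typing import Iterable, Optional


def match_branch_target(returned: str, task_names: Iterable[str]) -> Optional[str]:
    """Return the task name if `returned` matches one exactly, otherwise None."""
    return returned if returned in set(task_names) else None


# Names as given to DAGTask in the example above.
names = ["TASK1", "TASK2"]

exact = match_branch_target("TASK1", names)               # exact, case-sensitive match
qualified = match_branch_target("TEST_DAG$TASK1", names)  # graph-qualified name: no match
lowercase = match_branch_target("task1", names)           # wrong case: no match
mixed_case = match_branch_target("Task1", names)          # wrong case: no match
```

Only the first call returns a task name; the other three return None, mirroring the three non-matching spellings listed in the note.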
Refer to snowflake.core.task.Task for the details of each property.
Attributes
- tasks
Returns a list of tasks this task graph has.
- name
Name of the task graph and the dummy root task.
- warehouse
Refer to snowflake.core.task.Task.warehouse.
- user_task_managed_initial_warehouse_size
Refer to snowflake.core.task.Task.user_task_managed_initial_warehouse_size.
- comment
Comment of the task graph.
- schedule
Schedule of the task graph. Refer to snowflake.core.task.Task.schedule.
- error_integration
- allow_overlapping_execution
Refer to snowflake.core.task.Task.allow_overlapping_execution.
- user_task_timeout_ms
- suspend_task_after_num_failures
Refer to snowflake.core.task.Task.suspend_task_after_num_failures.
- config
Refer to snowflake.core.task.Task.config.
- session_parameters
- stage_location
The default stage location where the code of this task graph’s tasks will be stored if the tasks are created from Python functions.
- imports
The default imports for all tasks of this task graph if creating the tasks from Python functions.
- packages
The default packages for the tasks of this task graph if creating the tasks from Python functions.
- use_func_return_value
Use the Python function’s return value as the Task return value if use_func_return_value is True. Defaults to False.
Methods
- __init__(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: Dict[str, Any] | None = None, session_parameters: Dict[str, Any] | None = None, stage_location: str | None = None, imports: List[str | Tuple[str, str]] | None = None, packages: List[str | module] | None = None, use_func_return_value: bool = False) -> None