snowflake.core.task.dagv1.DAGΒΆ
- class snowflake.core.task.dagv1.DAG(name: str, *, schedule: Cron | timedelta | None = None, warehouse: str | None = None, user_task_managed_initial_warehouse_size: str | None = None, error_integration: str | None = None, comment: str | None = None, task_auto_retry_attempts: int | None = None, allow_overlapping_execution: bool | None = None, user_task_timeout_ms: int | None = None, suspend_task_after_num_failures: int | None = None, config: dict[str, Any] | None = None, session_parameters: dict[str, Any] | None = None, stage_location: str | None = None, imports: list[str | tuple[str, str]] | None = None, packages: list[str | ModuleType] | None = None, use_func_return_value: bool = False)ΒΆ
- Bases: - object- A graph of tasks composed of a single root task and additional tasks, organized by their dependencies. - Snowflake doesnβt have a first-class task graph entity, so this is a client-side object representation which manages Task relationship. A root - Taskand its successors logically form a task graph or DAG (Directed Acyclic Graph). Refer to Task graphs.- When a task graph is deployed, all child tasks are created in Snowflake. A dummy Task is created as the root. A taskβs predecessor is the dummy task if itβs added to the task graph with no other predecessors. - Example - >>> dag = DAG( ... "TEST_DAG", ... schedule=timedelta(minutes=10), ... use_func_return_value=True, ... warehouse="TESTWH_DAG", ... packages=["snowflake-snowpark-python"], ... stage_location="@TESTDB_DAG.TESTSCHEMA_DAG.TEST_STAGE_DAG", ... ) >>> def task1(session: Session) -> None: ... session.sql("select 'task1'").collect() >>> def task2(session: Session) -> None: ... session.sql("select 'task2'").collect() >>> def cond(session: Session) -> str: ... return "TASK1" >>> with dag: ... task1 = DAGTask("TASK1", definition=task1, warehouse="TESTWH_DAG") ... task2 = DAGTask("TASK2", definition=task2, warehouse="TESTWH_DAG") ... condition = DAGTaskBranch("COND", definition=cond, warehouse="TESTWH_DAG") ... condition >> [task1, task2] >>> dag_op = DAGOperation(schema) >>> dag_op.deploy(dag, mode="orReplace") >>> dag_op.run(dag) Note: When defining a task branch handler, simply return the task name you want to jump to. The task name is case-sensitive, and it has to match the name property in DAGTask. For exmaple, in above sample code, return 'TASK1' instead of 'TEST_DAG$TASK1', 'task1' or 'Task1' will not be considered as a exact match. - Refer to - snowflake.core.task.Taskfor the details of each property.- Attributes - tasksΒΆ
- Returns a list of tasks this task graph has. 
 - Methods - add_task(task: DAGTask) NoneΒΆ
- Add a child task to this task graph. - Parameters:
- task (DAGTask) β The child task to be added to this task graph. 
 - Examples - Add a task to previously created DAG - >>> child_task = DagTask("child_task", "select 'child_task'", warehouse="test_warehouse") >>> dag.add_task(child_task) )