snowflake.core.task.dagv1
High-level, client-side representation of task graphs.
This set of higher-level classes provides a more convenient way to create, deploy, and manage task graphs than the lower-level Task APIs in snowflake.core.task. Task graphs are directed acyclic graphs (DAGs) of tasks.
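Because a task graph must be acyclic, any valid set of dependencies admits at least one order in which every task runs after all of its predecessors, and a cycle makes such an order impossible. The following sketch illustrates that property with the standard library's `graphlib.TopologicalSorter`. It is a hypothetical, client-side illustration: the `run_order` helper and the task names are made up, and the code is not taken from snowflake.core.

```python
from graphlib import CycleError, TopologicalSorter


def run_order(predecessors: dict[str, set[str]]) -> list[str]:
    """Return one valid execution order, or raise ValueError if the graph has a cycle.

    Hypothetical helper: each key is a task name and its value is the set of
    tasks that must finish before it runs.
    """
    try:
        return list(TopologicalSorter(predecessors).static_order())
    except CycleError as exc:
        # CycleError stores the offending cycle as its second argument.
        raise ValueError(f"not a DAG: cycle through {exc.args[1]}") from exc


graph = {
    "dagtask1": set(),
    "dagtask2": {"dagtask1"},  # dagtask2 runs after dagtask1
}
print(run_order(graph))  # ['dagtask1', 'dagtask2']
```

A graph such as `{"a": {"b"}, "b": {"a"}}` raises `ValueError`, which is the client-side analogue of rejecting a graph that is not a DAG.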
- Example 1: Create a task graph that has two Tasks.
>>> from datetime import timedelta
>>> from snowflake.core import Root
>>> from snowflake.core.task import StoredProcedureCall
>>> from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
>>> from snowflake.snowpark import Session
>>> from snowflake.snowpark.functions import sum as sum_
>>> session = Session.builder.create()
>>> root = Root(session)
>>> def dosomething(session: Session) -> None:
...     df = session.table("target")
...     df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
>>> with DAG("my_dag", schedule=timedelta(days=1)) as dag:
...     # Create a task that runs some SQL.
...     dag_task1 = DAGTask(
...         "dagtask1",
...         "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v",
...     )
...     # Create a task that runs a Python function.
...     dag_task2 = DAGTask(
...         "dagtask2",
...         StoredProcedureCall(
...             dosomething,
...             stage_location="@mystage",
...             packages=["snowflake-snowpark-python"],
...         ),
...         warehouse="test_warehouse",
...     )
>>> # Shift right and left operators can specify task relationships.
>>> dag_task1 >> dag_task2
>>> schema = root.databases["MYDB"].schemas["MYSCHEMA"]
>>> dag_op = DAGOperation(schema)
>>> dag_op.deploy(dag)
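In Example 1, `dag_task1 >> dag_task2` reads as "dagtask1 runs before dagtask2". The following minimal sketch shows how such operators can record dependencies through Python operator overloading. `ToyTask` is a hypothetical class, not `DAGTask`; returning the right-hand operand so that chains like `a >> b >> c` work, and accepting lists on either side, are assumptions of this sketch rather than documented snowflake.core behavior.

```python
class ToyTask:
    """Hypothetical stand-in for a task node; it only records predecessor names."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.predecessors: set[str] = set()

    def __rshift__(self, other):
        # a >> b or a >> [b, c]: every right-hand task runs after self.
        targets = other if isinstance(other, list) else [other]
        for task in targets:
            task.predecessors.add(self.name)
        return other  # return the right side so a >> b >> c chains

    def __lshift__(self, other):
        # a << b or a << [b, c]: self runs after every right-hand task.
        sources = other if isinstance(other, list) else [other]
        for task in sources:
            self.predecessors.add(task.name)
        return other

    def __rrshift__(self, other):
        # [a, b] >> c: list has no >>, so Python calls c.__rrshift__([a, b]).
        self << other
        return self

    def __rlshift__(self, other):
        # [a, b] << c: every task in the list runs after self.
        self >> other
        return self


a, b, c, d = (ToyTask(n) for n in "abcd")
a >> [b, c]   # fan out: b and c run after a
[b, c] >> d   # fan in: d runs after both b and c
print(sorted(d.predecessors))  # ['b', 'c']
```

The reflected methods (`__rrshift__`, `__rlshift__`) are what make a list usable on the left-hand side, since Python falls back to the right operand's reflected method when the left operand does not support the operator.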
- Example 2: Create a task graph that uses a Cron schedule, a branch task, and each function's return value as the task's return value.
>>> from snowflake.snowpark import Session
>>> from snowflake.core import Root
>>> from snowflake.core._common import CreateMode
>>> from snowflake.core.task import Cron
>>> from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
>>> session = Session.builder.create()
>>> test_stage = "mystage"
>>> test_dag = "mydag"
>>> test_db = "mydb"
>>> test_schema = "public"
>>> test_warehouse = "testwh_python"
>>> root = Root(session)
>>> schema = root.databases[test_db].schemas[test_schema]
>>> def task_handler1(session: Session) -> None:
...     pass  # do something
>>> def task_handler2(session: Session) -> None:
...     pass  # do something
>>> def task_handler3(session: Session) -> None:
...     pass  # do something
>>> def task_branch_handler(session: Session) -> str:
...     # do something
...     return "task3"
>>> try:
...     with DAG(
...         test_dag,
...         schedule=Cron("10 * * * *", "America/Los_Angeles"),
...         stage_location=test_stage,
...         packages=["snowflake-snowpark-python"],
...         warehouse=test_warehouse,
...         use_func_return_value=True,
...     ) as dag:
...         task1 = DAGTask(
...             "task1",
...             task_handler1,
...         )
...         task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
...         task2 = DAGTask("task2", task_handler2)
...         task1 >> task1_branch
...         task1_branch >> [task2, task_handler3]  # After >>, you can use a DAGTask or a function.
...     op = DAGOperation(schema)
...     op.deploy(dag, mode=CreateMode.or_replace)
... finally:
...     session.close()
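In Example 2, `Cron("10 * * * *", "America/Los_Angeles")` uses standard five-field cron syntax: minute 10 of every hour, evaluated in the given time zone. The sketch below computes the next such run time for a fixed-minute hourly schedule. `next_hourly_run` is a hypothetical helper, not part of snowflake.core; it assumes `now` is already expressed in the schedule's time zone (UTC is used below only to keep the demo self-contained), and it ignores daylight-saving transitions and every cron field other than the minute.

```python
from datetime import datetime, timedelta, timezone


def next_hourly_run(now: datetime, minute: int = 10) -> datetime:
    """Next time strictly after `now` matching the cron pattern "<minute> * * * *".

    Hypothetical helper for illustration only; handles just the minute field.
    """
    candidate = now.replace(minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # This hour's slot has passed (or is exactly now): use the next hour.
        candidate += timedelta(hours=1)
    return candidate


now = datetime(2024, 1, 15, 9, 42, tzinfo=timezone.utc)
print(next_hourly_run(now))  # 2024-01-15 10:10:00+00:00
```

Adding `timedelta(hours=1)` to an aware datetime also rolls over day boundaries, so a call at 23:30 yields 00:10 on the following day.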
Classes
- A graph of tasks composed of a single root task and additional tasks, organized by their dependencies.
- Represents a child Task of a task graph.
- Contains the history of a task graph run in Snowflake.
- APIs to manage task graph child task operations.