Tutorial 2: Create and manage tasks and task graphs (DAGs)¶
Introduction¶
In this tutorial, you create and use Snowflake tasks to manage some basic stored procedures. You also create a task graph — also called a directed acyclic graph (DAG) — to orchestrate tasks with a higher-level task graph API.
Prerequisites¶
Note
If you have already completed both the Common setup for Snowflake Python APIs and Tutorial 1: Create a database, schema, table, and warehouse, you can skip these prerequisites and proceed to the first step of this tutorial.
Before you start this tutorial, you must complete the following steps:
Follow the common setup instructions, which include the following steps:
Set up your development environment.
Install the Snowflake Python APIs package.
Configure your Snowflake connection.
Import all the modules required for the Python API tutorials.
Create an API Root object.
Run the following code to create a database named PYTHON_API_DB and a schema named PYTHON_API_SCHEMA in that database:

database = root.databases.create(
    Database(name="PYTHON_API_DB"),
    mode=CreateMode.or_replace,
)

schema = database.schemas.create(
    Schema(name="PYTHON_API_SCHEMA"),
    mode=CreateMode.or_replace,
)
These are the same database and schema objects you create in Tutorial 1.
After completing these prerequisites, you are ready to start using the API for task management.
Set up Snowflake objects¶
Set up the stored procedures that your tasks will invoke and the stage that will hold the stored procedures. You can use your Snowflake Python APIs root object to create a stage in the PYTHON_API_DB database and PYTHON_API_SCHEMA schema you previously created.
To create a stage named TASKS_STAGE, in the next cell of your notebook, run the following code:

stages = root.databases[database.name].schemas[schema.name].stages
stages.create(Stage(name="TASKS_STAGE"))
This stage will hold the stored procedures and any dependencies those procedures need.
To create two basic Python functions that the tasks will run as stored procedures, in your next cell, run the following code:
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
    (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
    )
    return "Truncated table successfully created!"


def filter_by_shipmode(session: Session, mode: str) -> str:
    (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
    )
    return "Filter table successfully created!"
These functions do the following:
trunc(): Creates a truncated version of an input table.

filter_by_shipmode(): Filters the SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM table by ship mode, limits the results to 10 rows, and writes the results to a new table.

Note

This function queries the TPC-H sample data in the SNOWFLAKE_SAMPLE_DATA database. Snowflake creates the sample database in new accounts by default. If the database has not been created in your account, see Using the Sample Database.
The functions are intentionally basic and are intended for demonstration purposes.
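Before wiring these functions into tasks, you could sanity-check trunc directly. The following is only a sketch: it assumes you have a Snowpark Session object named session available in your notebook (this tutorial otherwise works through the root object), and the to_table name is a hypothetical example:

# Hypothetical local check; assumes `session` is an existing Snowpark Session.
result = trunc(
    session,
    from_table="snowflake_sample_data.tpch_sf100.lineitem",
    to_table="PYTHON_API_DB.PYTHON_API_SCHEMA.lineitem_truncated",
    count=5,
)
print(result)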
Create and manage tasks¶
Define, create, and manage two tasks that will run your previously created Python functions as stored procedures.
To define the two tasks, task1 and task2, in the next cell of your notebook, run the following code:

tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"

task1 = Task(
    name="task_python_api_trunc",
    definition=StoredProcedureCall(
        func=trunc,
        stage_location=f"@{tasks_stage}",
        packages=["snowflake-snowpark-python"],
    ),
    warehouse="COMPUTE_WH",
    schedule=timedelta(minutes=1),
)

task2 = Task(
    name="task_python_api_filter",
    definition=StoredProcedureCall(
        func=filter_by_shipmode,
        stage_location=f"@{tasks_stage}",
        packages=["snowflake-snowpark-python"],
    ),
    warehouse="COMPUTE_WH",
)
In this code, you specify the following task parameters:
For each task, a definition represented by a StoredProcedureCall object that includes the following attributes:
The callable function to run
The stage location where the contents of your Python function and its dependencies are uploaded
The stored procedure’s package dependencies
A warehouse to run the stored procedure (required when creating a task with a StoredProcedureCall object). This tutorial uses the COMPUTE_WH warehouse that is included with your trial account.

A run schedule for the root task, task1. The schedule specifies the interval at which to run the task periodically.
For more information about stored procedures, see Writing stored procedures in Python.
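The schedule does not have to be a fixed interval. As a sketch, and assuming your installed version of snowflake.core exposes the Cron helper (verify this against your package version), you could instead run the root task on a cron expression:

# Assumes the Cron class is available in your installed snowflake.core version.
from snowflake.core.task import Cron

task1_cron = Task(
    name="task_python_api_trunc",
    definition=StoredProcedureCall(
        func=trunc,
        stage_location=f"@{tasks_stage}",
        packages=["snowflake-snowpark-python"],
    ),
    warehouse="COMPUTE_WH",
    # Run daily at 08:00 in the given time zone instead of every minute.
    schedule=Cron("0 8 * * *", "America/Los_Angeles"),
)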
To create the two tasks, retrieve a TaskCollection object (tasks) from your database schema and call .create() on your task collection:

# create the tasks in the Snowflake database
tasks = schema.tasks
trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
task2.predecessors = [trunc_task.name]
filter_task = tasks.create(task2, mode=CreateMode.or_replace)
In this code example, you also link the tasks by setting task1 as a predecessor to task2, which creates a minimal task graph.

To confirm that the two tasks now exist, in your next cell, run the following code:
taskiter = tasks.iter()
for t in taskiter:
    print(t.name)
When you create tasks, they are suspended by default.
To start a task, call .resume() on the task resource object:

trunc_task.resume()
To confirm that the trunc_task task was started, in your next cell, run the following code:

taskiter = tasks.iter()
for t in taskiter:
    print("Name: ", t.name, "| State: ", t.state)
The output should be similar to this:
Name: TASK_PYTHON_API_FILTER | State: suspended
Name: TASK_PYTHON_API_TRUNC | State: started
You can repeat this step whenever you want to confirm the status of the tasks.
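Because you will rerun this status check more than once in this tutorial, you might wrap it in a small helper. This is only a convenience sketch; the function name print_task_states is hypothetical and not part of the API:

def print_task_states(task_collection) -> None:
    # List every task in the collection along with its current state.
    for t in task_collection.iter():
        print("Name: ", t.name, "| State: ", t.state)

print_task_states(tasks)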
To clean up your task resources, suspend the task before dropping it.
In your next cell, run the following code:
trunc_task.suspend()
To confirm that the task is suspended, rerun the status check from the previous confirmation step.
Optional: To drop both tasks, in your next cell, run the following code:
trunc_task.drop()
filter_task.drop()
Create and manage a task graph¶
When you’re coordinating the execution of a large number of tasks, individually managing each task can be a challenge. The Snowflake Python APIs provides functionality to orchestrate tasks with a higher-level task graph API.
A task graph, which is also called a directed acyclic graph (DAG), is a series of tasks composed of a root task and child tasks, organized by their dependencies. For more information, see Manage task dependencies with task graphs.
To create and deploy a task graph, run the following code:
dag_name = "python_api_dag"
dag = DAG(name=dag_name, schedule=timedelta(days=1))
with dag:
    dag_task1 = DAGTask(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
            func=trunc,
            stage_location=f"@{tasks_stage}",
            packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
    )
    dag_task2 = DAGTask(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
            func=filter_by_shipmode,
            stage_location=f"@{tasks_stage}",
            packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
    )
    dag_task1 >> dag_task2

dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode=CreateMode.or_replace)
In this code, you do the following:
Create a task graph object by calling the DAG constructor and specifying a name and schedule.

Define task graph–specific tasks using the DAGTask constructor. Note that the constructor accepts the same arguments that you specified for the StoredProcedureCall class in a previous step.

Specify dag_task1 as the root task and predecessor of dag_task2 by using the more convenient >> syntax.

Deploy the task graph to the PYTHON_API_SCHEMA schema of the PYTHON_API_DB database.
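The >> operator is not limited to a single pair of tasks. As a hedged sketch (dag_task3 and its simple SQL definition are hypothetical additions, not part of this tutorial, and assume that DAGTask accepts a SQL string as its definition in the same way Task does), you could fan out from the root task by declaring each dependency separately inside the with dag: block:

with dag:
    # Hypothetical third task for illustration only.
    dag_task3 = DAGTask(
        name="task_python_api_extra",
        definition="select 1",
        warehouse="COMPUTE_WH",
    )
    # Both dag_task2 and dag_task3 become children of the root task dag_task1.
    dag_task1 >> dag_task2
    dag_task1 >> dag_task3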
To confirm the creation of the task graph, in your next cell, run the following code:
taskiter = tasks.iter()
for t in taskiter:
    print("Name: ", t.name, "| State: ", t.state)
You can repeat this step whenever you want to confirm the status of the tasks.
To start the task graph by starting the root task, in your next cell, run the following code:
dag_op.run(dag)
To confirm that the PYTHON_API_DAG$TASK_PYTHON_API_TRUNC task started, rerun the status check from earlier in this section.

Note
The function call invoked by the task graph will not succeed because you are not calling it with any of its required arguments. The purpose of this step is only to demonstrate how to programmatically start the task graph.
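If you want to inspect what actually happened when the graph ran, one option is the TASK_HISTORY table function in the database's INFORMATION_SCHEMA. The following is a minimal sketch that assumes you have a Snowpark Session object named session available in your notebook; it is not a required step of this tutorial:

# Assumes `session` is an existing Snowpark Session for the same account.
rows = session.sql(
    """
    select name, state, error_message
    from table(PYTHON_API_DB.information_schema.task_history(result_limit => 10))
    order by scheduled_time desc
    """
).collect()
for row in rows:
    print(row["NAME"], "|", row["STATE"], "|", row["ERROR_MESSAGE"])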
To drop the task graph, in your next cell, run the following code:
dag_op.drop(dag)
Clean up the database object that you created in these tutorials:
database.drop()
What’s next?¶
Congratulations! In this tutorial, you learned how to create and manage tasks and task graphs using the Snowflake Python APIs.
Summary¶
Along the way, you completed the following steps:
Create a stage that can hold stored procedures and their dependencies.
Create and manage tasks.
Create and manage a task graph.
Clean up your Snowflake resource objects by dropping them.
Next tutorial¶
You can now proceed to Tutorial 3: Create and manage Snowpark Container Services, which shows how to create and manage components in Snowpark Container Services.
Additional resources¶
For more examples of using the API to manage other types of objects in Snowflake, see the following developer guides:
Guide | Description
---|---
Managing Snowflake databases, schemas, tables, and views with Python | Use the API to create and manage databases, schemas, and tables.
Managing Snowflake users, roles, and grants with Python | Use the API to create and manage users, roles, and grants.
Managing data loading and unloading resources with Python | Use the API to create and manage data loading and unloading resources, including external volumes, pipes, and stages.
Managing Snowpark Container Services (including service functions) with Python | Use the API to manage components of Snowpark Container Services, including compute pools, image repositories, services, and service functions.