Verwalten von Snowflake-Aufgaben und Task-Graphen mit Python¶
Sie können Python verwenden, um Snowflake-Aufgaben zu verwalten, mit denen Sie SQL-Anweisungen, Prozeduraufrufe und Logik in Snowflake Scripting ausführen können. Einen Überblick zu Aufgaben finden Sie unter Einführung in Aufgaben.
Die Snowflake-Python-API repräsentiert Aufgaben mit zwei verschiedenen Typen:
Task
: Zeigt die Eigenschaften einer Aufgabe an, z. B. den Zeitplan, die Parameter und die Vorgänger.TaskResource
: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendesTask
-Objekt abrufen, die Aufgabe ausführen und die Aufgabe ändern können.
Voraussetzungen¶
Die Beispiele unter diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root
-Objekt zu erstellen, von dem aus Sie die Snowflake-Python-API verwenden können.
Beispielsweise verwendet der folgende Code Verbindungsparameter, die in einer Konfigurationsdatei definiert sind, um eine Verbindung zu Snowflake zu erstellen:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Unter Verwendung des resultierenden Session
-Objekts erstellt der Code ein Root
-Objekt, das die Typen und Methoden der API verwendet. Weitere Informationen dazu finden Sie unter Verbindung zu Snowflake über Snowflake-Python-API herstellen.
Erstellen von Aufgaben (Tasks)¶
Um eine Aufgabe zu erstellen, erstellen Sie zunächst ein Task
-Objekt. Dann erstellen Sie unter Angabe der Datenbank und des Schemas, in dem die Aufgabe erstellt werden soll, ein TaskCollection
-Objekt. Mit TaskCollection.create
fügen Sie die neue Aufgabe zu Snowflake hinzu.
Der Code im folgenden Beispiel erstellt ein Task
-Objekt, das eine Aufgabe namens my_task
repräsentiert, die eine im Parameter definition
angegebene SQL-Abfrage ausführt:
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Der Code erstellt eine TaskCollection
-Variable namens tasks
für die Datenbank my_db
und das Schema my_schema
. Mit TaskCollection.create
wird eine neue Aufgabe in Snowflake erstellt.
In diesem Codebeispiel wird auch ein timedelta
-Wert von einer Stunde für den Zeitplan der Aufgabe angegeben. Sie können den Zeitplan einer Aufgabe entweder mit einem timedelta
-Wert oder einem Cron
-Ausdruck definieren.
Sie können auch eine Aufgabe erstellen, die eine Python-Funktion oder eine gespeicherte Prozedur ausführt. Der Code im folgenden Beispiel erstellt eine Aufgabe mit dem Namen my_task2
, die eine Funktion ausführt, die durch ein StoredProcedureCall
-Objekt repräsentiert wird:
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
my_task2 = Task(
StoredProcedureCall(
dosomething, stage_location="@mystage"
),
warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Dieses Objekt gibt eine Funktion namens dosomething
an, die sich am Speicherort des Stagingbereichs @mystage
befindet. Sie müssen auch einen warehouse
angeben, wenn Sie eine Aufgabe mit einem StoredProcedureCall
-Objekt erstellen.
Erstellen oder Aktualisieren von Aufgaben¶
Sie können die Merkmale einer vorhandenen Aufgabe aktualisieren, indem Sie die Eigenschaften eines Task
-Objekts festlegen, das die Aufgabe repräsentiert, und das Objekt dann mit der Methode TaskResource.create_or_update
an Snowflake übergeben.
Sie können auch ein Task
-Objekt übergeben, das eine neue Aufgabe beschreibt, wenn Sie die Aufgabe erstellen möchten.
Der Code im folgenden Beispiel legt den Namen, die Definition und den Zeitplan einer Aufgabe fest und aktualisiert dann die Aufgabe in Snowflake, bzw. erstellt die Aufgabe, falls sie noch nicht vorhanden ist:
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Auflisten von Aufgaben¶
Sie können Aufgaben mit der Methode TaskCollection.iter
auflisten. Die Methode gibt einen PagedIter
-Iterator für Task
-Objekte zurück.
Der Code im folgenden Beispiel listet Aufgaben auf, deren Namen mit my beginnen:
from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection
root = Root(connection)
tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%") # returns a PagedIter[Task]
for task_obj in task_iter:
print(task_obj.name)
Ausführen von Aufgabenoperationen¶
Mit einem TaskResource
-Objekt können Sie allgemeine Aufgabenoperationen, wie das Ausführen, Anhalten und Fortsetzen von Aufgaben, ausführen.
Der Code im folgenden Beispiel führt die Aufgabe my_task
aus, hält sie an, setzt sie fort und löscht sie:
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']
task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Verwalten der Aufgaben eines Task-Graphen¶
Sie können Aufgaben (Tasks) verwalten, die in einem Task-Graphen zusammengeführt sind. Ein Task-Graph umfasst eine Reihe von Aufgaben mit genau einer Stammaufgabe und zusätzlichen Aufgaben, die nach ihren Abhängigkeiten organisiert sind.
Weitere Informationen zu Aufgaben in einem Task-Graphen finden Sie unter Task-Graphen.
Erstellen von Task-Graphen¶
Um einen Task-Graphen zu erstellen, erstellen Sie zunächst ein DAG
-Objekt, das seinen Namen und andere optionale Eigenschaften, wie z. B. den Zeitplan, angibt. Sie können den Zeitplan eines Task-Graphen entweder mit einem timedelta
-Wert oder einem Cron
-Ausdruck definieren.
Der Code im folgenden Beispiel definiert eine Python-Funktion dosomething
und gibt die Funktion dann als DAGTask
-Objekt namens dag_task2
im Task-Graphen an:
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_
def dosomething(session: Session) -> None:
df = session.table("target")
df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
with DAG("my_dag", schedule=timedelta(days=1)) as dag:
# Create a task that runs some SQL.
dag_task1 = DAGTask(
"dagtask1",
"MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
)
# Create a task that runs a Python function.
dag_task2 = DAGTask(
StoredProcedureCall(
dosomething, stage_location="@mystage",
packages=["snowflake-snowpark-python"]
),
warehouse="test_warehouse"
)
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2 # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Dieser Code definiert auch eine SQL-Anweisung als ein weiteres DAGTask
-Objekt namens dag_task1
und gibt dann dag_task1
als Vorgänger von dag_task2
an. Schließlich wird der Task-Graph in Snowflake in der Datenbank my_db
und dem Schema my_schema
bereitgestellt.
Erstellen eines Task-Graphen mit Cron-Zeitplan, Aufgabenverzweigungen und Funktionsrückgabewerten¶
Sie können einen Task-Graphen auch mit einem bestimmten Cron-Zeitplan, mit Aufgabenverzweigungen und Funktionsrückgabewerten erstellen, die als Aufgabenrückgabewerte verwendet werden.
Der Code im folgenden Beispiel erstellt ein DAG
-Objekt mit einem Cron
-Objekt, das seinen Zeitplan angibt. Er definiert ein DAGTaskBranch
-Objekt namens task1_branch
zusammen mit anderen DAGTask
-Objekten und gibt deren gegenseitige Abhängigkeiten an:
from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
pass # do something
def task_branch_handler(session: Session) -> str:
# do something
return "task3"
try:
with DAG(
"my_dag",
schedule=Cron("10 * * * *", "America/Los_Angeles"),
stage_location="@mystage",
packages=["snowflake-snowpark-python"],
use_func_return_value=True,
) as dag:
task1 = DAGTask(
"task1",
task_handler,
warehouse=test_warehouse,
)
task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
task1 >> task1_branch
task1_branch >> [task2, task3]
schema = root.databases["my_db"].schemas["my_schema"]
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
finally:
session.close()
In diesem Codebeispiel werden auch Aufgaben-Handler-Funktionen definiert, und jedes DAGTask
- und DAGTaskBranch
-Objekt wird mit einem bestimmten Aufgaben-Handler erstellt, der der Aufgabe zugewiesen ist. Der Code setzt den Parameter use_func_return_value
des DAG auf True
, was bedeutet, dass der Rückgabewert der Python-Funktion als Rückgabewert der entsprechenden Aufgabe verwendet wird. Andernfalls hat use_func_return_value
den Standardwert False
.
Festlegen und Abrufen des Rückgabewerts einer Aufgabe in einem Task-Graphen¶
Wenn die Definition einer Aufgabe ein StoredProcedureCall
-Objekt ist, kann der Handler der gespeicherten Prozedur (oder Funktion) den Rückgabewert der Aufgabe explizit festlegen, indem ein TaskContext
-Objekt verwendet wird.
Weitere Informationen dazu finden Sie unter SYSTEM$SET_RETURN_VALUE.
Der Code im folgenden Beispiel definiert eine Aufgaben-Handler-Funktion, die ein Objekt TaskContext
mit dem Namen context
aus der aktuellen Sitzung erstellt. Dann wird die Methode TaskContext.set_return_value
verwendet, um den Rückgabewert explizit auf eine angegebene Zeichenfolge zu setzen:
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
# this return value can be retrieved by successor Tasks.
context.set_return_value("predecessor_return_value")
In einem Task-Graph kann eine unmittelbare Nachfolgeaufgabe, die die vorherige Aufgabe als ihren Vorgänger identifiziert, den von der Vorgängeraufgabe explizit festgelegten Rückgabewert abrufen.
Weitere Informationen dazu finden Sie unter SYSTEM$GET_PREDECESSOR_RETURN_VALUE.
Der Code im folgenden Beispiel definiert eine Aufgaben-Handler-Funktion, die die Methode TaskContext.get_predecessor_return_value
verwendet, um den Rückgabewert der Vorgängeraufgabe namens pred_task_name
abzurufen:
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
pred_return_value = context.get_predecessor_return_value("pred_task_name")