Verwalten von Snowflake-Aufgaben und DAGs mit Python

Sie können Python verwenden, um Snowflake-Aufgaben zu verwalten, mit denen Sie SQL-Anweisungen, Prozeduraufrufe und Logik in Snowflake Scripting ausführen können. Einen Überblick zu Aufgaben finden Sie unter Einführung in Aufgaben.

Die Snowflake-Python-API repräsentiert Aufgaben mit zwei verschiedenen Typen:

  • Task: Zeigt die Eigenschaften einer Aufgabe an, z. B. den Zeitplan, die Parameter und die Vorgänger.

  • TaskResource: Stellt Methoden zur Verfügung, mit denen Sie ein entsprechendes Task-Objekt abrufen, die Aufgabe ausführen und die Aufgabe ändern können.

Die Beispiele unter diesem Thema gehen davon aus, dass Sie Code hinzugefügt haben, um eine Verbindung zu Snowflake herzustellen und ein Root-Objekt zu erstellen, von dem aus Sie die Snowflake-Python-API verwenden können. Weitere Informationen dazu finden Sie unter Verbinden mit Snowflake über die Snowflake-Python-API.

Der Code im folgenden Beispiel verwendet die in einer Konfigurationsdatei definierten Verbindungsparameter, um eine Verbindung zu Snowflake herzustellen. Unter Verwendung des resultierenden Session-Objekts erstellt der Code ein Root-Objekt, das die Typen und Methoden der API verwendet.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

Aufgabe erstellen

Um eine Aufgabe zu erstellen, erstellen Sie zunächst ein Task-Objekt. Dann erstellen Sie unter Angabe der Datenbank und des Schemas, in dem die Aufgabe erstellt werden soll, ein TaskCollection-Objekt. Mit TaskCollection.create fügen Sie die neue Aufgabe zu Snowflake hinzu.

Der Code im folgenden Beispiel erstellt ein Task-Objekt, das eine Aufgabe namens my_task repräsentiert, die eine im Parameter definition angegebene SQL-Abfrage ausführt. Der Code erstellt eine TaskCollection-Variable namens tasks für die Datenbank my_db und das Schema my_schema. Mit TaskCollection.create wird eine neue Aufgabe in Snowflake erstellt.

In diesem Codebeispiel wird auch ein timedelta-Wert von einer Stunde für den Zeitplan der Aufgabe angegeben. Sie können den Zeitplan einer Aufgabe entweder mit einem timedelta-Wert oder einem Cron-Ausdruck definieren.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

Sie können auch eine Aufgabe erstellen, die eine Python-Funktion oder eine gespeicherte Prozedur ausführt. Der Code im folgenden Beispiel erstellt eine Aufgabe mit dem Namen my_task2, die eine Funktion ausführt, die durch ein StoredProcedureCall-Objekt repräsentiert wird. Dieses Objekt gibt eine Funktion namens dosomething an, die sich am Speicherort des Stagingbereichs @mystage befindet. Sie müssen auch einen warehouse angeben, wenn Sie eine Aufgabe mit einem StoredProcedureCall-Objekt erstellen.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

Aufgabe erstellen oder aktualisieren

Sie können die Merkmale einer vorhandenen Aufgabe aktualisieren, indem Sie die Eigenschaften eines Task-Objekts festlegen, das die Aufgabe repräsentiert, und das Objekt dann mit der Methode TaskResource.create_or_update an Snowflake übergeben.

Sie können auch ein Task-Objekt übergeben, das eine neue Aufgabe beschreibt, wenn Sie die Aufgabe erstellen möchten.

Der Code im folgenden Beispiel legt den Namen, die Definition und den Zeitplan einer Aufgabe fest und aktualisiert dann die Aufgabe in Snowflake, wobei die Aufgabe erstellt wird, falls sie noch nicht vorhanden ist.

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

Aufgaben auflisten

Sie können Aufgaben mit der Methode TaskCollection.iter auflisten. Die Methode gibt einen PagedIter-Iterator für Task-Objekte zurück.

Der Code im folgenden Beispiel listet Aufgaben auf, deren Name den Text „my“ enthält, und gibt den Namen jeder einzelnen aus.

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

Aufgabenoperationen ausführen

Mit einem TaskResource-Objekt können Sie allgemeine Aufgabenoperationen, wie das Ausführen, Anhalten und Fortsetzen von Aufgaben, ausführen.

Der Code im folgenden Beispiel führt die Aufgabe my_task aus, hält sie an, setzt sie fort und löscht sie.

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

Verwalten der Aufgaben in einem DAG

Sie können Aufgaben verwalten, die in einem Directed Acyclic Graph (DAG) zusammengeführt sind. Ein DAG umfasst eine Reihe von Aufgaben mit genau einer Stammaufgabe und zusätzlichen Aufgaben, die nach ihren Abhängigkeiten organisiert sind.

Weitere Informationen zu Aufgaben in einem DAG finden Sie unter DAG von Aufgaben.

DAG von Aufgaben erstellen

Um eine DAG von Aufgaben zu erstellen, erstellen Sie zunächst ein DAG-Objekt, das seinen Namen und andere optionale Eigenschaften, wie z. B. den Zeitplan, angibt. Sie können den Zeitplan eines DAG entweder mit einem timedelta-Wert oder einem Cron-Ausdruck definieren.

Der Code im folgenden Beispiel definiert eine Python-Funktion dosomething und gibt die Funktion dann als DAGTask-Objekt mit dem Namen dag_task2 im DAG an. Er definiert auch eine SQL-Anweisung als ein weiteres DAGTask-Objekt mit dem Namen dag_task1 und gibt dann dag_task1 als Vorgänger von dag_task2 an. Schließlich wird der DAG in Snowflake in der Datenbank my_db und dem Schema my_schema bereitgestellt.

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

DAG mit Cron-Zeitplan, Aufgabenverzweigungen und Funktionsrückgabewerten erstellen

Sie können einen DAG auch mit einem bestimmten Cron-Zeitplan, Aufgabenverzweigungen und Funktionsrückgabewerten erstellen, die als Aufgabenrückgabewerte verwendet werden.

Der Code im folgenden Beispiel erstellt ein DAG-Objekt mit einem Cron-Objekt, das seinen Zeitplan angibt. Er definiert ein DAGTaskBranch-Objekt mit dem Namen task1_branch zusammen mit anderen DAGTask-Objekten und gibt deren Abhängigkeiten zueinander an.

In diesem Codebeispiel werden auch Aufgaben-Handler-Funktionen definiert, und jedes DAGTask- und DAGTaskBranch-Objekt wird mit einem bestimmten Aufgaben-Handler erstellt, der der Aufgabe zugewiesen ist. Der Code setzt den Parameter use_func_return_value des DAG auf True, was bedeutet, dass der Rückgabewert der Python-Funktion als Rückgabewert der entsprechenden Aufgabe verwendet wird. Andernfalls hat use_func_return_value den Standardwert False.

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

Rückgabewert einer Aufgabe in einem DAG festlegen und abrufen

Wenn die Definition einer Aufgabe ein StoredProcedureCall-Objekt ist, kann der Handler der gespeicherten Prozedur (oder Funktion) den Rückgabewert der Aufgabe explizit festlegen, indem ein TaskContext-Objekt verwendet wird.

Weitere Informationen dazu finden Sie unter SYSTEM$SET_RETURN_VALUE.

Der Code im folgenden Beispiel definiert eine Aufgaben-Handler-Funktion, die ein Objekt TaskContext mit dem Namen context aus der aktuellen Sitzung erstellt. Dann wird die Methode TaskContext.set_return_value verwendet, um den Rückgabewert explizit auf eine angegebene Zeichenfolge zu setzen.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

In einer DAG von Aufgaben kann eine unmittelbare Nachfolgeaufgabe, die die vorherige Aufgabe als ihren Vorgänger identifiziert, den von der Vorgängeraufgabe explizit festgelegten Rückgabewert abrufen.

Weitere Informationen dazu finden Sie unter SYSTEM$GET_PREDECESSOR_RETURN_VALUE.

Der Code im folgenden Beispiel definiert eine Aufgaben-Handler-Funktion, die die Methode TaskContext.get_predecessor_return_value verwendet, um den Rückgabewert der Vorgängeraufgabe mit dem Namen pred_task_name zu erhalten.

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy