Tutorial 2: Aufgaben und Diagramme (DAGs) erstellen und verwalten

Einführung

In diesem Tutorial erstellen und verwenden Sie Snowflake-Aufgaben, um einige grundlegende gespeicherte Prozeduren zu verwalten. Sie erstellen auch einen Task-Graph, auch Directed Acyclic Graph genannt (DAG), um Aufgaben (Tasks) mit einer übergeordneten Task-Graph-API zu orchestrieren.

Voraussetzungen

Bemerkung

Wenn Sie sowohl Gemeinsame Einrichtung für Snowflake Python APIs-Tutorials als auch Tutorial 1: Eine Datenbank, ein Schema, eine Tabelle und ein Warehouse erstellen bereits abgeschlossen haben, können Sie diese Voraussetzungen überspringen und mit dem ersten Schritt dieses Tutorials fortfahren.

Bevor Sie mit diesem Tutorial beginnen, müssen Sie die folgenden Schritte ausführen:

  1. Befolgen Sie die Anweisungen von common setup, die die folgenden Schritte umfassen:

    • Richten Sie Ihre Entwicklungsumgebung ein.

    • Installieren Sie das Snowflake Python APIs-Paket.

    • Konfigurieren Sie Ihre Snowflake-Verbindung.

    • Importieren Sie alle Module, die für die Python API-Tutorials erforderlich sind.

    • Erstellen Sie ein API Root-Objekt.

  2. Führen Sie den folgenden Code aus, um eine Datenbank namens PYTHON_API_DB und ein Schema namens PYTHON_API_SCHEMA in dieser Datenbank zu erstellen.

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    Dies sind dieselben Datenbank- und Schema-Objekte, die Sie in Tutorial 1 erstellen.

Sobald Sie diese Voraussetzungen erfüllen, können Sie API für die Verwaltung von Aufgaben verwenden.

Einrichten von Snowflake-Objekten

Richten Sie die gespeicherten Prozeduren ein, die Ihre Aufgaben aufrufen werden, sowie den Stagingbereich, in dem die gespeicherten Prozeduren gespeichert werden sollen. Sie können Ihr Snowflake Python APIs root-Objekt verwenden, um einen Stagingbereich in der PYTHON_API_DB-Datenbank und dem PYTHON_API_SCHEMA-Schema zu erstellen, die Sie zuvor erstellt haben.

  1. Um einen Stagingbereich namens TASKS_STAGE zu erstellen, führen Sie in der nächsten Zelle Ihres Notebooks den folgenden Code aus:

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    Dieser Stagingbereich enthält die gespeicherten Prozeduren und alle Abhängigkeiten, die diese Prozeduren benötigen.

  2. Um zwei grundlegende Funktionen in Python zu erstellen, die die Aufgaben als gespeicherte Prozeduren ausführen, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    Diese Funktionen haben die folgenden Aufgaben:

    • trunc(): Erzeugt eine verkürzte Version einer Eingabetabelle.

    • filter_by_shipmode(): Filtert die SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM-Tabelle nach dem Liefermodus, schränkt die Ergebnisse auf 10 Zeilen ein und schreibt die Ergebnisse in eine neue Tabelle.

      Bemerkung

      Diese Funktion fragt die TPC-H Beispieldaten in der SNOWFLAKE_SAMPLE_DATA-Datenbank ab. Snowflake erstellt die Beispieldatenbank standardmäßig in neuen Konten. Wenn die Datenbank nicht in Ihrem Konto erstellt wurde, finden Sie weitere Informationen unter Verwenden der Beispieldatenbank.

    Die Funktionen sind absichtlich einfach gehalten und dienen zu Demonstrationszwecken.

Aufgaben erstellen und verwalten

Definieren, erstellen und verwalten Sie zwei Aufgaben, die Ihre zuvor erstellten Python-Funktionen als gespeicherte Prozeduren ausführen.

  1. Um die beiden Aufgaben task1 und task2 in der nächsten Zelle Ihres Notebooks zu definieren, führen Sie den folgenden Code aus:

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    In diesem Code geben Sie die folgenden Aufgabenparameter an:

    • Für jede Aufgabe exisitiert eine Definition, die durch ein StoredProcedureCall-Objekt dargestellt wird, das die folgenden Attribute enthält:

      • Die aufrufbare Funktion zur Ausführung

      • Den Stagingbereich, in den der Inhalt Ihrer Python-Funktion und ihrer Abhängigkeiten hochgeladen wird

      • Die Paketabhängigkeiten der gespeicherten Prozedur

    • Ein Warehouse zur Ausführung der gespeicherten Prozedur (erforderlich bei der Erstellung einer Aufgabe mit einem StoredProcedureCall-Objekt). In diesem Tutorial wird das Warehouse COMPUTE_WH verwendet, das für Ihr Testkonto zur Verfügung gestellt wird.

    • Einen Zeitplan für die Stammaufgabe task1. Der Zeitplan gibt das Intervall an, in dem die Aufgabe periodisch ausgeführt werden soll.

    Weitere Informationen zu gespeicherten Prozeduren finden Sie unter Schreiben von gespeicherten Prozeduren in Python.

  2. Um die beiden Aufgaben zu erstellen, rufen Sie ein TaskCollection-Objekt (tasks) aus Ihrem Datenbankschema ab und rufen .create() in Ihrer Aufgabensammlung auf:

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    In diesem Codebeispiel verknüpfen Sie die Aufgaben auch, indem Sie task1 als Vorgänger von task2 einstellen, wodurch ein minimaler Task-Graph erstellt wird.

  3. Um zu bestätigen, dass die beiden Aufgaben nun existieren, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. Wenn Sie Aufgaben erstellen, werden diese standardmäßig ausgesetzt.

    Um eine Aufgabe zu starten, rufen Sie .resume() für das Aufgabenressourcenobjekt auf:

    trunc_task.resume()
    
    Copy
  5. Um zu bestätigen, dass die Aufgabe trunc_task gestartet wurde, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    Die Ausgabe sollte in etwa so aussehen:

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    Sie können diesen Schritt jederzeit wiederholen, wenn Sie den Status der Aufgaben bestätigen möchten.

  6. Um die Aufgabenressourcen zu bereinigen, setzen Sie die Aufgabe zunächst aus, bevor Sie sie löschen.

    Führen Sie in Ihrer nächsten Zelle den folgenden Code aus:

    trunc_task.suspend()
    
    Copy
  7. Um zu bestätigen, dass die Aufgabe ausgesetzt ist, wiederholen Sie Schritt 5.

  8. Optional: Um beide Aufgaben zu löschen, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:

    trunc_task.drop()
    filter_task.drop()
    
    Copy

Erstellen und Verwalten eines Task-Graphs

Wenn Sie die Ausführung einer großen Anzahl von Aufgaben koordinieren, kann die individuelle Verwaltung jeder einzelnen Aufgabe eine Herausforderung sein. Snowflake Python APIs bietet Funktionen zur Orchestrierung von Aufgaben mit einer übergeordneten Task-Graph-API.

Ein Task-Graph, auch als Directed Acyclic Graph (DAG) genannt, besteht aus einer Reihe von Aufgaben, die aus einer Stammaufgabe (Root Task) und untergeordneten Aufgaben (Child Tasks) besteht, die nach ihren Abhängigkeiten organisiert sind. Weitere Informationen dazu finden Sie unter Aufgabenabhängigkeiten mit Task-Graphen verwalten.

  1. Um ein Task-Graphs zu erstellen und bereitzustellen, führen Sie den folgenden Code aus:

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    In diesem Code gehen Sie wie folgt vor:

    • Erstellen Sie ein Task-Graph-Objekt, indem Sie den DAG-Konstruktor aufrufen und einen Namen und einen Zeitplan angeben.

    • Definieren Sie einen Task-Graph, d. h. spezifische Aufgaben, und verwenden dazu den DAGTask-Konstruktor. Beachten Sie, dass der Konstruktor dieselben Arguments akzeptiert, die Sie in einem früheren Schritt für die StoredProcedureCall-Klasse angegeben haben.

    • Geben Sie dag_task1 als Stammaufgabe und Vorgänger von dag_task2 mit einer praktischeren Syntax an.

    • Stellen Sie den Task-Graph im PYTHON_API_SCHEMA-Schema der PYTHON_API_DB-Datenbank bereit.

  2. Um die Erstellung des Task-Graphs zu bestätigen, führen Sie in Ihrer nächsten Zelle den folgenden Code aus:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    Sie können diesen Schritt jederzeit wiederholen, wenn Sie den Status der Aufgaben bestätigen möchten.

  3. Um den Task-Graph durch den Start der Stammaufgabe zu starten, führen Sie in der nächsten Zelle den folgenden Code aus:

    dag_op.run(dag)
    
    Copy
  4. Um zu bestätigen, dass die Aufgabe PYTHON_API_DAG$TASK_PYTHON_API_TRUNC gestartet wurde, wiederholen Sie Schritt 2.

    Bemerkung

    Der Funktionsaufruf, der durch den Task-Graph erfolgt, wird nicht erfolgreich sein, wenn Sie ihn nicht mit den erforderlichen Argumente aufrufen. Dieser Schritt dient nur dazu, Ihnen zu zeigen, wie Sie den Task-Graph programmgesteuert starten können.

  5. Um das Task-Graph zu löschen, führen Sie in der nächsten Zelle den folgenden Code aus:

    dag_op.drop(dag)
    
    Copy
  6. Bereinigen Sie das Datenbankobjekt, das Sie in diesen Tutorials erstellt haben:

    database.drop()
    
    Copy

Nächste Schritte

Herzlichen Glückwunsch! In diesem Tutorial haben Sie gelernt, wie Sie Aufgaben und Task-Graphs mit Snowflake Python APIs erstellen und verwalten.

Zusammenfassung

Dabei haben Sie die folgenden Schritte durchgeführt:

  • Erstellen Sie einen Stagingbereich, der gespeicherte Prozeduren und ihre Abhängigkeiten aufnehmen kann.

  • Aufgaben erstellen und verwalten.

  • Erstellen und verwalten Sie einen Task-Graph.

  • Bereinigen Sie Ihre Snowflake-Ressourcenobjekte, indem Sie sie löschen.

Nächstes Tutorial

Sie können nun mit Tutorial 3: Snowpark Container Services erstellen und verwalten fortfahren, wo Sie erfahren, wie Sie Komponenten in Snowpark Container Services erstellen und verwalten können.

Zusätzliche Ressourcen

Weitere Beispiele für die Verwendung von API zur Verwaltung anderer Arten von Objekten in Snowflake finden Sie in den folgenden Entwicklerhandbüchern:

Benutzerhandbuch

Beschreibung

Verwalten von Snowflake-Datenbanken, Schemas, Tabellen und Ansichten mit Python

Verwenden der API, um Datenbanken, Schemas und Tabellen zu erstellen und zu verwalten.

Verwalten von Snowflake-Benutzern, Rollen und Berechtigungen mit Python

Verwenden der API, um Benutzer, Rollen und Berechtigungen zu erstellen und zu verwalten.

Verwalten des Ladens von Daten und des Entladens von Ressourcen mit Python

Verwenden der API, um Ressourcen zum Laden und Entladen von Daten zu erstellen und zu verwalten, einschließlich externer Volumes, Pipes und Stagingbereiche.

Verwalten von Snowpark Container Services (einschließlich Dienstfunktionen) mit Python

Verwenden der API, um die Komponenten der Snowpark Container Services zu verwalten, einschließlich Computepools, Image-Repositorys, Diensten und Dienstfunktionen.