Erstellen Sie eine Sequenz von Aufgaben mit einem Task-Graphen

In Snowflake können Sie mehrere Aufgaben mit einem Task-Graphen verwalten, der auch als gerichteter azyklischer Graph oder DAG bezeichnet wird. Ein Task-Graph besteht aus einer Stammaufgabe und abhängigen untergeordnete Aufgabe. Die Abhängigkeiten müssen in einer Richtung von Anfang bis Ende laufen, ohne Schleifen. Eine optionale letzte Aufgabe (Finalizer) kann Bereinigungsoperationen durchführen, nachdem alle anderen Aufgaben abgeschlossen sind.

Erstellen Sie Task-Graphen mit dynamischer Verhaltensweise, indem Sie logikbasierte Operationen im Textteil der Aufgabe unter Verwendung von Laufzeitwerten, Konfiguration auf Graph-Ebene und Rückgabewerten übergeordneter Aufgaben angeben.

Sie können Aufgaben und Task-Graphen mit den von unterstützten Sprachen und Tools wie SQL, JavaScript, Python, Java, Scala oder Snowflake Scripting erstellen. Dieses Thema enthält SQL-Beispiele. Beispiele für Python finden Sie unter Verwalten von Snowflake-Aufgaben und Task-Graphen mit Python.

Task-Graphen erstellen

Erstellen Sie mit CREATE TASK eine Stammaufgabe, und erstellen Sie dann mit CREATE TASK untergeordnete Aufgaben. AFTER, um die übergeordneten Aufgaben auszuwählen.

Die Stammaufgabe definiert, wann der Task-Graph läuft. Untergeordnete Aufgaben werden in der durch den Task-Graphen festgelegten Reihenfolge ausgeführt.

Wenn mehrere untergeordnete Aufgaben die gleiche übergeordnete Aufgabe haben, laufen die untergeordneten Aufgaben parallel.

Wenn eine Aufgabe mehrere übergeordnete Objekte hat, wartet die Aufgabe, bis alle vorangegangenen Aufgaben erfolgreich abgeschlossen sind, bevor sie startet. (Die Aufgabe kann auch ausgeführt werden, wenn einige übergeordnete Aufgaben übersprungen werden. Weitere Informationen dazu finden Sie unter Eine untergeordnete Aufgabe überspringen oder aussetzen).

Im folgenden Beispiel wird ein serverloser Task-Graph erstellt, der mit einer Stammaufgabe beginnt, die jede Minute ausgeführt werden soll. Die Stammaufgabe hat zwei untergeordnete Aufgaben, die parallel laufen. (Das Diagramm zeigt ein Beispiel, bei dem eine dieser Aufgaben länger läuft als die andere) Nachdem diese beiden Aufgaben abgeschlossen sind, wird eine dritte untergeordnete Aufgabe ausgeführt. Die Finalizer-Aufgabe wird ausgeführt, nachdem alle anderen Aufgaben abgeschlossen sind oder nicht abgeschlossen werden konnten:

Ein Diagramm mit einer Sequenz von Aufgaben.
CREATE TASK task_root
  SCHEDULE = '1 MINUTE'
  AS SELECT 1;

CREATE TASK task_a
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_b
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_c
  AFTER task_a, task_b
  AS SELECT 1;
Copy

Hinweise:

  • Ein Task-Graph ist auf maximal 1.000 Aufgaben beschränkt.

  • Eine einzelne Aufgabe kann maximal 100 übergeordnete Aufgaben und 100 untergeordnete Aufgaben haben.

  • Wenn Aufgaben parallel auf demselben vom Benutzer verwalteten Warehouse ausgeführt werden, müssen die Computeressourcen so bemessen sein, dass sie die gleichzeitigen Aufgabenläufe bewältigen können.

Finalizer-Aufgabe

Sie können eine optionale Finalizer-Aufgabe hinzufügen, die ausgeführt wird, nachdem alle anderen Aufgaben im Task-Graph abgeschlossen sind (oder nicht abgeschlossen wurden). Verwenden Sie dies, um Folgendes zu tun:

  • Bereinigungsoperationen durchführen, z. B. Bereinigung von nicht mehr benötigten Zwischendaten.

  • Senden Sie Benachrichtigungen über den Erfolg oder Misserfolg einer Aufgabe.

Eine Aufgabensequenz zeigt eine Stammaufgabe, die auf zwei untergeordnete Aufgaben verweist, die ihrerseits auf eine andere Aufgabe verweisen. Unten wird eine Finalizer-Aufgabe angezeigt, die ausgeführt wird, nachdem alle anderen Aufgaben abgeschlossen sind oder nicht abgeschlossen werden konnten.

Um eine Finalizer-Aufgabe zu erstellen, verwenden Sie CREATE TASK. .. FINALIZE. .. für die Stammaufgabe. Beispiel:

CREATE TASK task_finalizer
  FINALIZE = task_root
  AS SELECT 1;
Copy

Hinweise:

  • Eine Finalizer-Aufgabe ist immer mit einer Stammaufgabe (root task) verbunden. Jede Stammaufgabe kann nur eine Finalizer-Aufgabe haben und eine Finalizer-Aufgabe kann nur mit einer Stammaufgabe verbunden sein.

  • Wenn die Stammaufgabe eines Task-Graphen übersprungen wird (z. B. aufgrund von überlappenden Task-Graph-Läufen), wird die Finalizer-Aufgabe nicht gestartet.

  • Eine Finalizer-Aufgabe kann keine untergeordneten Aufgaben haben.

  • Eine Finalizer-Aufgabe wird nur geplant, wenn keine anderen Aufgaben laufen oder in der Warteschlange des aktuellen Task-Graphen stehen.

Weitere Beispiele finden Sie unter Beispiel für eine Finalizer-Aufgabe: E-Mail-Benachrichtigung senden und Beispiel für eine Finalizer-Aufgabe: Fehler korrigieren.

Eigentümerschaft des Task-Graphen verwalten

Alle Aufgaben eines einfachen Task-Graphen müssen denselben Aufgabeneigentümer haben und in derselben Datenbank und in demselben Schema gespeichert sein.

Sie können die Eigentümerschaft an allen Aufgaben eines Task-Graphen mit einer der folgenden Aktionen übertragen:

  • Löschen des Eigentümers aller Aufgaben im Task-Graphen mit DROP ROLE. Snowflake überträgt die Eigentümerschaft an die Rolle, die den Befehl DROP ROLE ausführt.

  • Übertragen der Eigentümerschaft aller Aufgaben im Task-Graphen mit GRANT OWNERSHIP auf alle Aufgaben in einem Schema.

Wenn Sie die Eigentümerschaft der Aufgaben in einem Task-Graph mit diesen Methoden übertragen, behalten die Aufgaben im Task-Graphen ihre Beziehungen untereinander bei.

Durch das Übertragen der Eigentümerschaft einer einzelnen Aufgabe wird die Abhängigkeit zwischen der Aufgabe und allen über- und untergeordneten Aufgaben aufgehoben. Weitere Informationen dazu finden Sie unter Übergeordnete und untergeordnete Aufgaben trennen (unter diesem Thema).

Bemerkung

Die Datenbankreplikation funktioniert nicht für Task-Graphen, wenn das Diagramm einer anderen Rolle gehört als der Rolle, die die Replikation durchführt.

Aufgaben in einem Task-Graphen ausführen oder planen

Einen Task-Graphen manuell ausführen

Sie können eine einzelne Instanz eines Task-Graphen ausführen. Dies ist nützlich, um neue oder geänderte Task-Graphen zu testen, bevor Sie den Task-Graphen in der Produktion aktivieren, oder für einmalige Läufe nach Bedarf.

Bevor Sie den Task-Graphen starten, verwenden Sie ALTER TASK. .. RESUME für jede untergeordnete Aufgabe (einschließlich der optionalen Finalizer-Aufgabe), die Sie in den Lauf einbeziehen möchten.

Um eine einzelne Instanz eines Task-Graphen auszuführen, verwenden Sie EXECUTE TASK für die Stammaufgabe. Wenn Sie die Stammaufgabe ausführen, werden alle wiederaufgenommenen untergeordneten Aufgaben im Task-Graphen in der durch den Task-Graphen festgelegten Reihenfolge ausgeführt.

Eine Aufgabe nach einem Zeitplan oder als getriggerte Aufgabe ausführen

Legen Sie in der Stammaufgabe fest, wann der Task-Graph läuft. Task-Graphen können nach einem wiederkehrenden Zeitplan laufen oder durch ein Ereignis getriggert werden. Weitere Informationen dazu finden Sie unter folgenden Themen:

Um den Task-Graphen zu starten, können Sie einen der folgenden Schritte ausführen:

  • Setzen Sie jede einzelne untergeordnete Aufgabe (einschließlich des Finalizers) fort, die Sie in den Lauf einbeziehen möchten, und setzen Sie dann die Stammaufgabe fort, indem Sie ALTER TASK. .. RESUME verwenden.

  • Nehmen Sie alle Aufgaben in einem Task-Graphen auf einmal wieder auf, indem Sie SYSTEM$TASK_DEPENDENTS_ENABLE ( <root_task_name> ) für die Stammaufgabe verwenden.

Abhängige Aufgaben eines Task-Graphen anzeigen

Um die untergeordneten Aufgaben für eine Stammaufgabe anzuzeigen, rufen Sie die Tabellenfunktion TASK_DEPENDENTS auf. Um alle Aufgaben in einem Task-Graphen abzurufen, müssen Sie beim Aufrufen der Funktion die Stammaufgabe angeben.

Sie können auch Snowsight verwenden, um Ihre Task-Graphen zu verwalten und anzuzeigen. Weitere Informationen dazu finden Sie unter Anzeigen von Aufgaben und Aufgabendiagrammen in Snowsight.

Aufgaben ändern, aussetzen oder erneut versuchen

Ändern einer Aufgabe in einem Task-Graphen

Um eine Aufgabe in einem geplanten Task-Graphen zu ändern, setzen Sie die Stammaufgabe mit ALTER TASK. .. SUSPEND aus. Wenn ein Lauf des Task-Graphen in Bearbeitung ist, wird der aktuelle Lauf abgeschlossen. Alle zukünftigen geplanten Ausführungen der Stammaufgabe werden abgebrochen.

Wenn die Stammaufgabe ausgesetzt wird, behalten die untergeordneten Aufgaben, einschließlich der Finalizer-Aufgabe, ihren Status bei (ausgesetzt, ausgeführt oder abgeschlossen). Die untergeordneten Aufgaben müssen nicht einzeln ausgesetzt werden.

Nachdem Sie die Stammaufgabe ausgesetzt haben, können Sie jede Aufgabe im Task-Graphen ändern.

Um den Task-Graphen fortzusetzen, können Sie einen der folgenden Schritte ausführen:

  • Setzen Sie die Stammaufgabe mit ALTER TASK. .. RESUME fort. Einzelne untergeordnete Aufgaben, die vorher liefen, müssen nicht wieder aufgenommen werden.

  • Setzen Sie alle Aufgaben in einem Task-Graphen auf einmal fort, indem Sie SYSTEM$TASK_DEPENDENTS_ENABLE aufrufen und den Namen der Stammaufgabe übergeben.

Eine untergeordnete Aufgabe überspringen oder aussetzen

Um eine untergeordnete Aufgabe in einem Task-Graphen zu überspringen, setzen Sie die untergeordnete Aufgabe mit ALTER TASK. .. SUSPEND aus.

Wenn Sie eine untergeordnete Aufgabe aussetzen, läuft der Task-Graph weiter, als ob die untergeordnete Aufgabe erfolgreich gewesen wäre. Eine untergeordnete Aufgabe mit mehreren Vorgängern wird solange ausgeführt, bis sich mindestens einer der Vorgänger im Zustand „Fortgesetzt“ befindet und alle fortgesetzten Vorgänger erfolgreich zu Ende geführt werden.

Ein Diagramm zeigt einen Task-Graphen, der eine ausgesetzte untergeordnete Aufgabe enthält. Die ausgesetzte untergeordnete Aufgabe wird übersprungen und der Task-Graph wird angeschlossen.

Eine fehlgeschlagene Aufgabe wiederholen

Verwenden Sie EXECUTE TASK. .. RETRY LAST, um zu versuchen, den Task-Graphen der letzten fehlgeschlagenen Aufgabe auszuführen. Wenn die Aufgabe erfolgreich ist, werden alle untergeordneten Aufgaben weiter ausgeführt, sobald die vorangegangenen Aufgaben abgeschlossen sind.

Automatische Wiederholungsversuche

Wenn eine untergeordnete Aufgabe fehlschlägt, wird standardmäßig der gesamte Task-Graph als fehlgeschlagen betrachtet.

Anstatt bis zur nächsten geplanten Ausführung des Task-Graph zu warten, können Sie den Task-Graphen anweisen, es sofort erneut zu versuchen, indem Sie den Parameter TASK_AUTO_RETRY_ATTEMPTS für die Stammaufgabe festlegen. Wenn eine untergeordnete Aufgabe fehlschlägt, wird der gesamte Task-Graph sofort erneut versucht, und zwar so oft, wie angegeben. Wenn der Task-Graph immer noch nicht abgeschlossen ist, gilt der Task-Graph als fehlgeschlagen.

Task-Graphen nach fehlgeschlagenen Task-Graphen-Läufen aussetzen

Standardmäßig wird ein Task-Graph nach 10 aufeinanderfolgenden Fehlschlägen ausgesetzt. Sie können diesen Wert ändern, indem Sie SUSPEND_TASK_AFTER_NUM_FAILURES für die Stammaufgabe einstellen.

Wenn im folgenden Beispiel eine untergeordnete Aufgabe fehlschlägt, versucht der Task-Graph sofort zweimal, bevor der gesamte Task-Graph als fehlgeschlagen gilt. Wenn der Task-Graph dreimal hintereinander fehlschlägt, wird der Task-Graph ausgesetzt.

CREATE OR REPLACE TASK task_root
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2   --  Failed task graph retries up to 2 times
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3   --  Task graph suspends after 3 consecutive failures
  AS SELECT 1;
Copy

Überlappende Task-Graph-Ausführung

Snowflake stellt standardmäßig sicher, dass jeweils nur eine Instanz eines bestimmten Task-Graphen ausgeführt wird. Die nächste Ausführung einer Stammaufgabe wird erst geplant, wenn die Ausführung aller Aufgaben im Task-Graphen beendet wurde. Das heißt, wenn die kumulierte Zeit, die für die Ausführung aller Aufgaben im Task-Graphen benötigt wird, die in der Definition der Stammaufgabe festgelegte explizit geplante Zeit überschreitet, wird mindestens eine Ausführung des Task-Graphen übersprungen.

Damit sich untergeordnete Aufgaben überschneiden können, verwenden Sie CREATE TASK oder ALTER TASK für die Stammaufgabe und setzen ALLOW_OVERLAPPING_EXECUTION auf TRUE. (Stammaufgaben überschneiden sich nie.)

Überlappende Task-Graph-Ausführung

Überlappende Ausführungen können toleriert werden (oder sind sogar erwünscht), wenn SQL-Lese-/Schreiboperationen, die durch überlappende Ausführungen eines Task-Graphen ausgeführt werden, keine falschen oder duplizierten Daten erzeugen. Für andere Task-Graphen sollten Aufgabeneigentümer (die Rolle mit OWNERSHIP-Berechtigung für alle Aufgaben im Task-Graphen) einen geeigneten Zeitplan für die Stammaufgabe festlegen und eine geeignete Warehouse-Größe auswählen (oder serverlose Computeressourcen verwenden), um sicherzustellen, dass eine Instanz des Task-Graphen bis zum Ende ausgeführt wird, bevor die nächste Ausführung der Stammaufgabe geplant wird.

So stimmen Sie einen Task-Graphen besser mit dem in der Stammaufgabe definierten Zeitplan ab:

  1. Erhöhen Sie, falls möglich, die Planungszeit zwischen den Ausführungen der Stammaufgabe.

  2. Erwägen Sie, rechenintensive Aufgaben so zu ändern, dass sie serverlose Computeressourcen nutzen. Wenn die Aufgabe auf benutzerverwaltete Computeressourcen angewiesen ist, sollten Sie die Größe für das Warehouse erhöhen, das umfangreiche oder komplexe SQL-Anweisungen oder gespeicherte Prozeduren im Task-Graphen ausführt.

  3. Analysieren Sie die SQL-Anweisungen oder gespeicherten Prozeduren, die von jeder Aufgabe ausgeführt werden. Stellen Sie fest, ob der Code umgeschrieben werden kann, um die Parallelverarbeitung zu nutzen.

Wenn keine der obigen Lösungen hilft, überlegen Sie, ob es notwendig ist, gleichzeitige Ausführungen des Task-Graphen durch Festlegen von ALLOW_OVERLAPPING_EXECUTION = TRUE für die Stammaufgabe zuzulassen. Dieser Parameter kann beim Erstellen einer Aufgabe definiert werden (mit CREATE TASK) oder später (mit ALTER TASK oder in Snowsight).

Versionierung

Wenn die Stammaufgabe in einem Task-Graphen fortgesetzt oder manuell ausgeführt wird, legt Snowflake eine Version für den gesamten Task-Graphen fest, einschließlich aller Eigenschaften für alle Aufgaben im Task-Graphen. Nachdem eine Aufgabe angehalten und geändert wird, legt Snowflake beim Fortsetzen oder manuellen Ausführen der Stammaufgabe eine neue Version fest.

Um eine beliebige Aufgabe in einem Task-Graphen zu ändern oder neu zu erstellen, muss zuerst die Stammaufgabe angehalten werden. Wenn die Stammaufgabe (Root Task) angehalten wird, werden alle zukünftigen geplanten Ausführungen der Stammaufgabe abgebrochen. Wenn sich Aufgaben jedoch gerade in Ausführung befinden, werden diese Aufgaben und alle nachfolgenden Aufgaben weiterhin mit der aktuellen Version ausgeführt.

Bemerkung

Wenn sich die Definition einer gespeicherten Prozedur, die von einer Aufgabe aufgerufen wurde, während der Ausführung des Task-Graphen ändert, kann der geänderte Code der gespeicherten Prozedur bei Aufruf durch die Aufgabe bereits in der aktuellen Ausführung zur Anwendung kommen.

Angenommen, die Stammaufgabe im Task-Graphen wird angehalten, aber eine geplante Ausführung dieser Aufgabe hat bereits begonnen. Der Eigentümer aller Aufgaben im Task-Graphen ändert den von einer untergeordneten Aufgabe aufgerufenen SQL-Code, während sich die Stammaufgabe noch in Ausführung befindet. Die untergeordnete Aufgabe wird ausgeführt und führt den SQL-Code in ihrer Definition aus, wobei die Version des Task-Graphen verwendet wird, die aktuell war, als die Ausführung der Stammaufgabe begann. Wenn die Stammaufgabe fortgesetzt oder manuell ausgeführt wird, wird eine neue Version des Task-Graphen erstellt. Diese neue Version enthält die an der untergeordneten Aufgabe vorgenommenen Änderungen.

Um den Versionsverlauf der Aufgabenversionen abzurufen, fragen Sie die Account Usage-Ansicht TASK_VERSIONS (in der freigegebenen SNOWFLAKE-Datenbank) ab.

Dauer des Task-Graphen

Die Dauer des Task-Graphen umfasst die Zeit zwischen dem geplanten Start der Stammaufgabe und der Beendigung der letzten untergeordneten Aufgabe. Um die Dauer eines Task-Graphen zu berechnen, fragen Sie Ansicht COMPLETE_TASK_GRAPHS ab und vergleichen SCHEDULED_TIME mit COMPLETED_TIME.

Das folgende Diagramm zeigt zum Beispiel einen Task-Graphen, der jede Minute ausgeführt werden soll. Die Stammaufgabe und ihre beiden untergeordneten Aufgaben stehen jeweils 5 Sekunden in der Warteschlange und laufen 10 Sekunden lang, so dass sie insgesamt 45 Sekunden benötigen.

Ein Diagramm eines Task-Graphen, der drei Aufgaben mit Abhängigkeiten enthält. Jede Aufgabe steht 5 Sekunden in der Warteschlange und wird 10 Sekunden lang ausgeführt, so dass sie insgesamt 45 Sekunden dauert.

Timeouts im Task-Graphen

Wenn USER_TASK_TIMEOUT_MS in der Stammaufgabe festgelegt ist, gilt das Timeout für den gesamten Task-Graphen.

Wenn USER_TASK_TIMEOUT_MS in einer untergeordneten Aufgabe oder Finalizer-Aufgabe eingestellt ist, gilt das Timeout nur für diese Aufgabe.

Wenn USER_TASK_TIMEOUT_MS sowohl in der Stammaufgabe als auch in einer untergeordneten Aufgabe festgelegt ist, hat das Timeout der untergeordneten Aufgabe Vorrang vor dem Timeout der Stammaufgabe für diese untergeordnete Aufgabe.

Hinweise

  • Bei serverlosen Aufgaben skaliert Snowflake automatisch die Ressourcen, um sicherzustellen, dass die Aufgaben innerhalb eines Zielintervalls abgeschlossen werden, einschließlich der Zeit für die Warteschlangenbildung.

  • Bei vom Benutzer verwalteten Aufgaben sind längere Wartezeiten üblich, wenn die Aufgaben in einem gemeinsam genutzten oder stark frequentierten Warehouse ausgeführt werden sollen.

  • Bei Task-Graphen kann die Gesamtzeit zusätzliche Wartezeit für untergeordnete Aufgaben enthalten, die auf die Fertigstellung ihrer Vorgänger warten.

Erstellen Sie einen Task-Graphen mit Logik (Laufzeitinformationen, Konfiguration und Rückgabewerte)

Aufgaben in einem Task-Graphen können Rückgabewerte von übergeordneten Aufgaben verwenden, um logikbasierte Operationen in ihrem Funktionskörper durchzuführen.

Hinweise:

  • Bei einigen logikbasierten Befehlen, wie SYSTEM$GET_PREDECESSOR_RETURN_VALUE, wird zwischen Groß- und Kleinschreibung unterschieden. Aufgaben, die mit CREATE TASK ohne Anführungszeichen erstellt werden, werden jedoch in Großbuchstaben gespeichert und aufgelöst. Um dies zu bewerkstelligen, können Sie eine der folgenden Möglichkeiten nutzen:

    • Erstellen Sie Aufgabennamen nur mit Großbuchstaben.

    • Verwenden Sie Anführungszeichen, wenn Sie Aufgaben benennen und aufrufen.

    • Bei Aufgabennamen, die mit Kleinbuchstaben definiert sind, rufen Sie die Aufgabe mit Großbuchstaben auf. Ein Beispiel: Eine Aufgabe, die durch „CREATE TASK task_c…“ definiert ist, kann als SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE(‚TASK_C‘) aufgerufen werden.

Übergeben Sie Konfigurationsinformationen an den Task-Graphen

Sie können Konfigurationsinformationen über ein JSON-Objekt weitergeben, das von anderen Aufgaben in einem Task-Graphen gelesen werden kann. Verwenden Sie die Syntax CREATE/ALTER TASK. .. CONFIG, um die Konfigurationsinformationen in der Stammaufgabe zu setzen, aufzuheben oder zu ändern. Verwenden Sie die Funktion SYSTEM$GET_TASK_GRAPH_CONFIG, um sie abzurufen. Beispiel:

CREATE OR REPLACE TASK "task_root"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS SELECT 1;

CREATE OR REPLACE TASK "task_a"
  USER_TASK_TIMEOUT_MS = 600000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
Copy

Rückgabewerte zwischen Aufgaben weitergeben

Sie können Rückgabewerte zwischen Aufgaben in einem Task-Graphen übergeben Verwenden Sie die Funktion SYSTEM$SET_RETURN_VALUE, um einen Rückgabewert von einer Aufgabe hinzuzufügen, und verwenden Sie die Funktion SYSTEM$GET_PREDECESSOR_RETURN_VALUE, um ihn abzurufen.

Wenn eine Aufgabe mehrere Vorgänger hat, müssen Sie angeben, welche Aufgabe den von Ihnen gewünschten Rückgabewert hat. Im folgenden Beispiel erstellen wir eine Stammaufgabe in einem Task-Graphen, die Konfigurationsinformationen hinzufügt.

CREATE OR REPLACE TASK "task_c"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
    END;

CREATE OR REPLACE TASK "task_d"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_c"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
    END;
Copy

Laufzeitinformationen abrufen und verwenden

Verwenden Sie die Funktion SYSTEM$TASK_RUNTIME_INFO, um Informationen über den aktuellen Aufgabenlauf zu melden. Diese Funktion verfügt über mehrere Optionen, die speziell für Task-Graphen gelten. Verwenden Sie zum Beispiel CURRENT_ROOT_TASK_NAME, um den Namen der Stammaufgabe im aktuellen Task-Graphen zu ermitteln. Die folgenden Beispiele zeigen, wie Sie einer Tabelle einen Datumsstempel hinzufügen, der darauf basiert, wann die Stammaufgabe des Task-Graphen gestartet wurde.

-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO date_time_table VALUES('order_date',:value);
    END;
Copy

Beispiele

Beispiel: Mehrere Aufgaben starten und Status melden

Im folgenden Beispiel startet die Stammaufgabe Aufgaben zur Aktualisierung von drei verschiedenen Tabellen. Nachdem diese drei Tabellen aktualisiert wurden, kombiniert eine Aufgabe die Informationen aus den anderen drei Tabellen zu einer aggregierten Verkaufstabelle.

Das Flussdiagramm zeigt eine Stammaufgabe, die drei untergeordnete Aufgaben startet, von denen jede eine Tabelle aktualisiert. Diese drei Aufgaben gehen alle einer weiteren untergeordneten Aufgabe voraus, die die vorherigen Änderungen in einer weiteren Tabelle zusammenfasst.
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
    END;
;

-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT customer_id FROM ref_cust_table
        WHERE cust_name = "Jane Doe";);
      INSERT INTO customer_table VALUES('customer_id',:value);
    END;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT product_id FROM ref_item_table
        WHERE PRODUCT_NAME = "widget";);
      INSERT INTO product_table VALUES('product_id',:value);
    END;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO "date_time_table" VALUES('order_date',:value);
    END;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_customer_table, task_product_table, task_date_time_table
  AS
    BEGIN
      LET VALUE := (SELECT sales_order_id FROM ORDERS);
      JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
      INSERT INTO sales_table VALUES('sales_order_id',:value);
    END;
;
Copy

Beispiel für eine Finalizer-Aufgabe: E-Mail-Benachrichtigung senden

Dieses Beispiel zeigt, wie eine Finalizer-Aufgabe eine E-Mail senden kann, in der zusammengefasst wird, wie der Task-Graph ausgeführt wurde. Die Aufgabe ruft zwei externe Funktionen auf: eine sammelt Informationen über den Status der Aufgabenerledigung und die andere verwendet die Informationen, um eine E-Mail zu verfassen, die über einen Remote-Messaging-Dienst versendet werden kann.

Eine Aufgabensequenz zeigt eine Stammaufgabe, die auf zwei untergeordnete Aufgaben verweist, die ihrerseits auf eine andere Aufgabe verweisen. Am unteren Ende wird eine Finalizer-Aufgabe angezeigt, die ausgeführt wird, nachdem alle anderen Aufgaben beendet wurden oder nicht beendet werden konnten.
CREATE OR REPLACE TASK notify_finalizer
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_root
AS
  DECLARE
    my_root_task_id STRING;
    my_start_time TIMESTAMP_LTZ;
    summary_json STRING;
    summary_html STRING;
  BEGIN
    --- Get root task ID
    my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
    --- Get root task scheduled time
    my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::timestamp_ltz);
    --- Combine all task run info into one JSON string
    summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
    --- Convert JSON into HTML table
    summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));

    --- Send HTML to email
    CALL SYSTEM$SEND_EMAIL(
        'email_notification',
        'admin@snowflake.com',
        'notification task run summary',
        :summary_html,
        'text/html');
    --- Set return value for finalizer
    CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END

CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
  RETURNS STRING
AS
$$
  (SELECT
    ARRAY_AGG(OBJECT_CONSTRUCT(
      'task_name', name,
      'run_status', state,
      'return_value', return_value,
      'started', query_start_time,
      'duration', duration,
      'error_message', error_message
      )
    ) AS GRAPH_RUN_SUMMARY
  FROM
    (SELECT
      NAME,
      CASE
        WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
        WHEN STATE = 'FAILED' then '🔴 Failed'
        WHEN STATE = 'SKIPPED' then '🔵 Skipped'
        WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
      END AS STATE,
      RETURN_VALUE,
      TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
      CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
        ' s') AS DURATION,
      ERROR_MESSAGE
    FROM
      TABLE(my-database.information_schema.task_history(
        ROOT_TASK_ID => my_root_task_id ::STRING,
        SCHEDULED_TIME_RANGE_START => my_start_time,
        SCHEDULED_TIME_RANGE_END => current_timestamp()
      ))
    ORDER BY
      SCHEDULED_TIME)
  )::STRING
$$
;

CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
  IMPORT JSON

  def GENERATE_HTML_TABLE(JSON_DATA):
    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]

  DATA = json.loads(JSON_DATA)
  HTML = f"""
    <img src="https://example.com/logo.jpg"
      alt="Company logo" height="72">
    <p><strong>Task Graph Run Summary</strong>
      <br>Sign in to Snowsight to see more details.</p>
    <table border="1" style="border-color:#DEE3EA"
      cellpadding="5" cellspacing="0">
      <thead>
        <tr>
        """
        headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
        for i, header in enumerate(headers):
            HTML += f'<th scope="col" style="text-align:left;
            width: {column_widths[i]}">{header.capitalize()}</th>'

        HTML +="""
        </tr>
      </thead>
      <tbody>
        """
        for ROW_DATA in DATA:
          HTML += "<tr>"
          for header in headers:
            key = header.replace(" ", "_").upper()
            CELL_DATA = ROW_DATA.get(key, "")
            HTML += f'<td style="text-align:left;
            width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
          HTML += "</tr>"
        HTML +="""
      </tbody>
    </table>
    """
  return HTML
$$
;
Copy

Beispiel für eine Finalizer-Aufgabe: Fehler korrigieren

Dieses Beispiel zeigt, wie eine Finalisierungsaufgabe Fehler korrigieren kann.

Zu Demonstrationszwecken sind die Aufgaben so konzipiert, dass sie bei ihrem ersten Durchlauf fehlschlagen. Die Finalisierungsaufgaben korrigieren das Problem und starten die Aufgaben neu, die bei den folgenden Durchläufen erfolgreich sind:

Diagramm mit einer Aufgabenserie. Aufgabe A ist oben links abgebildet. Ein Pfeil zeigt von Aufgabe A nach rechts zu Aufgabe B, die auf Aufgabe C zeigt, die wiederum auf Aufgabe D zeigt. Unterhalb von Aufgabe A zeigt ein Pfeil auf die Finalisierungsaufgabe, Aufgabe F.
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations.
--    Creates a root task ("task_a"), and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * Each task runs after one minute, with a 60-second timeout.
--    * If a task fails, retry it twice. if it fails twice,
--      the entire task graph is considered as failed.
--    * If the task graph fails consecutively three times, suspend the task.
--    * Other environment values are set.

CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task a successful');
    END;
;

-- 2. Use a runtime reflection variable.
--    Creates a child task ("task_b").
--    By design, this example fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO demo_table VALUES('task b name',:VALUE);
    END;
;

-- 3. Get a task graph configuration value.
--    Creates the child task ("task_c").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_b
  AS
    BEGIN
      CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_c
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
      INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
    END;
;

-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task should automatically retry
--    (see task_a: task_auto_retry_attempts).
--    On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_a
  AS
    BEGIN
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
    END;
;

-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
--    Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');

-- 7. Query the task history
SELECT
    name, state, attempt_number, scheduled_from
  FROM
    TABLE(information_schema.task_history(task_name=> 'task_b'))
  LIMIT 5;
;

-- 8. Suspend the task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    (task_a). Child tasks don’t run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;

-- 9. Check tasks during execution (optional)
--    Run this command to query the demo table during execution
--    to check which tasks have run.
SELECT * FROM demo_table;

-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     After the task graph retries, task_b will succeed.
DROP TABLE demo_table;
Copy