Erstellen Sie eine Sequenz von Aufgaben mit einem Task-Graphen¶
In Snowflake können Sie mehrere Aufgaben mit einem Task-Graphen verwalten, der auch als gerichteter azyklischer Graph oder DAG bezeichnet wird. Ein Task-Graph besteht aus einer Stammaufgabe und abhängigen untergeordnete Aufgabe. Die Abhängigkeiten müssen in einer Richtung von Anfang bis Ende laufen, ohne Schleifen. Eine optionale letzte Aufgabe (Finalizer) kann Bereinigungsoperationen durchführen, nachdem alle anderen Aufgaben abgeschlossen sind.
Erstellen Sie Task-Graphen mit dynamischer Verhaltensweise, indem Sie logikbasierte Operationen im Textteil der Aufgabe unter Verwendung von Laufzeitwerten, Konfiguration auf Graph-Ebene und Rückgabewerten übergeordneter Aufgaben angeben.
Sie können Aufgaben und Task-Graphen mit den von unterstützten Sprachen und Tools wie SQL, JavaScript, Python, Java, Scala oder Snowflake Scripting erstellen. Dieses Thema enthält SQL-Beispiele. Beispiele für Python finden Sie unter Verwalten von Snowflake-Aufgaben und Task-Graphen mit Python.
Task-Graphen erstellen¶
Erstellen Sie mit CREATE TASK eine Stammaufgabe, und erstellen Sie dann mit CREATE TASK untergeordnete Aufgaben. AFTER, um die übergeordneten Aufgaben auszuwählen.
Die Stammaufgabe definiert, wann der Task-Graph läuft. Untergeordnete Aufgaben werden in der durch den Task-Graphen festgelegten Reihenfolge ausgeführt.
Wenn mehrere untergeordnete Aufgaben die gleiche übergeordnete Aufgabe haben, laufen die untergeordneten Aufgaben parallel.
Wenn eine Aufgabe mehrere übergeordnete Objekte hat, wartet die Aufgabe, bis alle vorangegangenen Aufgaben erfolgreich abgeschlossen sind, bevor sie startet. (Die Aufgabe kann auch ausgeführt werden, wenn einige übergeordnete Aufgaben übersprungen werden. Weitere Informationen dazu finden Sie unter Eine untergeordnete Aufgabe überspringen oder aussetzen).
Im folgenden Beispiel wird ein serverloser Task-Graph erstellt, der mit einer Stammaufgabe beginnt, die jede Minute ausgeführt werden soll. Die Stammaufgabe hat zwei untergeordnete Aufgaben, die parallel laufen. (Das Diagramm zeigt ein Beispiel, bei dem eine dieser Aufgaben länger läuft als die andere) Nachdem diese beiden Aufgaben abgeschlossen sind, wird eine dritte untergeordnete Aufgabe ausgeführt. Die Finalizer-Aufgabe wird ausgeführt, nachdem alle anderen Aufgaben abgeschlossen sind oder nicht abgeschlossen werden konnten:
CREATE TASK task_root
SCHEDULE = '1 MINUTE'
AS SELECT 1;
CREATE TASK task_a
AFTER task_root
AS SELECT 1;
CREATE TASK task_b
AFTER task_root
AS SELECT 1;
CREATE TASK task_c
AFTER task_a, task_b
AS SELECT 1;
Hinweise:
Ein Task-Graph ist auf maximal 1.000 Aufgaben beschränkt.
Eine einzelne Aufgabe kann maximal 100 übergeordnete Aufgaben und 100 untergeordnete Aufgaben haben.
Wenn Aufgaben parallel auf demselben vom Benutzer verwalteten Warehouse ausgeführt werden, müssen die Computeressourcen so bemessen sein, dass sie die gleichzeitigen Aufgabenläufe bewältigen können.
Finalizer-Aufgabe¶
Sie können eine optionale Finalizer-Aufgabe hinzufügen, die ausgeführt wird, nachdem alle anderen Aufgaben im Task-Graph abgeschlossen sind (oder nicht abgeschlossen wurden). Verwenden Sie dies, um Folgendes zu tun:
Bereinigungsoperationen durchführen, z. B. Bereinigung von nicht mehr benötigten Zwischendaten.
Senden Sie Benachrichtigungen über den Erfolg oder Misserfolg einer Aufgabe.
Um eine Finalizer-Aufgabe zu erstellen, verwenden Sie CREATE TASK. .. FINALIZE. .. für die Stammaufgabe. Beispiel:
CREATE TASK task_finalizer
FINALIZE = task_root
AS SELECT 1;
Hinweise:
Eine Finalizer-Aufgabe ist immer mit einer Stammaufgabe (root task) verbunden. Jede Stammaufgabe kann nur eine Finalizer-Aufgabe haben und eine Finalizer-Aufgabe kann nur mit einer Stammaufgabe verbunden sein.
Wenn die Stammaufgabe eines Task-Graphen übersprungen wird (z. B. aufgrund von überlappenden Task-Graph-Läufen), wird die Finalizer-Aufgabe nicht gestartet.
Eine Finalizer-Aufgabe kann keine untergeordneten Aufgaben haben.
Eine Finalizer-Aufgabe wird nur geplant, wenn keine anderen Aufgaben laufen oder in der Warteschlange des aktuellen Task-Graphen stehen.
Weitere Beispiele finden Sie unter Beispiel für eine Finalizer-Aufgabe: E-Mail-Benachrichtigung senden und Beispiel für eine Finalizer-Aufgabe: Fehler korrigieren.
Eigentümerschaft des Task-Graphen verwalten¶
Alle Aufgaben eines einfachen Task-Graphen müssen denselben Aufgabeneigentümer haben und in derselben Datenbank und in demselben Schema gespeichert sein.
Sie können die Eigentümerschaft an allen Aufgaben eines Task-Graphen mit einer der folgenden Aktionen übertragen:
Löschen des Eigentümers aller Aufgaben im Task-Graphen mit DROP ROLE. Snowflake überträgt die Eigentümerschaft an die Rolle, die den Befehl DROP ROLE ausführt.
Übertragen der Eigentümerschaft aller Aufgaben im Task-Graphen mit GRANT OWNERSHIP auf alle Aufgaben in einem Schema.
Wenn Sie die Eigentümerschaft der Aufgaben in einem Task-Graph mit diesen Methoden übertragen, behalten die Aufgaben im Task-Graphen ihre Beziehungen untereinander bei.
Durch das Übertragen der Eigentümerschaft einer einzelnen Aufgabe wird die Abhängigkeit zwischen der Aufgabe und allen über- und untergeordneten Aufgaben aufgehoben. Weitere Informationen dazu finden Sie unter Übergeordnete und untergeordnete Aufgaben trennen (unter diesem Thema).
Bemerkung
Die Datenbankreplikation funktioniert nicht für Task-Graphen, wenn das Diagramm einer anderen Rolle gehört als der Rolle, die die Replikation durchführt.
Aufgaben in einem Task-Graphen ausführen oder planen¶
Einen Task-Graphen manuell ausführen¶
Sie können eine einzelne Instanz eines Task-Graphen ausführen. Dies ist nützlich, um neue oder geänderte Task-Graphen zu testen, bevor Sie den Task-Graphen in der Produktion aktivieren, oder für einmalige Läufe nach Bedarf.
Bevor Sie den Task-Graphen starten, verwenden Sie ALTER TASK. .. RESUME für jede untergeordnete Aufgabe (einschließlich der optionalen Finalizer-Aufgabe), die Sie in den Lauf einbeziehen möchten.
Um eine einzelne Instanz eines Task-Graphen auszuführen, verwenden Sie EXECUTE TASK für die Stammaufgabe. Wenn Sie die Stammaufgabe ausführen, werden alle wiederaufgenommenen untergeordneten Aufgaben im Task-Graphen in der durch den Task-Graphen festgelegten Reihenfolge ausgeführt.
Eine Aufgabe nach einem Zeitplan oder als getriggerte Aufgabe ausführen¶
Legen Sie in der Stammaufgabe fest, wann der Task-Graph läuft. Task-Graphen können nach einem wiederkehrenden Zeitplan laufen oder durch ein Ereignis getriggert werden. Weitere Informationen dazu finden Sie unter folgenden Themen:
Um den Task-Graphen zu starten, können Sie einen der folgenden Schritte ausführen:
Setzen Sie jede einzelne untergeordnete Aufgabe (einschließlich des Finalizers) fort, die Sie in den Lauf einbeziehen möchten, und setzen Sie dann die Stammaufgabe fort, indem Sie ALTER TASK. .. RESUME verwenden.
Nehmen Sie alle Aufgaben in einem Task-Graphen auf einmal wieder auf, indem Sie SYSTEM$TASK_DEPENDENTS_ENABLE ( <root_task_name> ) für die Stammaufgabe verwenden.
Abhängige Aufgaben eines Task-Graphen anzeigen¶
Um die untergeordneten Aufgaben für eine Stammaufgabe anzuzeigen, rufen Sie die Tabellenfunktion TASK_DEPENDENTS auf. Um alle Aufgaben in einem Task-Graphen abzurufen, müssen Sie beim Aufrufen der Funktion die Stammaufgabe angeben.
Sie können auch Snowsight verwenden, um Ihre Task-Graphen zu verwalten und anzuzeigen. Weitere Informationen dazu finden Sie unter Anzeigen von Aufgaben und Aufgabendiagrammen in Snowsight.
Aufgaben ändern, aussetzen oder erneut versuchen¶
Ändern einer Aufgabe in einem Task-Graphen¶
Um eine Aufgabe in einem geplanten Task-Graphen zu ändern, setzen Sie die Stammaufgabe mit ALTER TASK. .. SUSPEND aus. Wenn ein Lauf des Task-Graphen in Bearbeitung ist, wird der aktuelle Lauf abgeschlossen. Alle zukünftigen geplanten Ausführungen der Stammaufgabe werden abgebrochen.
Wenn die Stammaufgabe ausgesetzt wird, behalten die untergeordneten Aufgaben, einschließlich der Finalizer-Aufgabe, ihren Status bei (ausgesetzt, ausgeführt oder abgeschlossen). Die untergeordneten Aufgaben müssen nicht einzeln ausgesetzt werden.
Nachdem Sie die Stammaufgabe ausgesetzt haben, können Sie jede Aufgabe im Task-Graphen ändern.
Um den Task-Graphen fortzusetzen, können Sie einen der folgenden Schritte ausführen:
Setzen Sie die Stammaufgabe mit ALTER TASK. .. RESUME fort. Einzelne untergeordnete Aufgaben, die vorher liefen, müssen nicht wieder aufgenommen werden.
Setzen Sie alle Aufgaben in einem Task-Graphen auf einmal fort, indem Sie SYSTEM$TASK_DEPENDENTS_ENABLE aufrufen und den Namen der Stammaufgabe übergeben.
Eine untergeordnete Aufgabe überspringen oder aussetzen¶
Um eine untergeordnete Aufgabe in einem Task-Graphen zu überspringen, setzen Sie die untergeordnete Aufgabe mit ALTER TASK. .. SUSPEND aus.
Wenn Sie eine untergeordnete Aufgabe aussetzen, läuft der Task-Graph weiter, als ob die untergeordnete Aufgabe erfolgreich gewesen wäre. Eine untergeordnete Aufgabe mit mehreren Vorgängern wird solange ausgeführt, bis sich mindestens einer der Vorgänger im Zustand „Fortgesetzt“ befindet und alle fortgesetzten Vorgänger erfolgreich zu Ende geführt werden.
Eine fehlgeschlagene Aufgabe wiederholen¶
Verwenden Sie EXECUTE TASK. .. RETRY LAST, um zu versuchen, den Task-Graphen der letzten fehlgeschlagenen Aufgabe auszuführen. Wenn die Aufgabe erfolgreich ist, werden alle untergeordneten Aufgaben weiter ausgeführt, sobald die vorangegangenen Aufgaben abgeschlossen sind.
Automatische Wiederholungsversuche¶
Wenn eine untergeordnete Aufgabe fehlschlägt, wird standardmäßig der gesamte Task-Graph als fehlgeschlagen betrachtet.
Anstatt bis zur nächsten geplanten Ausführung des Task-Graph zu warten, können Sie den Task-Graphen anweisen, es sofort erneut zu versuchen, indem Sie den Parameter TASK_AUTO_RETRY_ATTEMPTS
für die Stammaufgabe festlegen. Wenn eine untergeordnete Aufgabe fehlschlägt, wird der gesamte Task-Graph sofort erneut versucht, und zwar so oft, wie angegeben. Wenn der Task-Graph immer noch nicht abgeschlossen ist, gilt der Task-Graph als fehlgeschlagen.
Task-Graphen nach fehlgeschlagenen Task-Graphen-Läufen aussetzen¶
Standardmäßig wird ein Task-Graph nach 10 aufeinanderfolgenden Fehlschlägen ausgesetzt. Sie können diesen Wert ändern, indem Sie SUSPEND_TASK_AFTER_NUM_FAILURES
für die Stammaufgabe einstellen.
Wenn im folgenden Beispiel eine untergeordnete Aufgabe fehlschlägt, versucht der Task-Graph sofort zweimal, bevor der gesamte Task-Graph als fehlgeschlagen gilt. Wenn der Task-Graph dreimal hintereinander fehlschlägt, wird der Task-Graph ausgesetzt.
CREATE OR REPLACE TASK task_root
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2 -- Failed task graph retries up to 2 times
SUSPEND_TASK_AFTER_NUM_FAILURES = 3 -- Task graph suspends after 3 consecutive failures
AS SELECT 1;
Übergeordnete und untergeordnete Aufgaben trennen¶
Abhängigkeiten zwischen Aufgaben in einem Task-Graphen können durch eine der folgenden Aktionen aufgelöst werden:
Mit ALTER TASK … REMOVE AFTER und ALTER TASK … UNSET FINALIZE kann die Verknüpfung zwischen der Zielaufgabe und den angegebenen übergeordneten Aufgaben oder der abgeschlossenen Stammaufgabe entfernt werden.
DROP TASK und GRANT OWNERSHIP trennen alle Verknüpfungen der Zielaufgabe. Beispiel: Stammaufgabe A hat eine untergeordnete Aufgabe B, und Aufgabe B hat eine untergeordnete Aufgabe C. Wenn Sie Aufgabe B löschen, wird die Verknüpfung zwischen Aufgabe A und Aufgabe B unterbrochen und damit auch die Verknüpfung zwischen Aufgabe B und Aufgabe C.
Wenn eine beliebige Kombination der oben genannten Aktionen die Beziehung zwischen der untergeordneten Aufgabe und allen übergeordneten Aufgaben aufhebt, dann wird die untergeordnete Aufgabe entweder eine eigenständige Aufgabe oder eine Stammaufgabe.
Bemerkung
Wenn Sie die Eigentümerschaft einer Aufgabe an den aktuellen Besitzer übertragen, werden die Abhängigkeitsverknüpfungen möglicherweise nicht getrennt.
Überlappende Task-Graph-Ausführung¶
Snowflake stellt standardmäßig sicher, dass jeweils nur eine Instanz eines bestimmten Task-Graphen ausgeführt wird. Die nächste Ausführung einer Stammaufgabe wird erst geplant, wenn die Ausführung aller Aufgaben im Task-Graphen beendet wurde. Das heißt, wenn die kumulierte Zeit, die für die Ausführung aller Aufgaben im Task-Graphen benötigt wird, die in der Definition der Stammaufgabe festgelegte explizit geplante Zeit überschreitet, wird mindestens eine Ausführung des Task-Graphen übersprungen.
Damit sich untergeordnete Aufgaben überschneiden können, verwenden Sie CREATE TASK oder ALTER TASK für die Stammaufgabe und setzen ALLOW_OVERLAPPING_EXECUTION auf TRUE. (Stammaufgaben überschneiden sich nie.)
Überlappende Ausführungen können toleriert werden (oder sind sogar erwünscht), wenn SQL-Lese-/Schreiboperationen, die durch überlappende Ausführungen eines Task-Graphen ausgeführt werden, keine falschen oder duplizierten Daten erzeugen. Für andere Task-Graphen sollten Aufgabeneigentümer (die Rolle mit OWNERSHIP-Berechtigung für alle Aufgaben im Task-Graphen) einen geeigneten Zeitplan für die Stammaufgabe festlegen und eine geeignete Warehouse-Größe auswählen (oder serverlose Computeressourcen verwenden), um sicherzustellen, dass eine Instanz des Task-Graphen bis zum Ende ausgeführt wird, bevor die nächste Ausführung der Stammaufgabe geplant wird.
So stimmen Sie einen Task-Graphen besser mit dem in der Stammaufgabe definierten Zeitplan ab:
Erhöhen Sie, falls möglich, die Planungszeit zwischen den Ausführungen der Stammaufgabe.
Erwägen Sie, rechenintensive Aufgaben so zu ändern, dass sie serverlose Computeressourcen nutzen. Wenn die Aufgabe auf benutzerverwaltete Computeressourcen angewiesen ist, sollten Sie die Größe für das Warehouse erhöhen, das umfangreiche oder komplexe SQL-Anweisungen oder gespeicherte Prozeduren im Task-Graphen ausführt.
Analysieren Sie die SQL-Anweisungen oder gespeicherten Prozeduren, die von jeder Aufgabe ausgeführt werden. Stellen Sie fest, ob der Code umgeschrieben werden kann, um die Parallelverarbeitung zu nutzen.
Wenn keine der obigen Lösungen hilft, überlegen Sie, ob es notwendig ist, gleichzeitige Ausführungen des Task-Graphen durch Festlegen von ALLOW_OVERLAPPING_EXECUTION = TRUE für die Stammaufgabe zuzulassen. Dieser Parameter kann beim Erstellen einer Aufgabe definiert werden (mit CREATE TASK) oder später (mit ALTER TASK oder in Snowsight).
Versionierung¶
Wenn die Stammaufgabe in einem Task-Graphen fortgesetzt oder manuell ausgeführt wird, legt Snowflake eine Version für den gesamten Task-Graphen fest, einschließlich aller Eigenschaften für alle Aufgaben im Task-Graphen. Nachdem eine Aufgabe angehalten und geändert wird, legt Snowflake beim Fortsetzen oder manuellen Ausführen der Stammaufgabe eine neue Version fest.
Um eine beliebige Aufgabe in einem Task-Graphen zu ändern oder neu zu erstellen, muss zuerst die Stammaufgabe angehalten werden. Wenn die Stammaufgabe (Root Task) angehalten wird, werden alle zukünftigen geplanten Ausführungen der Stammaufgabe abgebrochen. Wenn sich Aufgaben jedoch gerade in Ausführung befinden, werden diese Aufgaben und alle nachfolgenden Aufgaben weiterhin mit der aktuellen Version ausgeführt.
Bemerkung
Wenn sich die Definition einer gespeicherten Prozedur, die von einer Aufgabe aufgerufen wurde, während der Ausführung des Task-Graphen ändert, kann der geänderte Code der gespeicherten Prozedur bei Aufruf durch die Aufgabe bereits in der aktuellen Ausführung zur Anwendung kommen.
Angenommen, die Stammaufgabe im Task-Graphen wird angehalten, aber eine geplante Ausführung dieser Aufgabe hat bereits begonnen. Der Eigentümer aller Aufgaben im Task-Graphen ändert den von einer untergeordneten Aufgabe aufgerufenen SQL-Code, während sich die Stammaufgabe noch in Ausführung befindet. Die untergeordnete Aufgabe wird ausgeführt und führt den SQL-Code in ihrer Definition aus, wobei die Version des Task-Graphen verwendet wird, die aktuell war, als die Ausführung der Stammaufgabe begann. Wenn die Stammaufgabe fortgesetzt oder manuell ausgeführt wird, wird eine neue Version des Task-Graphen erstellt. Diese neue Version enthält die an der untergeordneten Aufgabe vorgenommenen Änderungen.
Um den Versionsverlauf der Aufgabenversionen abzurufen, fragen Sie die Account Usage-Ansicht TASK_VERSIONS (in der freigegebenen SNOWFLAKE-Datenbank) ab.
Dauer des Task-Graphen¶
Die Dauer des Task-Graphen umfasst die Zeit zwischen dem geplanten Start der Stammaufgabe und der Beendigung der letzten untergeordneten Aufgabe. Um die Dauer eines Task-Graphen zu berechnen, fragen Sie Ansicht COMPLETE_TASK_GRAPHS ab und vergleichen SCHEDULED_TIME mit COMPLETED_TIME.
Das folgende Diagramm zeigt zum Beispiel einen Task-Graphen, der jede Minute ausgeführt werden soll. Die Stammaufgabe und ihre beiden untergeordneten Aufgaben stehen jeweils 5 Sekunden in der Warteschlange und laufen 10 Sekunden lang, so dass sie insgesamt 45 Sekunden benötigen.
Timeouts im Task-Graphen¶
Wenn USER_TASK_TIMEOUT_MS in der Stammaufgabe festgelegt ist, gilt das Timeout für den gesamten Task-Graphen.
Wenn USER_TASK_TIMEOUT_MS in einer untergeordneten Aufgabe oder Finalizer-Aufgabe eingestellt ist, gilt das Timeout nur für diese Aufgabe.
Wenn USER_TASK_TIMEOUT_MS sowohl in der Stammaufgabe als auch in einer untergeordneten Aufgabe festgelegt ist, hat das Timeout der untergeordneten Aufgabe Vorrang vor dem Timeout der Stammaufgabe für diese untergeordnete Aufgabe.
Hinweise¶
Bei serverlosen Aufgaben skaliert Snowflake automatisch die Ressourcen, um sicherzustellen, dass die Aufgaben innerhalb eines Zielintervalls abgeschlossen werden, einschließlich der Zeit für die Warteschlangenbildung.
Bei vom Benutzer verwalteten Aufgaben sind längere Wartezeiten üblich, wenn die Aufgaben in einem gemeinsam genutzten oder stark frequentierten Warehouse ausgeführt werden sollen.
Bei Task-Graphen kann die Gesamtzeit zusätzliche Wartezeit für untergeordnete Aufgaben enthalten, die auf die Fertigstellung ihrer Vorgänger warten.
Erstellen Sie einen Task-Graphen mit Logik (Laufzeitinformationen, Konfiguration und Rückgabewerte)¶
Aufgaben in einem Task-Graphen können Rückgabewerte von übergeordneten Aufgaben verwenden, um logikbasierte Operationen in ihrem Funktionskörper durchzuführen.
Hinweise:
Bei einigen logikbasierten Befehlen, wie SYSTEM$GET_PREDECESSOR_RETURN_VALUE, wird zwischen Groß- und Kleinschreibung unterschieden. Aufgaben, die mit CREATE TASK ohne Anführungszeichen erstellt werden, werden jedoch in Großbuchstaben gespeichert und aufgelöst. Um dies zu bewerkstelligen, können Sie eine der folgenden Möglichkeiten nutzen:
Erstellen Sie Aufgabennamen nur mit Großbuchstaben.
Verwenden Sie Anführungszeichen, wenn Sie Aufgaben benennen und aufrufen.
Bei Aufgabennamen, die mit Kleinbuchstaben definiert sind, rufen Sie die Aufgabe mit Großbuchstaben auf. Ein Beispiel: Eine Aufgabe, die durch „CREATE TASK task_c…“ definiert ist, kann als SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE(‚TASK_C‘) aufgerufen werden.
Übergeben Sie Konfigurationsinformationen an den Task-Graphen¶
Sie können Konfigurationsinformationen über ein JSON-Objekt weitergeben, das von anderen Aufgaben in einem Task-Graphen gelesen werden kann. Verwenden Sie die Syntax CREATE/ALTER TASK. .. CONFIG, um die Konfigurationsinformationen in der Stammaufgabe zu setzen, aufzuheben oder zu ändern. Verwenden Sie die Funktion SYSTEM$GET_TASK_GRAPH_CONFIG, um sie abzurufen. Beispiel:
CREATE OR REPLACE TASK "task_root"
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
AS SELECT 1;
CREATE OR REPLACE TASK "task_a"
USER_TASK_TIMEOUT_MS = 600000
AFTER "task_root"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
INSERT INTO demo_table VALUES('task c path',:value);
END;
Rückgabewerte zwischen Aufgaben weitergeben¶
Sie können Rückgabewerte zwischen Aufgaben in einem Task-Graphen übergeben Verwenden Sie die Funktion SYSTEM$SET_RETURN_VALUE, um einen Rückgabewert von einer Aufgabe hinzuzufügen, und verwenden Sie die Funktion SYSTEM$GET_PREDECESSOR_RETURN_VALUE, um ihn abzurufen.
Wenn eine Aufgabe mehrere Vorgänger hat, müssen Sie angeben, welche Aufgabe den von Ihnen gewünschten Rückgabewert hat. Im folgenden Beispiel erstellen wir eine Stammaufgabe in einem Task-Graphen, die Konfigurationsinformationen hinzufügt.
CREATE OR REPLACE TASK "task_c"
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
END;
CREATE OR REPLACE TASK "task_d"
USER_TASK_TIMEOUT_MS = 60000
AFTER "task_c"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
END;
Laufzeitinformationen abrufen und verwenden¶
Verwenden Sie die Funktion SYSTEM$TASK_RUNTIME_INFO, um Informationen über den aktuellen Aufgabenlauf zu melden. Diese Funktion verfügt über mehrere Optionen, die speziell für Task-Graphen gelten. Verwenden Sie zum Beispiel CURRENT_ROOT_TASK_NAME, um den Namen der Stammaufgabe im aktuellen Task-Graphen zu ermitteln. Die folgenden Beispiele zeigen, wie Sie einer Tabelle einen Datumsstempel hinzufügen, der darauf basiert, wann die Stammaufgabe des Task-Graphen gestartet wurde.
-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
USER_TASK_TIMEOUT_MS = 60000
AFTER "task_root"
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
INSERT INTO date_time_table VALUES('order_date',:value);
END;
Beispiele¶
Beispiel: Mehrere Aufgaben starten und Status melden¶
Im folgenden Beispiel startet die Stammaufgabe Aufgaben zur Aktualisierung von drei verschiedenen Tabellen. Nachdem diese drei Tabellen aktualisiert wurden, kombiniert eine Aufgabe die Informationen aus den anderen drei Tabellen zu einer aggregierten Verkaufstabelle.
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
USER_TASK_TIMEOUT_MS = 60000
CONFIG='{"environment": "production", "path": "/prod_directory/"}'
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
END;
;
-- task_customer_table: Updates the customer table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT customer_id FROM ref_cust_table
WHERE cust_name = "Jane Doe";);
INSERT INTO customer_table VALUES('customer_id',:value);
END;
;
-- task_product_table: Updates the product table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT product_id FROM ref_item_table
WHERE PRODUCT_NAME = "widget";);
INSERT INTO product_table VALUES('product_id',:value);
END;
;
-- task_date_time_table: Updates the date/time table.
-- Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
INSERT INTO "date_time_table" VALUES('order_date',:value);
END;
;
-- task_sales_table: Aggregates changes from other tables.
-- Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
USER_TASK_TIMEOUT_MS = 60000
AFTER task_customer_table, task_product_table, task_date_time_table
AS
BEGIN
LET VALUE := (SELECT sales_order_id FROM ORDERS);
JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
INSERT INTO sales_table VALUES('sales_order_id',:value);
END;
;
Beispiel für eine Finalizer-Aufgabe: E-Mail-Benachrichtigung senden¶
Dieses Beispiel zeigt, wie eine Finalizer-Aufgabe eine E-Mail senden kann, in der zusammengefasst wird, wie der Task-Graph ausgeführt wurde. Die Aufgabe ruft zwei externe Funktionen auf: eine sammelt Informationen über den Status der Aufgabenerledigung und die andere verwendet die Informationen, um eine E-Mail zu verfassen, die über einen Remote-Messaging-Dienst versendet werden kann.
CREATE OR REPLACE TASK notify_finalizer
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_root
AS
DECLARE
my_root_task_id STRING;
my_start_time TIMESTAMP_LTZ;
summary_json STRING;
summary_html STRING;
BEGIN
--- Get root task ID
my_root_task_id := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
--- Get root task scheduled time
my_start_time := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP')::timestamp_ltz);
--- Combine all task run info into one JSON string
summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
--- Convert JSON into HTML table
summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));
--- Send HTML to email
CALL SYSTEM$SEND_EMAIL(
'email_notification',
'admin@snowflake.com',
'notification task run summary',
:summary_html,
'text/html');
--- Set return value for finalizer
CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
END
CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
RETURNS STRING
AS
$$
(SELECT
ARRAY_AGG(OBJECT_CONSTRUCT(
'task_name', name,
'run_status', state,
'return_value', return_value,
'started', query_start_time,
'duration', duration,
'error_message', error_message
)
) AS GRAPH_RUN_SUMMARY
FROM
(SELECT
NAME,
CASE
WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
WHEN STATE = 'FAILED' then '🔴 Failed'
WHEN STATE = 'SKIPPED' then '🔵 Skipped'
WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
END AS STATE,
RETURN_VALUE,
TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
' s') AS DURATION,
ERROR_MESSAGE
FROM
TABLE(my-database.information_schema.task_history(
ROOT_TASK_ID => my_root_task_id ::STRING,
SCHEDULED_TIME_RANGE_START => my_start_time,
SCHEDULED_TIME_RANGE_END => current_timestamp()
))
ORDER BY
SCHEDULED_TIME)
)::STRING
$$
;
CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
IMPORT JSON
def GENERATE_HTML_TABLE(JSON_DATA):
column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]
DATA = json.loads(JSON_DATA)
HTML = f"""
<img src="https://example.com/logo.jpg"
alt="Company logo" height="72">
<p><strong>Task Graph Run Summary</strong>
<br>Sign in to Snowsight to see more details.</p>
<table border="1" style="border-color:#DEE3EA"
cellpadding="5" cellspacing="0">
<thead>
<tr>
"""
headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
for i, header in enumerate(headers):
HTML += f'<th scope="col" style="text-align:left;
width: {column_widths[i]}">{header.capitalize()}</th>'
HTML +="""
</tr>
</thead>
<tbody>
"""
for ROW_DATA in DATA:
HTML += "<tr>"
for header in headers:
key = header.replace(" ", "_").upper()
CELL_DATA = ROW_DATA.get(key, "")
HTML += f'<td style="text-align:left;
width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
HTML += "</tr>"
HTML +="""
</tbody>
</table>
"""
return HTML
$$
;
Beispiel für eine Finalizer-Aufgabe: Fehler korrigieren¶
Dieses Beispiel zeigt, wie eine Finalisierungsaufgabe Fehler korrigieren kann.
Zu Demonstrationszwecken sind die Aufgaben so konzipiert, dass sie bei ihrem ersten Durchlauf fehlschlagen. Die Finalisierungsaufgaben korrigieren das Problem und starten die Aufgaben neu, die bei den folgenden Durchläufen erfolgreich sind:
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;
-- 1. Set the default configurations.
-- Creates a root task ("task_a"), and sets the default configurations
-- used throughout the task graph.
-- Configurations include:
-- * Each task runs after one minute, with a 60-second timeout.
-- * If a task fails, retry it twice. if it fails twice,
-- the entire task graph is considered as failed.
-- * If the task graph fails consecutively three times, suspend the task.
-- * Other environment values are set.
CREATE OR REPLACE TASK task_a
SCHEDULE = '1 MINUTE'
USER_TASK_TIMEOUT_MS = 60000
TASK_AUTO_RETRY_ATTEMPTS = 2
SUSPEND_TASK_AFTER_NUM_FAILURES = 3
AS
BEGIN
CALL SYSTEM$SET_RETURN_VALUE('task a successful');
END;
;
-- 2. Use a runtime reflection variable.
-- Creates a child task ("task_b").
-- By design, this example fails the first time it runs, because
-- it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
USER_TASK_TIMEOUT_MS = 60000
AFTER task_a
AS
BEGIN
LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
INSERT INTO demo_table VALUES('task b name',:VALUE);
END;
;
-- 3. Get a task graph configuration value.
-- Creates the child task ("task_c").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
USER_TASK_TIMEOUT_MS = 60000
AFTER task_b
AS
BEGIN
CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
INSERT INTO demo_table VALUES('task c path',:value);
END;
;
-- 4. Get a value from a predecessor.
-- Creates the child task ("task_d").
-- By design, this example fails the first time it runs, because
-- the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
USER_TASK_TIMEOUT_MS = 60000
AFTER task_c
AS
BEGIN
LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
END;
;
-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
-- After the finalizer completes, the task should automatically retry
-- (see task_a: task_auto_retry_attempts).
-- On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
USER_TASK_TIMEOUT_MS = 60000
FINALIZE = task_a
AS
BEGIN
CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
END;
;
-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
-- Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');
-- 7. Query the task history
SELECT
name, state, attempt_number, scheduled_from
FROM
TABLE(information_schema.task_history(task_name=> 'task_b'))
LIMIT 5;
;
-- 8. Suspend the task graph to stop incurring costs
-- Note: To stop the task graph, you only need to suspend the root task
-- (task_a). Child tasks don’t run unless the root task is run.
-- If any child tasks are running, they have a limited duration
-- and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;
-- 9. Check tasks during execution (optional)
-- Run this command to query the demo table during execution
-- to check which tasks have run.
SELECT * FROM demo_table;
-- 10. Demo reset (optional)
-- Run this command to remove the demo table.
-- This causes task_b to fail during its first run.
-- After the task graph retries, task_b will succeed.
DROP TABLE demo_table;