Getriggerte Aufgaben¶
Verwenden Sie getriggerte Aufgaben, um Aufgaben immer dann auszuführen, wenn es eine Änderung in einem Stream gibt. Damit entfällt die Notwendigkeit, eine Quelle häufig abzufragen, wenn die Verfügbarkeit neuer Daten unvorhersehbar ist. Außerdem wird die Latenzzeit verringert, da die Daten sofort verarbeitet werden.
Getriggerte Aufgaben verbrauchen keine Computeressourcen, bis das Ereignis getriggert wird.
Hinweise¶
Bei getriggerten Aufgaben werden die folgenden Elemente unterstützt:
Tabellen
Ansichten
Dynamische Tabellen
Apache Iceberg™-Tabellen (verwaltet und nicht verwaltet)
Datenfreigaben
Verzeichnistabellen. Eine Verzeichnistabelle muss aktualisiert werden, bevor eine getriggerte Aufgabe die Änderungen erkennen kann. Um Änderungen zu erkennen, können Sie eine der folgenden Aufgaben durchführen:
Stellen Sie die Verzeichnistabelle so ein, dass sie automatisch aktualisiert wird.
Aktualisieren Sie die Verzeichnistabelle manuell mit dem Befehl ALTER STAGE REFRESH.
Getriggerte Aufgaben werden bei den folgenden Elementen nicht unterstützt:
Hybridtabellen
Streams auf externen Tabellen
Damit Verbraucher Streams auf freigegebenen Tabellen oder sicheren Ansichten erstellen können, muss der Datenanbieter die Änderungsverfolgung für die Tabellen und Ansichten aktivieren, die für die Freigabe im Konto vorgesehen sind, das heißt, ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;
. Ohne aktivierte Änderungsverfolgung können Verbraucher keine Streams auf den freigegebenen Daten erstellen. Weitere Informationen dazu finden Sie unter Streams auf freigegebenen Objekten.
Eine getriggerte Aufgabe erstellen¶
Verwenden Sie CREATE TASK, und stellen Sie die folgenden Parameter ein:
Definieren Sie den Ziel-Stream mit der Klausel
WHEN
. (Fügen Sie den ParameterSCHEDULE
nicht ein.)Zusätzliche Anforderungen basierend auf Computeressourcen:
Um eine Aufgabe zu erstellen, die auf einem vom Benutzer verwalteten Warehouse läuft, fügen Sie den Parameter
WAREHOUSE
ein und definieren das Warehouse.Um eine serverlose Aufgabe zu erstellen, müssen Sie den Parameter
TARGET_COMPLETION_INTERVAL
angeben. Fügen Sie den ParameterWAREHOUSE
nicht ein. Snowflake schätzt die benötigten Ressourcen anhand des angestrebten Fertigstellungsintervalls und passt sich an, um die Aufgabe in dieser Zeit zu erledigen.
Im folgenden Beispiel wird eine serverlose getriggerte Aufgabe erstellt, die immer dann ausgeführt wird, wenn sich Daten in einem Stream ändern.
CREATE TASK my_triggered_task
TARGET_COMPLETION_INTERVAL='15 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
Migrieren Sie eine bestehende Aufgabe von einer geplanten Aufgabe zu einer getriggerten Aufgabe¶
Setzen Sie die Aufgabe aus.
Verwenden Sie ALTER TASK, um die Aufgabe zu aktualisieren. Setzen Sie den Parameter
SCHEDULE
zurück, und fügen Sie dann dieWHEN
-Klausel zum Definieren des Ziel-Streams hinzu.Nehmen Sie die Aufgabe wieder auf.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Migrieren Sie eine bestehende, vom Benutzer verwaltete getriggerte Aufgabe zu einer serverlosen getriggerten Aufgabe¶
Setzen Sie die Aufgabe aus.
Verwenden Sie ALTER TASK, um die Aufgabe zu aktualisieren. Entfernen Sie den Parameter
WAREHOUSE
, und stellen Sie dann den ParameterTARGET_COMPLETION_INTERVAL
ein.Nehmen Sie die Aufgabe wieder auf.
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Weitere Informationen finden Sie unter serverlose Aufgaben.
Zulassen, dass eine getriggerte Aufgabe ausgeführt wird¶
Wenn Sie eine getriggerte Aufgabe erstellen, startet sie im ausgesetzten Zustand.
Um mit der Überwachung des Streams zu beginnen:
Setzen Sie die Aufgabe mit ALTER TASK. .. RESUME fort.
Die Aufgabe wird unter den folgenden Bedingungen ausgeführt:
Wenn Sie eine getriggerte Aufgabe zum ersten Mal fortsetzen, überprüft die Aufgabe den Stream auf Änderungen, nachdem die letzte Aufgabe ausgeführt wurde. Wenn es eine Änderung gibt, wird die Aufgabe ausgeführt. Andernfalls wird die Aufgabe übersprungen, ohne Computeressourcen zu verwenden.
Wenn eine Aufgabe ausgeführt wird und der Stream neue Daten enthält, wird die Aufgabe angehalten, bis die aktuelle Aufgabe abgeschlossen ist. Snowflake stellt sicher, dass jeweils nur eine Instanz einer Aufgabe ausgeführt wird.
Nachdem eine Aufgabe abgeschlossen ist, sucht Snowflake erneut nach Änderungen im Stream. Wenn es Änderungen gibt, wird die Aufgabe erneut ausgeführt, wenn nicht, wird die Aufgabe übersprungen.
Die Aufgabe wird ausgeführt, sobald neue Daten im Stream entdeckt werden.
Wenn die Stream-Daten in einer Verzeichnistabelle gehostet werden, können Sie Änderungen feststellen, indem Sie eine der folgenden Aufgaben durchführen:
Alle 12 Stunden führt die Aufgabe einen Gesundheitscheck durch, um zu verhindern, dass die Streams veralten. Wenn es keine Änderungen gibt, überspringt Snowflake die Aufgabe, ohne Computeressourcen zu verwenden. Bei Streams müssen die Aufgaben-Anweisungen die Daten im Stream verbrauchen, bevor die Datenaufbewahrung abläuft; andernfalls wird der Stream abgestanden. Weitere Informationen dazu finden Sie unter Vermeiden des Veraltens von Streams.
Standardmäßig werden getriggerte Aufgaben höchstens alle 30 Sekunden ausgeführt. Sie können den Parameter USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS so ändern, dass die Ausführung häufiger erfolgt, bis zu alle 10 Sekunden.
Wenn eine Aufgabe durch Streams on Views getriggert wird, werden alle Änderungen an Tabellen, auf die die Abfrage Streams in Ansichten verweist, auch die Aufgabe triggern, unabhängig von Verknüpfungen, Aggregationen oder Filtern in der Abfrage.
Überwachung getriggerter Aufgaben¶
In der Ausgabe
SHOW TASKS
undDESC TASK
zeigt die EigenschaftSCHEDULE
NULL
für getriggerte Aufgaben an.In der Ausgabe der „task_history“-Ansicht der Schemas „information_schema“ und „account_usage“ wird in der Spalte SCHEDULED_FROM der Wert TRIGGER angezeigt.
Beispiele¶
Beispiel 1: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn sich Daten in einem der beiden Streams ändern.
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
Beispiel 2: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn Datenänderungen in zwei verschiedenen Datenströmen festgestellt werden. Da die Aufgabe die Bedingung AND verwendet, wird die Aufgabe übersprungen, wenn nur einer der beiden Streams neue Daten enthält.
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
Beispiel 3: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn sich Daten in einer Verzeichnistabelle ändern. In diesem Beispiel wird ein Stream namens „my_directory_table_stream“ auf einer Verzeichnistabelle in einem Stagingbereich namens „my_test_stage“ gehostet.
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
Um die getriggerte Aufgabe zu validieren, werden dem Stagingbereich Daten hinzugefügt.
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
Die Verzeichnistabelle wird dann manuell aktualisiert, wodurch die Aufgabe getriggert wird.
ALTER STAGE my_test_stage REFRESH