Getriggerte Aufgaben

Verwenden Sie getriggerte Aufgaben, um Aufgaben immer dann auszuführen, wenn es eine Änderung in einem Stream gibt. Damit entfällt die Notwendigkeit, eine Quelle häufig abzufragen, wenn die Verfügbarkeit neuer Daten unvorhersehbar ist. Außerdem wird die Latenzzeit verringert, da die Daten sofort verarbeitet werden.

Getriggerte Aufgaben verbrauchen keine Computeressourcen, bis das Ereignis getriggert wird.

Hinweise

Bei getriggerten Aufgaben werden die folgenden Elemente unterstützt:

  • Tabellen

  • Ansichten

  • Dynamische Tabellen

  • Apache Iceberg™-Tabellen (verwaltet und nicht verwaltet)

  • Datenfreigaben

  • Verzeichnistabellen. Eine Verzeichnistabelle muss aktualisiert werden, bevor eine getriggerte Aufgabe die Änderungen erkennen kann. Um Änderungen zu erkennen, können Sie eine der folgenden Aufgaben durchführen:

Getriggerte Aufgaben werden bei den folgenden Elementen nicht unterstützt:

  • Hybridtabellen

  • Streams auf externen Tabellen

Damit Verbraucher Streams auf freigegebenen Tabellen oder sicheren Ansichten erstellen können, muss der Datenanbieter die Änderungsverfolgung für die Tabellen und Ansichten aktivieren, die für die Freigabe im Konto vorgesehen sind, das heißt, ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;. Ohne aktivierte Änderungsverfolgung können Verbraucher keine Streams auf den freigegebenen Daten erstellen. Weitere Informationen dazu finden Sie unter Streams auf freigegebenen Objekten.

Eine getriggerte Aufgabe erstellen

Verwenden Sie CREATE TASK, und stellen Sie die folgenden Parameter ein:

  • Definieren Sie den Ziel-Stream mit der Klausel WHEN. (Fügen Sie den Parameter SCHEDULE nicht ein.)

  • Zusätzliche Anforderungen basierend auf Computeressourcen:

    • Um eine Aufgabe zu erstellen, die auf einem vom Benutzer verwalteten Warehouse läuft, fügen Sie den Parameter WAREHOUSE ein und definieren das Warehouse.

    • Um eine serverlose Aufgabe zu erstellen, müssen Sie den Parameter TARGET_COMPLETION_INTERVAL angeben. Fügen Sie den Parameter WAREHOUSE nicht ein. Snowflake schätzt die benötigten Ressourcen anhand des angestrebten Fertigstellungsintervalls und passt sich an, um die Aufgabe in dieser Zeit zu erledigen.

Im folgenden Beispiel wird eine serverlose getriggerte Aufgabe erstellt, die immer dann ausgeführt wird, wenn sich Daten in einem Stream ändern.

Diagramm einer serverlosen getriggerten Aufgabe
CREATE TASK my_triggered_task
  TARGET_COMPLETION_INTERVAL='15 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Migrieren Sie eine bestehende Aufgabe von einer geplanten Aufgabe zu einer getriggerten Aufgabe

  1. Setzen Sie die Aufgabe aus.

  2. Verwenden Sie ALTER TASK, um die Aufgabe zu aktualisieren. Setzen Sie den Parameter SCHEDULE zurück, und fügen Sie dann die WHEN-Klausel zum Definieren des Ziel-Streams hinzu.

  3. Nehmen Sie die Aufgabe wieder auf.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

Migrieren Sie eine bestehende, vom Benutzer verwaltete getriggerte Aufgabe zu einer serverlosen getriggerten Aufgabe

  1. Setzen Sie die Aufgabe aus.

  2. Verwenden Sie ALTER TASK, um die Aufgabe zu aktualisieren. Entfernen Sie den Parameter WAREHOUSE, und stellen Sie dann den Parameter TARGET_COMPLETION_INTERVAL ein.

  3. Nehmen Sie die Aufgabe wieder auf.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

Weitere Informationen finden Sie unter serverlose Aufgaben.

Zulassen, dass eine getriggerte Aufgabe ausgeführt wird

Wenn Sie eine getriggerte Aufgabe erstellen, startet sie im ausgesetzten Zustand.

Um mit der Überwachung des Streams zu beginnen:

Die Aufgabe wird unter den folgenden Bedingungen ausgeführt:

  • Wenn Sie eine getriggerte Aufgabe zum ersten Mal fortsetzen, überprüft die Aufgabe den Stream auf Änderungen, nachdem die letzte Aufgabe ausgeführt wurde. Wenn es eine Änderung gibt, wird die Aufgabe ausgeführt. Andernfalls wird die Aufgabe übersprungen, ohne Computeressourcen zu verwenden.

  • Wenn eine Aufgabe ausgeführt wird und der Stream neue Daten enthält, wird die Aufgabe angehalten, bis die aktuelle Aufgabe abgeschlossen ist. Snowflake stellt sicher, dass jeweils nur eine Instanz einer Aufgabe ausgeführt wird.

  • Nachdem eine Aufgabe abgeschlossen ist, sucht Snowflake erneut nach Änderungen im Stream. Wenn es Änderungen gibt, wird die Aufgabe erneut ausgeführt, wenn nicht, wird die Aufgabe übersprungen.

  • Die Aufgabe wird ausgeführt, sobald neue Daten im Stream entdeckt werden.

  • Wenn die Stream-Daten in einer Verzeichnistabelle gehostet werden, können Sie Änderungen feststellen, indem Sie eine der folgenden Aufgaben durchführen:

  • Alle 12 Stunden führt die Aufgabe einen Gesundheitscheck durch, um zu verhindern, dass die Streams veralten. Wenn es keine Änderungen gibt, überspringt Snowflake die Aufgabe, ohne Computeressourcen zu verwenden. Bei Streams müssen die Aufgaben-Anweisungen die Daten im Stream verbrauchen, bevor die Datenaufbewahrung abläuft; andernfalls wird der Stream abgestanden. Weitere Informationen dazu finden Sie unter Vermeiden des Veraltens von Streams.

  • Standardmäßig werden getriggerte Aufgaben höchstens alle 30 Sekunden ausgeführt. Sie können den Parameter USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS so ändern, dass die Ausführung häufiger erfolgt, bis zu alle 10 Sekunden.

  • Wenn eine Aufgabe durch Streams on Views getriggert wird, werden alle Änderungen an Tabellen, auf die die Abfrage Streams in Ansichten verweist, auch die Aufgabe triggern, unabhängig von Verknüpfungen, Aggregationen oder Filtern in der Abfrage.

Ein Diagramm zeigt, wie getriggerte Aufgaben neue Daten verwalten, sobald sie eintreffen, und außerdem alle 12 Stunden auf Änderungen prüfen.

Überwachung getriggerter Aufgaben

  • In der Ausgabe SHOW TASKS und DESC TASK zeigt die Eigenschaft SCHEDULE NULL für getriggerte Aufgaben an.

  • In der Ausgabe der „task_history“-Ansicht der Schemas „information_schema“ und „account_usage“ wird in der Spalte SCHEDULED_FROM der Wert TRIGGER angezeigt.

Beispiele

Beispiel 1: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn sich Daten in einem der beiden Streams ändern.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Beispiel 2: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn Datenänderungen in zwei verschiedenen Datenströmen festgestellt werden. Da die Aufgabe die Bedingung AND verwendet, wird die Aufgabe übersprungen, wenn nur einer der beiden Streams neue Daten enthält.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

Beispiel 3: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn sich Daten in einer Verzeichnistabelle ändern. In diesem Beispiel wird ein Stream namens „my_directory_table_stream“ auf einer Verzeichnistabelle in einem Stagingbereich namens „my_test_stage“ gehostet.

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

Um die getriggerte Aufgabe zu validieren, werden dem Stagingbereich Daten hinzugefügt.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

Die Verzeichnistabelle wird dann manuell aktualisiert, wodurch die Aufgabe getriggert wird.

ALTER STAGE my_test_stage REFRESH
Copy