Getriggerte Aufgaben

Verwenden Sie getriggerte Aufgaben, um Aufgaben immer dann auszuführen, wenn es eine Änderung in einem Stream gibt. Damit entfällt die Notwendigkeit, eine Quelle häufig abzufragen, wenn die Verfügbarkeit neuer Daten unvorhersehbar ist. Außerdem wird die Latenzzeit verringert, da die Daten sofort verarbeitet werden.

Getriggerte Aufgaben verbrauchen keine Computeressourcen, bis das Ereignis getriggert wird.

Hinweise

  • Bei Streams, die auf Verzeichnistabellen gehostet werden, muss die Verzeichnistabelle aktualisiert werden, bevor eine getriggerte Aufgabe die Änderungen erkennen kann. Um Änderungen zu erkennen, können Sie eine der folgenden Möglichkeiten nutzen:

  • Streams auf externen Tabellen und Hybridtabellen werden nicht unterstützt.

Eine getriggerte Aufgabe erstellen

Verwenden Sie CREATE TASK, und stellen Sie die folgenden Parameter ein:

  • Definieren Sie den Ziel-Stream mit der Klausel WHEN. (Fügen Sie den Parameter SCHEDULE nicht ein.)

    Wenn Sie mit mehreren Datenströmen arbeiten, können Sie bedingte Parameter verwenden: WHEN ... AND und WHEN ... OR.

  • Zusätzliche Anforderungen basierend auf Computeressourcen:

    • Um eine serverlose Aufgabe zu erstellen, müssen Sie den Parameter TARGET_COMPLETION_INTERVAL angeben. Fügen Sie den Parameter WAREHOUSE nicht ein. Snowflake schätzt die benötigten Ressourcen anhand des angestrebten Fertigstellungsintervalls und passt sich an, um die Aufgabe in dieser Zeit zu erledigen.

    Diagramm, das zeigt, wie serverlose getriggerte Aufgaben in Snowflake funktionieren.
    • Um eine Aufgabe zu erstellen, die auf einem vom Benutzer verwalteten Warehouse läuft, fügen Sie den Parameter WAREHOUSE ein und definieren das Warehouse.

Migrieren Sie eine bestehende Aufgabe von einer geplanten Aufgabe zu einer getriggerten Aufgabe

  1. Setzen Sie die Aufgabe aus.

  2. Deaktivieren Sie den Parameter SCHEDULE und fügen Sie die Klausel WHEN hinzu, um den Ziel-Stream zu definieren.

  3. Nehmen Sie die Aufgabe wieder auf.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

Migrieren Sie eine bestehende, vom Benutzer verwaltete getriggerte Aufgabe zu einer serverlosen getriggerten Aufgabe

  1. Setzen Sie die Aufgabe aus.

  2. Entfernen Sie den Parameter WAREHOUSE, und setzen Sie den Parameter TARGET_COMPLETION_INTERVAL.

  3. Nehmen Sie die Aufgabe wieder auf.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

Weitere Informationen finden Sie unter serverlose Aufgaben.

Ausführung der getriggerten Aufgabe erlauben

Wenn Sie eine getriggerte Aufgabe erstellen, startet sie im ausgesetzten Zustand.

Um mit der Überwachung des Streams zu beginnen:

Die Aufgabe wird unter den folgenden Bedingungen ausgeführt:

  • Wenn Sie eine getriggerte Aufgabe zum ersten Mal wieder aufnehmen, prüft die Aufgabe den Stream auf Änderungen seit der letzten Ausführung der Aufgabe. Wenn es eine Änderung gibt, wird die Aufgabe ausgeführt, andernfalls wird die Aufgabe übersprungen, ohne dass Computeressourcen verwendet werden.

  • Wenn eine Aufgabe läuft und der Stream neue Daten enthält, wartet die Aufgabe, bis die aktuelle Aufgabe abgeschlossen ist. Snowflake stellt sicher, dass jeweils nur eine Instanz einer Aufgabe ausgeführt wird.

  • Nachdem eine Aufgabe abgeschlossen ist, sucht Snowflake erneut nach Änderungen im Stream. Wenn es Änderungen gibt, wird die Aufgabe erneut ausgeführt, wenn nicht, wird die Aufgabe übersprungen.

  • Die Aufgabe wird ausgeführt, sobald neue Daten im Stream entdeckt werden.

  • Wenn sich die Stream-Daten in einer Verzeichnistabelle befinden, können Sie zur Erkennung von Änderungen eine der folgenden Aktionen durchführen:

  • Alle 12 Stunden führt die Aufgabe einen Gesundheitscheck durch, um zu verhindern, dass die Streams veralten. Wenn es keine Änderungen gibt, überspringt Snowflake die Aufgabe, ohne Computeressourcen zu verwenden. Bei Streams müssen die Aufgaben-Anweisungen die Daten im Stream verbrauchen, bevor die Datenaufbewahrung abläuft; andernfalls wird der Stream abgestanden. Weitere Informationen dazu finden Sie unter Vermeiden des Veraltens von Streams.

  • Standardmäßig werden getriggerte Aufgaben höchstens alle 30 Sekunden ausgeführt. Sie können den Parameter USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS so ändern, dass die Ausführung häufiger erfolgt, bis zu alle 10 Sekunden.

  • Wenn eine Aufgabe durch Streams on Views getriggert wird, werden alle Änderungen an Tabellen, auf die die Abfrage Streams in Ansichten verweist, auch die Aufgabe triggern, unabhängig von Verknüpfungen, Aggregationen oder Filtern in der Abfrage.

Ein Diagramm zeigt, wie getriggerte Aufgaben neue Daten verwalten, sobald sie eintreffen, und außerdem alle 12 Stunden auf Änderungen prüfen.

Überwachung getriggerter Aufgaben

  • In der Ausgabe SHOW TASKS und DESC TASK zeigt die Eigenschaft SCHEDULE NULL für getriggerte Aufgaben an.

  • In der Ausgabe der „task_history“-Ansicht der Schemas „information_schema“ und „account_usage“ wird in der Spalte SCHEDULED_FROM der Wert TRIGGER angezeigt.

Beispiele

Beispiel 1: Erstellen Sie eine serverlose Aufgabe, die immer dann ausgeführt wird, wenn sich Daten in einem Stream ändern.

Da es sich um eine serverlose Aufgabe handelt, ist der Parameter TARGET_COMPLETION_INTERVAL erforderlich, damit Snowflake die benötigten Computeressourcen schätzen kann.

CREATE TASK my_task
  TARGET_COMPLETION_INTERVAL='120 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS SELECT 1;
Copy

Beispiel 2: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn sich Daten in einem der beiden Streams ändern.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

Beispiel 3: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn Datenänderungen in zwei verschiedenen Datenströmen erkannt werden. Da die Aufgabe die Bedingung AND verwendet, wird die Aufgabe übersprungen, wenn nur einer der beiden Streams neue Daten enthält.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

Beispiel 4: Erstellen Sie eine vom Benutzer verwaltete Aufgabe, die immer dann ausgeführt wird, wenn sich Daten in einer Verzeichnistabelle ändern. In diesem Beispiel wird ein Stream (my_directory_table_stream) auf einer Verzeichnistabelle in einem Stagingbereich (my_test_stage) gehostet.

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

Um die getriggerte Aufgabe zu validieren, werden dem Stagingbereich Daten hinzugefügt.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

Die Verzeichnistabelle wird dann manuell aktualisiert, wodurch die Aufgabe getriggert wird.

ALTER STAGE my_test_stage REFRESH
Copy