Task Reactor

Bibliothek, die allgemeine Elemente und Features bereitstellt, die in allen Snowflake-Konnektoren verwendet werden.

Anforderungen

Der Task Reactor benötigt mindestens die folgenden SQL-Dateien, die während der Native App-Installation ausgeführt werden müssen:

Übersicht

Task Reactor ist ein separates Modul, das einen Orchestrierungsmechanismus für Work-Blöcke bietet, die in einer Warteschlange mit einer begrenzten Menge von Aufgaben gespeichert sind. Die Warteschlange und der Dispatcher des Task Reactor basieren auf Snowflake-Streams mit Snowflake-Aufgaben und werden aufgrund der begrenzten Aktualisierungszeit jede Minute ausgelöst. Der Task Reactor ist nur aktiv, wenn sich Daten in der Eingabewarteschlange befinden, sodass das Warehouse einige Credits sparen kann.

Der Task Reactor besteht aus den drei Hauptkomponenten Warteschlange, Dispatcher und Worker:

  1. Ihre Konnektoranwendung fügt QueueItems in die Warteschlange ein.

  2. Jede Minute ruft der Dispatcher (eine Snowflake-Aufgabe) die wartenden QueueItems aus der Warteschlange ab und leitet sie an die Worker weiter.

  3. Jede Minute arbeiten die Worker (Snowflake-Aufgaben) parallel an den zugewiesenen QueueItems.

Sobald die Konnektorkonfiguration abgeschlossen ist, ist die Task Reactor-Konfiguration auf drei Schritte beschränkt:

  1. Erstellen aller Task Reactor-Komponenten

  2. Initialisieren der Instanz

  3. (optional) Ändern der Worker-Nummer

Erstellen aller Task Reactor-Komponenten

Um ein Instanzobjekt zu erstellen, muss der Benutzer zunächst die Implementierungen von worker, selector und optional expired selector erstellen und sie dann mit der Prozedur TASK_REACTOR.CREATE_INSTANCE_OBJECTS integrieren.

Worker-Implementierung

Der Worker ist für die Ausführung einer vom Dispatcher zugewiesenen Aufgabe verantwortlich, z. B. für das Abrufen und Einlesen bestimmter Daten. Der einzige obligatorische Teil ist eine bestimmte Worker-Methode, die den Job auslöst. Diese Methode muss von der Snowpark-Prozedur aus aufrufbar sein, einen String-Wert zurückgeben und die folgenden Parameter enthalten:

  • session – Ein Snowpark-Sitzungsobjekt

  • worker_id – Nummer, eindeutige Worker-ID

  • task_reactor_schema – Name des Schemas, in dem die Task Reactor-Objekte erstellt werden. Kann als Name der Task Reactor-Instanz verwendet werden.

Der Worker ist für die Ausführung der vom Dispatcher zugewiesenen Aufgabe verantwortlich, z. B. für das Abrufen und Einlesen bestimmter Daten. Wir empfehlen die Verwendung der (com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorker und com.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion) Java-Klassen oder für einfachere Aufgaben (com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorker und com.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask). Ihr Worker kann jedoch in jeder Programmiersprache erstellt werden, die für das Schreiben von gespeicherten Handler-Prozeduren unterstützt wird.

Beispiel für eine Java-Worker-Methode:

public static String executeWork(Session session, int workerId, String taskReactorSchema) {
  FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
  WorkerId workerIdentifier = new WorkerId(workerId);
  Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
  try {
    IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
  } catch (WorkerException e) {
    // handle the exception...
    throw new RuntimeException(e);
  }
  return "Worker procedure executed.";
}
Copy

Bei einer bereits erstellten Worker-Methode muss der Benutzer diese in CONNECTOR.WORKER_PROCEDURE integrieren. Die Prozedur sollte ihre eigene Worker-Methode aufrufen. Sie muss in Ihrem Anwendungsschema erstellt werden, einen STRING-Wert zurückgeben und die folgenden Parameter enthalten:

  • worker_id – Zahl

  • task_reactor_schema – Zeichenfolge

Beispiel für eine Prozedur, die die Java-Implementierung des Workers aufruft:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
Copy

Die Telemetrie-Bibliothek wird benötigt, um Metriken zu sammeln, die in der Ereignistabelle protokolliert werden.

Selektor-Implementierung

Die Aufgabe des Selektors ist es, zu entscheiden, welche Aufgaben in der Warteschlange vom Task Reactor bearbeitet werden sollen. Ähnlich wie bei der Worker-Implementierung kann der Selektor in jeder von Snowpark unterstützten Sprache erstellt werden. Der Aufgaben-Selektor kann als Datenbankprozedur oder als Datenbankansicht implementiert werden. Der Selektor (Prozedur oder Ansicht) muss als Argument an die Prozedur TASK_REACTOR.CREATE_NEW_INSTANCE übergeben werden.

Die Prozedur muss von einer Snowpark-Prozedur aus aufrufbar sein, einen String-Wert zurückgeben und die folgenden Parameter enthalten:

  • session – Snowpark-Sitzung

  • queueItems – String[] (ein Array aus einzelnen JSON-Strings, die jeweils ein einzelnes QueueItem beschreiben)

Beispiel für eine Java-Selektor-Methode:

public static String selectWork(Session session, String[] queueItems) {
  Variant[] sorted =
      Arrays.stream(queueItems)
          .map(Variant::new)
          .filter(
              queueItem ->
                  !queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
          .sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
          .toArray(Variant[]::new);
  return new Variant(sorted).asJsonString();
}
Copy

Anstelle der Selektor-Methode können Sie auch eine Ansicht erstellen, die Aufgaben aus der vorhandenen Warteschlange filtert und sortiert. Der Dispatcher kann mithilfe einer Beispielabfrage neue Aufgaben aus der neu erstellten Ansicht abrufen:

CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
Copy

Mit der bereits erstellten Selektor-Methode muss der Benutzer sie in CONNECTOR.WORK_SELECTOR integrieren. Die Prozedur sollte Ihre obligatorische Work-Selector-Methode aufrufen. Sie muss in Ihrem Anwendungsschema erstellt werden, ein ARRAY zurückgeben und den folgenden Parameter enthalten:

  • work_items - array

Beispiel für eine Prozedur, die die Java-Implementierung des Work-Selektors aufruft:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
    RETURNS ARRAY
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
Copy

Implementierung des Abgelaufene-Selektors

Die Aufgabe des „Abgelaufene“-Selektors ist es, zu entscheiden, welche Objekte aus der Warteschlange des Task Reactor entfernt werden sollen. Es kann erforderlich sein, Elemente zu entfernen, weil der Selektor einige Elemente nie erreichen kann und diese Elemente für immer in der Warteschlange bleiben würden. Außerdem können einige Objekte, die sich in der Warteschlange befinden, schon lange vorher erstellt worden sein, sodass es keinen Sinn ergibt, sie weiter zu bearbeiten. Der Abgelaufene-Selektor kann als Datenbankansicht implementiert werden. Die Selektoransicht muss als Argument an die Prozedur TASK_REACTOR.CREATE_NEW_INSTANCE übergeben werden. Wenn kein Bedarf besteht, Objekte aus der Warteschlange zu entfernen, kann die Standardimplementierung TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR verwendet werden.

Mit der folgenden Abfrage können Sie eine Ansicht mit Abgelaufene-Selektor erstellen, die die Artikel auswählt, die vor mehr als drei Tagen erstellt wurden:

CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
    AS SELECT * FROM TASK_REACTOR.QUEUE q
        WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
Copy

Instanzobjekte integrieren

Mit TASK_REACTOR.CREATE_INSTANCE_OBJECTS kann der Benutzer alle Instanzen gemeinsam konfigurieren, bevor die erstellten Instanzen initialisiert werden. Die Prozedur kann nur einmal pro Schema ausgeführt werden, sodass zukünftige Aufrufe keine Änderungen bewirken. Wir empfehlen, den Initialisierungsaufruf in die Datei setup.sql aufzunehmen, um zu verhindern, dass die Prozedur mehrfach ausgeführt oder gar nicht aufgerufen wird.

Erforderliche Parameter:

  • instance_schema_name VARCHAR – Ein pro Instanz eindeutiges Schema, das die Datenbankobjekte speichert, mit denen die Instanz arbeitet.

  • worker_procedure_name VARCHAR – Name der Worker-Prozedur, die im Teil Worker Implementation beschrieben wird.

  • work_selector_type VARCHAR – Werte, die angeben, ob neue Aufgaben eine Ansicht oder eine Prozedur verwenden sollen. Mögliche Werte: VIEW, PROCEDURE.

  • work_selector_name VARCHAR – Name der Selektor-Prozedur/Ansicht, die im Teil Selector Implementation beschrieben wird.

Optionale Parameter:

  • expired_work_selector_name VARCHAR – Name der Abgelaufene-Selektor-Ansicht, die im Teil Expired Selector Implementation beschrieben ist. Wenn der Wert nicht angegeben wird, wird TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR als Standardimplementierung verwendet, die nichts zurückgibt.

Initialisieren der Instanz

Um alle Konfigurationen im Task Reactor zu initialisieren und auszuführen, muss der Benutzer INITIALIZE_INSTANCE aufrufen. Die Prozedur benötigt die folgenden Parameter als Eingabe:

  • instance_schema_name (erforderlich) – Name des Schemas, in dem die Datenbankobjekte gespeichert sind, mit denen die Instanz arbeitet.

  • warehouse_name (erforderlich) – Name des Warehouses, auf dem die Instanz ausgeführt wird.

  • dt_should_be_started (optional) – Standard: TRUE. Dispatcher-Aufgabe sollte starten, wenn eine neue Instanz erstellt wird, oder nicht.

  • dt_task_schedule (optional) – Standard: 1 MINUTE. Häufigkeit der Ausführung der Dispatcher-Aufgabe.

  • dt_allow_overlapping_execution (optional) – Standard: FALSE. Ermöglicht, dass die DAG-Ausführung parallel erfolgt.

  • dt_user_task_timeout_ms (optional) – Gibt das Zeitlimit für die einzelne Ausführung der Aufgabe an, bevor das Zeitlimit überschritten wird (in Millisekunden).

Bemerkung

Wenn die Worker-Prozedur länger dauert als das in der Worker-Task eingestellte Timeout (USER_TASK_TIMEOUT_MS), dann bricht die Prozedur mit einem Timeout-Fehler ab. Es ist wichtig, dass Sie Aufgaben so planen, dass das Timeout der Snowflake-Aufgabe nicht überschritten wird.

Nachdem Sie die Mindestanzahl an erforderlichen Parametern angegeben haben, wird Task Reactor mit der bereitgestellten Konfiguration initialisiert und aktiviert Worker mit der Prozedur TASK_REACTOR.DISPATCHER.

Einstellen der Anzahl der Worker

Die Anzahl der Worker kann manuell geändert werden, indem Sie die Prozedur TASK_REACTOR.SET_WORKERS_NUMBER mit den folgenden Parametern aufrufen:

  • WORKERS_NUMBER – Neue Anzahl von Workern.

  • TR_INSTANCE_SCHEMA_NAME – Name des Instanzschemas

Kennzahlen

Task Reactor enthält einen Mechanismus für Metriken. Er basiert auf Snowflake-Ablaufverfolgungsereignissen. Die Metriken werden in der Ereignistabelle protokolliert. Die Ereignistabelle muss also aktiviert sein, damit die Metriken funktionieren.

Derzeit sind die folgenden Metriken eingeführt:

  • worker working time (TASK_REACTOR_WORKER_WORKING_TIME) – Zeigt die Zeit an, in der ein Worker tatsächlich Ressourcen verarbeitet hat. Der Timer startet, wenn eine Worker-Aufgabe beginnt, und endet, wenn die Worker-Aufgabe beendet ist.

  • worker idle time (TASK_REACTOR_WORKER_IDLE_TIME) – Gegenteil von worker working time. Zeigt die Zeit an, in der ein Worker inaktiv war: entweder in Erwartung einer neuen Tätigkeit oder in Erwartung der nächsten geplanten Ausführung seiner Aufgabe. Der Timer startet, wenn ein Worker seine Aufgabe beendet, und endet, wenn die Worker-Aufgabe erneut startet.

Um alle protokollierten Metrik-Ereignisse anzuzeigen, können Sie die folgende Abfrage verwenden:

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
    ORDER BY event.timestamp DESC;
Copy

Um nur einen Metrik-Typ auszuwählen, fügen Sie event.record:name = <Metrikname> zur where-Klausel der Abfrage hinzu.

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
        AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
    ORDER BY event.timestamp DESC;
Copy