Task Reactor¶
Bibliothek, die allgemeine Elemente und Features bereitstellt, die in allen Snowflake-Konnektoren verwendet werden.
Anforderungen¶
Der Task Reactor benötigt mindestens die folgenden SQL-Dateien, die während der Native App-Installation ausgeführt werden müssen:
task_reactor.sql
(siehe SQL-Referenz zum Task Reactor)
Übersicht¶
Task Reactor ist ein separates Modul, das einen Orchestrierungsmechanismus für Work-Blöcke bietet, die in einer Warteschlange mit einer begrenzten Menge von Aufgaben gespeichert sind. Die Warteschlange und der Dispatcher des Task Reactor basieren auf Snowflake-Streams mit Snowflake-Aufgaben und werden aufgrund der begrenzten Aktualisierungszeit jede Minute ausgelöst. Der Task Reactor ist nur aktiv, wenn sich Daten in der Eingabewarteschlange befinden, sodass das Warehouse einige Credits sparen kann.
Der Task Reactor besteht aus den drei Hauptkomponenten Warteschlange, Dispatcher und Worker:
Ihre Konnektoranwendung fügt QueueItems in die Warteschlange ein.
Jede Minute ruft der Dispatcher (eine Snowflake-Aufgabe) die wartenden QueueItems aus der Warteschlange ab und leitet sie an die Worker weiter.
Jede Minute arbeiten die Worker (Snowflake-Aufgaben) parallel an den zugewiesenen QueueItems.
Sobald die Konnektorkonfiguration abgeschlossen ist, ist die Task Reactor-Konfiguration auf drei Schritte beschränkt:
Erstellen aller Task Reactor-Komponenten
Initialisieren der Instanz
(optional) Ändern der Worker-Nummer
Erstellen aller Task Reactor-Komponenten¶
Um ein Instanzobjekt zu erstellen, muss der Benutzer zunächst die Implementierungen von worker
, selector
und optional expired selector
erstellen und sie dann mit der Prozedur TASK_REACTOR.CREATE_INSTANCE_OBJECTS integrieren.
Worker-Implementierung¶
Der Worker ist für die Ausführung einer vom Dispatcher zugewiesenen Aufgabe verantwortlich, z. B. für das Abrufen und Einlesen bestimmter Daten. Der einzige obligatorische Teil ist eine bestimmte Worker-Methode, die den Job auslöst. Diese Methode muss von der Snowpark-Prozedur aus aufrufbar sein, einen String-Wert zurückgeben und die folgenden Parameter enthalten:
session
– Ein Snowpark-Sitzungsobjektworker_id
– Nummer, eindeutige Worker-IDtask_reactor_schema
– Name des Schemas, in dem die Task Reactor-Objekte erstellt werden. Kann als Name der Task Reactor-Instanz verwendet werden.
Der Worker ist für die Ausführung der vom Dispatcher zugewiesenen Aufgabe verantwortlich, z. B. für das Abrufen und Einlesen bestimmter Daten. Wir empfehlen die Verwendung der (com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorker
und com.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion
) Java-Klassen oder für einfachere Aufgaben (com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorker
und com.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask
). Ihr Worker kann jedoch in jeder Programmiersprache erstellt werden, die für das Schreiben von gespeicherten Handler-Prozeduren unterstützt wird.
Beispiel für eine Java-Worker-Methode:
public static String executeWork(Session session, int workerId, String taskReactorSchema) {
FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
WorkerId workerIdentifier = new WorkerId(workerId);
Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
try {
IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
} catch (WorkerException e) {
// handle the exception...
throw new RuntimeException(e);
}
return "Worker procedure executed.";
}
Bei einer bereits erstellten Worker-Methode muss der Benutzer diese in CONNECTOR.WORKER_PROCEDURE
integrieren. Die Prozedur sollte ihre eigene Worker-Methode aufrufen. Sie muss in Ihrem Anwendungsschema erstellt werden, einen STRING-Wert zurückgeben und die folgenden Parameter enthalten:
worker_id
– Zahltask_reactor_schema
– Zeichenfolge
Beispiel für eine Prozedur, die die Java-Implementierung des Workers aufruft:
CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
RETURNS STRING
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
Die Telemetrie-Bibliothek wird benötigt, um Metriken zu sammeln, die in der Ereignistabelle protokolliert werden.
Selektor-Implementierung¶
Die Aufgabe des Selektors ist es, zu entscheiden, welche Aufgaben in der Warteschlange vom Task Reactor bearbeitet werden sollen. Ähnlich wie bei der Worker-Implementierung kann der Selektor in jeder von Snowpark unterstützten Sprache erstellt werden. Der Aufgaben-Selektor kann als Datenbankprozedur oder als Datenbankansicht implementiert werden. Der Selektor (Prozedur oder Ansicht) muss als Argument an die Prozedur TASK_REACTOR.CREATE_NEW_INSTANCE
übergeben werden.
Die Prozedur muss von einer Snowpark-Prozedur aus aufrufbar sein, einen String-Wert zurückgeben und die folgenden Parameter enthalten:
session
– Snowpark-SitzungqueueItems
– String[] (ein Array aus einzelnen JSON-Strings, die jeweils ein einzelnes QueueItem beschreiben)
Beispiel für eine Java-Selektor-Methode:
public static String selectWork(Session session, String[] queueItems) {
Variant[] sorted =
Arrays.stream(queueItems)
.map(Variant::new)
.filter(
queueItem ->
!queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
.sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
.toArray(Variant[]::new);
return new Variant(sorted).asJsonString();
}
Anstelle der Selektor-Methode können Sie auch eine Ansicht erstellen, die Aufgaben aus der vorhandenen Warteschlange filtert und sortiert. Der Dispatcher kann mithilfe einer Beispielabfrage neue Aufgaben aus der neu erstellten Ansicht abrufen:
CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
Mit der bereits erstellten Selektor-Methode muss der Benutzer sie in CONNECTOR.WORK_SELECTOR
integrieren. Die Prozedur sollte Ihre obligatorische Work-Selector-Methode aufrufen. Sie muss in Ihrem Anwendungsschema erstellt werden, ein ARRAY zurückgeben und den folgenden Parameter enthalten:
work_items - array
Beispiel für eine Prozedur, die die Java-Implementierung des Work-Selektors aufruft:
CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
RETURNS ARRAY
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
Implementierung des Abgelaufene-Selektors¶
Die Aufgabe des „Abgelaufene“-Selektors ist es, zu entscheiden, welche Objekte aus der Warteschlange des Task Reactor entfernt werden sollen. Es kann erforderlich sein, Elemente zu entfernen, weil der Selektor einige Elemente nie erreichen kann und diese Elemente für immer in der Warteschlange bleiben würden. Außerdem können einige Objekte, die sich in der Warteschlange befinden, schon lange vorher erstellt worden sein, sodass es keinen Sinn ergibt, sie weiter zu bearbeiten. Der Abgelaufene-Selektor kann als Datenbankansicht implementiert werden. Die Selektoransicht muss als Argument an die Prozedur TASK_REACTOR.CREATE_NEW_INSTANCE
übergeben werden. Wenn kein Bedarf besteht, Objekte aus der Warteschlange zu entfernen, kann die Standardimplementierung TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
verwendet werden.
Mit der folgenden Abfrage können Sie eine Ansicht mit Abgelaufene-Selektor erstellen, die die Artikel auswählt, die vor mehr als drei Tagen erstellt wurden:
CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
AS SELECT * FROM TASK_REACTOR.QUEUE q
WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
Instanzobjekte integrieren¶
Mit TASK_REACTOR.CREATE_INSTANCE_OBJECTS kann der Benutzer alle Instanzen gemeinsam konfigurieren, bevor die erstellten Instanzen initialisiert werden. Die Prozedur kann nur einmal pro Schema ausgeführt werden, sodass zukünftige Aufrufe keine Änderungen bewirken. Wir empfehlen, den Initialisierungsaufruf in die Datei setup.sql
aufzunehmen, um zu verhindern, dass die Prozedur mehrfach ausgeführt oder gar nicht aufgerufen wird.
Erforderliche Parameter:
instance_schema_name VARCHAR
– Ein pro Instanz eindeutiges Schema, das die Datenbankobjekte speichert, mit denen die Instanz arbeitet.worker_procedure_name VARCHAR
– Name der Worker-Prozedur, die im TeilWorker Implementation
beschrieben wird.work_selector_type VARCHAR
– Werte, die angeben, ob neue Aufgaben eine Ansicht oder eine Prozedur verwenden sollen. Mögliche Werte: VIEW, PROCEDURE.work_selector_name VARCHAR
– Name der Selektor-Prozedur/Ansicht, die im TeilSelector Implementation
beschrieben wird.
Optionale Parameter:
expired_work_selector_name VARCHAR
– Name der Abgelaufene-Selektor-Ansicht, die im TeilExpired Selector Implementation
beschrieben ist. Wenn der Wert nicht angegeben wird, wirdTASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
als Standardimplementierung verwendet, die nichts zurückgibt.
Initialisieren der Instanz¶
Um alle Konfigurationen im Task Reactor zu initialisieren und auszuführen, muss der Benutzer INITIALIZE_INSTANCE
aufrufen. Die Prozedur benötigt die folgenden Parameter als Eingabe:
instance_schema_name
(erforderlich) – Name des Schemas, in dem die Datenbankobjekte gespeichert sind, mit denen die Instanz arbeitet.warehouse_name
(erforderlich) – Name des Warehouses, auf dem die Instanz ausgeführt wird.dt_should_be_started
(optional) – Standard:TRUE
. Dispatcher-Aufgabe sollte starten, wenn eine neue Instanz erstellt wird, oder nicht.dt_task_schedule
(optional) – Standard:1 MINUTE
. Häufigkeit der Ausführung der Dispatcher-Aufgabe.dt_allow_overlapping_execution
(optional) – Standard:FALSE
. Ermöglicht, dass die DAG-Ausführung parallel erfolgt.dt_user_task_timeout_ms
(optional) – Gibt das Zeitlimit für die einzelne Ausführung der Aufgabe an, bevor das Zeitlimit überschritten wird (in Millisekunden).
Bemerkung
Wenn die Worker-Prozedur länger dauert als das in der Worker-Task eingestellte Timeout (USER_TASK_TIMEOUT_MS), dann bricht die Prozedur mit einem Timeout-Fehler ab. Es ist wichtig, dass Sie Aufgaben so planen, dass das Timeout der Snowflake-Aufgabe nicht überschritten wird.
Nachdem Sie die Mindestanzahl an erforderlichen Parametern angegeben haben, wird Task Reactor
mit der bereitgestellten Konfiguration initialisiert und aktiviert Worker mit der Prozedur TASK_REACTOR.DISPATCHER
.
Einstellen der Anzahl der Worker¶
Die Anzahl der Worker kann manuell geändert werden, indem Sie die Prozedur TASK_REACTOR.SET_WORKERS_NUMBER mit den folgenden Parametern aufrufen:
WORKERS_NUMBER
– Neue Anzahl von Workern.TR_INSTANCE_SCHEMA_NAME
– Name des Instanzschemas
Kennzahlen¶
Task Reactor enthält einen Mechanismus für Metriken. Er basiert auf Snowflake-Ablaufverfolgungsereignissen. Die Metriken werden in der Ereignistabelle protokolliert. Die Ereignistabelle muss also aktiviert sein, damit die Metriken funktionieren.
Derzeit sind die folgenden Metriken eingeführt:
worker working time
(TASK_REACTOR_WORKER_WORKING_TIME
) – Zeigt die Zeit an, in der ein Worker tatsächlich Ressourcen verarbeitet hat. Der Timer startet, wenn eine Worker-Aufgabe beginnt, und endet, wenn die Worker-Aufgabe beendet ist.worker idle time
(TASK_REACTOR_WORKER_IDLE_TIME
) – Gegenteil vonworker working time
. Zeigt die Zeit an, in der ein Worker inaktiv war: entweder in Erwartung einer neuen Tätigkeit oder in Erwartung der nächsten geplanten Ausführung seiner Aufgabe. Der Timer startet, wenn ein Worker seine Aufgabe beendet, und endet, wenn die Worker-Aufgabe erneut startet.
Um alle protokollierten Metrik-Ereignisse anzuzeigen, können Sie die folgende Abfrage verwenden:
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
ORDER BY event.timestamp DESC;
Um nur einen Metrik-Typ auszuwählen, fügen Sie event.record:name = <Metrikname>
zur where
-Klausel der Abfrage hinzu.
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
ORDER BY event.timestamp DESC;