Référence SQL de réacteur de tâches

Objets et procédures de la base de données

Les objets de base de données suivants sont créés par le biais du fichier task_reactor.sql.

TASK_REACTOR SCHEMA

Schéma versionné contenant un objet de la base de données de Task Reactor dans le connecteur.

TASK_REACTOR_INSTANCES SCHEMA

Schéma non versionné contenant un objet de base de données d’instance de Task Reactor dans le connecteur.

TASK_REACTOR_INSTANCES.INSTANCE_REGISTRY

Cette table est créée pour stocker les données relatives aux instances de Task Reactor afin de permettre le suivi et la gestion des instances existantes pendant l’exécution de l’application. La table est créée dans le schéma TASK_REACTOR_INSTANCES.

  • instance_name VARCHAR

  • is_initialized BOOLEAN

  • is_active BOOLEAN

TASK_REACTOR.DISPATCHER(INSTANCE_SCHEMA_NAME VARCHAR)

Cette procédure appelle le DispatcherHandler.dispatchWorkItems Java et permet de distribuer les éléments de travail.

TASK_REACTOR.SET_WORKERS_NUMBER (WORKERS_NUMBER NUMBER, INSTANCE_SCHEMA_NAME VARCHAR)

Cette procédure invoque le SetWorkersNumberHandler.setWorkersNumber Java et permet de définir le nombre de tâches worker.

TASK_REACTOR.CREATE_INSTANCE_OBJECTS

Paramètres d’entrée :

  • INSTANCE_SCHEMA_NAME VARCHAR

  • WORKER_PROCEDURE_NAME VARCHAR

  • WORK_SELECTOR_TYPE VARCHAR

  • WORK_SELECTOR_NAME VARCHAR

  • EXPIRED_WORK_SELECTOR_NAME VARCHAR

La procédure crée tous les objets d’instance requis pour un flux précis de Task reactor et valide ceux qui ne devraient pas être déjà initialisés. À la fin du processus, un nouvel enregistrement du registre des instances est inséré dans la table.

Les erreurs possibles incluent les suivantes :

  • INSTANCE_NOT_FOUND - L’instance portant ce nom n’existe pas.

  • INSTANCE_ALREADY_INITIALIZED - L’instance portant ce nom est déjà initialisée.

  • DEFAULT_PROCEDURE_VALIDATION_EXCEPTION - Procédure stockée non trouvée.

  • SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS - Un schéma portant le même nom existe déjà.

  • CREATING_TR_INSTANCE_EXCEPTION - Un problème inattendu s’est produit lors de la création d’une nouvelle instance de Task Reactor. Aucune instance n’a été créée.

TASK_REACTOR.INITIALIZE_INSTANCE

Paramètres d’entrée :

  • INSTANCE_SCHEMA_NAME VARCHAR

  • WAREHOUSE_NAME VARCHAR

  • DT_SHOULD_BE_STARTED BOOLEAN

  • DT_TASK_SCHEDULE VARCHAR

  • DT_ALLOW_OVERLAPPING_EXECUTION BOOLEAN

  • DT_USER_TASK_TIMEOUT_MS VARCHAR

La procédure démarre toutes les instances non initialisées au sein de la même instance de base de données. Elle consiste à vérifier l’existence de l’instance, ou si elle n’est pas déjà initialisée, puis à créer des tâches de répartition et à lancer cette tâche si nécessaire.

La procédure se termine avec succès ainsi :

{
    "response_code": "OK",
    "message": "Instance has been initialized successfully."
}
Copy

Les erreurs possibles incluent les suivantes :

  • INSTANCE_NOT_FOUND - L’instance n’existe pas.

  • INSTANCE_ALREADY_INITIALIZED - L’instance portant ce nom est déjà initialisée.

TASK_REACTOR.PAUSE_INSTANCE

Paramètres d’entrée :

  • INSTANCE_SCHEMA VARCHAR

La procédure lance le processus de mise en pause d’une instance donnée de Task Reactor et renvoie la réponse OK. Elle lance une tâche qui arrête de manière asynchrone toutes les tâches worker et la tâche du répartiteur. Dans le cas où une tâche worker était déjà en train d’effectuer une ingestion, la tâche n’est pas arrêtée tout de suite, mais elle le sera une fois l’ingestion terminée.

Note

La logique de cette procédure est déjà utilisée dans Connecteur de pause, il n’est donc pas nécessaire d’utiliser cette procédure dans le cadre de l’arrêt de l’ensemble du connecteur.

La procédure se termine avec succès ainsi :

{
    "response_code": "OK"
}
Copy

TASK_REACTOR.RESUME_INSTANCE

Paramètres d’entrée :

  • INSTANCE_SCHEMA VARCHAR

La procédure lance le processus de reprise d’une instance donnée de Task Reactor et renvoie la réponse OK. Elle reprend la tâche du répartiteur et lance un travail qui reprend de manière asynchrone toutes les tâches workers qui ont déjà du travail attribué.

Note

La logique de cette procédure est déjà utilisée dans la reprise du connecteur, il n’est donc pas nécessaire d’utiliser cette procédure dans le cadre de la reprise de l’ensemble du connecteur.

La procédure se termine avec succès ainsi :

{
    "response_code": "OK"
}
Copy

TASK_REACTOR.UPDATE_WAREHOUSE_INSTANCE

Paramètres d’entrée :

  • WAREHOUSE_NAME VARCHAR

  • INSTANCE_SCHEMA VARCHAR

La procédure lance le processus de modification de l’entrepôt pour une instance donnée de Task Reactor. Elle modifie l’entrepôt de la tâche du répartiteur, puis lance un travail qui modifie de manière asynchrone l’entrepôt de toutes les tâches workers.

Note

La logique de cette procédure est déjà utilisée dans Mise à jour de l’entrepôt, il n’est donc pas nécessaire d’utiliser cette procédure dans le cadre de la mise à jour de l’entrepôt pour l’ensemble du connecteur.

La procédure se termine avec succès ainsi :

{
    "response_code": "OK"
}
Copy

Les erreurs possibles incluent les suivantes :

  • INSTANCE_NOT_FOUND - L’instance donnée n’existe pas.

  • TASK_REACTOR_INSTANCE_IS_ACTIVE - L’instance de Task Reactor n’a pas été mise en pause avant d’utiliser cette procédure.

Procédures internes

Toutes les procédures ci-dessous sont utilisées uniquement en interne dans le script d’installation de task_reactor et ne doivent pas être utilisées en externe.

TASK_REACTOR.CREATE_INSTANCE_SCHEMA (INSTANCE_SCHEMA_NAME VARCHAR)

Cette procédure crée un nouveau schéma avec l’identificateur nommé instance_schema_name, puis lève une nouvelle exception si le schéma n’a pas pu être créé.

Les erreurs possibles incluent les suivantes :

  • SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS - Un schéma portant le même nom existe déjà.

TASK_REACTOR.VALIDATE_PROCEDURE_EXISTENCE

Paramètres d’entrée :

  • PROCEDURE_NAME VARCHAR

  • PROCEDURE_TYPE VARCHAR

Cette procédure vérifie si les procédures définies n’existent pas et lève une nouvelle exception.

Les erreurs possibles incluent les suivantes :

  • WORKER_PROCEDURE_NOT_FOUND_EXCEPTION - La procédure pour les tâches worker n’a pas été trouvée.

  • WORK_SELECTOR_PROCEDURE_NOT_FOUND_EXCEPTION - La procédure de sélection des travaux n’a pas été trouvée.

  • DEFAULT_PROCEDURE_VALIDATION_EXCEPTION - Procédure stockée non trouvée.

TASK_REACTOR.CREATE_QUEUE

Paramètres d’entrée :

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • STREAM_NAME VARCHAR

La méthode d’aide pour Task Reactor, elle propose de créer une table de file d’attente avec le nom instance_schema_name.table_name et les colonnes suivantes :

  • ID STRING

  • RESOURCE_ID STRING

  • DISPATCHER_OPTIONS VARIANT

  • WORKER_PAYLOAD VARIANT

  • TIMESTAMP DATETIME

Elle crée ensuite un flux portant le nom instance_schema_name.stream_name s’il n’existe pas encore.

TASK_REACTOR.CREATE_WORKER_REGISTRY_SEQUENCE

Paramètres d’entrée :

  • INSTANCE_SCHEMA_NAME VARCHAR

  • SEQUENCE_NAME VARCHAR

La méthode d’aide pour Task Reactor, qui propose la création d’une séquence pour le registre de la tâche worker avec le nom de séquence instance_schema_name.sequence_name.

TASK_REACTOR.CREATE_WORKER_REGISTRY

Paramètres d’entrée :

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • SEQUENCE_NAME VARCHAR

La méthode d’aide pour Task Reactor, qui permet de créer des registres de tâches worker, se compose d’une table portant le nom instance_schema_name.table_name et les colonnes :

  • WORKER_ID NUMBER avec la séquence instance_schema_name.sequence_name par défaut

  • CREATED_AT DATETIME

  • UPDATED_AT DATETIME

  • STATUS STRING

TASK_REACTOR.CREATE_WORKER_STATUS_TABLE

Paramètres d’entrée :

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

Une méthode d’aide pour Task Reactor, qui permet de créer une table de statut pour une tâche worker portant le nom instance_schema_name.table_name et avec les colonnes :

  • WORKER_ID NUMBER

  • TIMESTAMP DATETIME

  • STATUS STRING

TASK_REACTOR.CREATE_CONFIG_TABLE

Paramètres d’entrée :

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • WORKER_PROCEDURE_NAME VARCHAR

  • WORK_SELECTOR_TYPE VARCHAR

  • WORK_SELECTOR_NAME VARCHAR

  • EXPIRED_WORK_SELECTOR_NAME VARCHAR

  • IS_INSTANCE_REGISTERED BOOLEAN

La méthode d’aide pour Task Reactor, propose de créer une table de configuration nommée instance_schema_name.table_name avec des colonnes de clés et de valeurs. Elle insère ensuite les données de configuration dans la table si elles ne sont pas déjà enregistrées avec les valeurs suivantes :

  • WORKER_PROCEDURE

  • WORK_SELECTOR_TYPE

  • WORK_SELECTOR

  • SCHEMA