Référence SQL de réacteur de tâches¶
Objets et procédures de la base de données¶
Les objets de base de données suivants sont créés par le biais du fichier task_reactor.sql
.
TASK_REACTOR SCHEMA¶
Schéma versionné contenant un objet de la base de données de Task Reactor dans le connecteur.
TASK_REACTOR_INSTANCES SCHEMA¶
Schéma non versionné contenant un objet de base de données d’instance de Task Reactor dans le connecteur.
TASK_REACTOR_INSTANCES.INSTANCE_REGISTRY¶
Cette table est créée pour stocker les données relatives aux instances de Task Reactor afin de permettre le suivi et la gestion des instances existantes pendant l’exécution de l’application. La table est créée dans le schéma TASK_REACTOR_INSTANCES
.
instance_name
VARCHAR
is_initialized
BOOLEAN
is_active
BOOLEAN
TASK_REACTOR.DISPATCHER(INSTANCE_SCHEMA_NAME VARCHAR)¶
Cette procédure appelle le DispatcherHandler.dispatchWorkItems
Java et permet de distribuer les éléments de travail.
TASK_REACTOR.SET_WORKERS_NUMBER (WORKERS_NUMBER NUMBER, INSTANCE_SCHEMA_NAME VARCHAR)¶
Cette procédure invoque le SetWorkersNumberHandler.setWorkersNumber
Java et permet de définir le nombre de tâches worker.
TASK_REACTOR.CREATE_INSTANCE_OBJECTS¶
Paramètres d’entrée :
INSTANCE_SCHEMA_NAME
VARCHAR
WORKER_PROCEDURE_NAME
VARCHAR
WORK_SELECTOR_TYPE
VARCHAR
WORK_SELECTOR_NAME
VARCHAR
EXPIRED_WORK_SELECTOR_NAME
VARCHAR
La procédure crée tous les objets d’instance requis pour un flux précis de Task reactor
et valide ceux qui ne devraient pas être déjà initialisés. À la fin du processus, un nouvel enregistrement du registre des instances est inséré dans la table.
Les erreurs possibles incluent les suivantes :
INSTANCE_NOT_FOUND
- L’instance portant ce nom n’existe pas.INSTANCE_ALREADY_INITIALIZED
- L’instance portant ce nom est déjà initialisée.DEFAULT_PROCEDURE_VALIDATION_EXCEPTION
- Procédure stockée non trouvée.SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS
- Un schéma portant le même nom existe déjà.CREATING_TR_INSTANCE_EXCEPTION
- Un problème inattendu s’est produit lors de la création d’une nouvelle instance de Task Reactor. Aucune instance n’a été créée.
TASK_REACTOR.INITIALIZE_INSTANCE¶
Paramètres d’entrée :
INSTANCE_SCHEMA_NAME
VARCHAR
WAREHOUSE_NAME
VARCHAR
DT_SHOULD_BE_STARTED
BOOLEAN
DT_TASK_SCHEDULE
VARCHAR
DT_ALLOW_OVERLAPPING_EXECUTION
BOOLEAN
DT_USER_TASK_TIMEOUT_MS
VARCHAR
La procédure démarre toutes les instances non initialisées au sein de la même instance de base de données. Elle consiste à vérifier l’existence de l’instance, ou si elle n’est pas déjà initialisée, puis à créer des tâches de répartition et à lancer cette tâche si nécessaire.
La procédure se termine avec succès ainsi :
{
"response_code": "OK",
"message": "Instance has been initialized successfully."
}
Les erreurs possibles incluent les suivantes :
INSTANCE_NOT_FOUND
- L’instance n’existe pas.INSTANCE_ALREADY_INITIALIZED
- L’instance portant ce nom est déjà initialisée.
TASK_REACTOR.PAUSE_INSTANCE¶
Paramètres d’entrée :
INSTANCE_SCHEMA
VARCHAR
La procédure lance le processus de mise en pause d’une instance donnée de Task Reactor et renvoie la réponse OK. Elle lance une tâche qui arrête de manière asynchrone toutes les tâches worker et la tâche du répartiteur. Dans le cas où une tâche worker était déjà en train d’effectuer une ingestion, la tâche n’est pas arrêtée tout de suite, mais elle le sera une fois l’ingestion terminée.
Note
La logique de cette procédure est déjà utilisée dans Connecteur de pause, il n’est donc pas nécessaire d’utiliser cette procédure dans le cadre de l’arrêt de l’ensemble du connecteur.
La procédure se termine avec succès ainsi :
{
"response_code": "OK"
}
TASK_REACTOR.RESUME_INSTANCE¶
Paramètres d’entrée :
INSTANCE_SCHEMA
VARCHAR
La procédure lance le processus de reprise d’une instance donnée de Task Reactor et renvoie la réponse OK. Elle reprend la tâche du répartiteur et lance un travail qui reprend de manière asynchrone toutes les tâches workers qui ont déjà du travail attribué.
Note
La logique de cette procédure est déjà utilisée dans la reprise du connecteur, il n’est donc pas nécessaire d’utiliser cette procédure dans le cadre de la reprise de l’ensemble du connecteur.
La procédure se termine avec succès ainsi :
{
"response_code": "OK"
}
TASK_REACTOR.UPDATE_WAREHOUSE_INSTANCE¶
Paramètres d’entrée :
WAREHOUSE_NAME
VARCHAR
INSTANCE_SCHEMA
VARCHAR
La procédure lance le processus de modification de l’entrepôt pour une instance donnée de Task Reactor. Elle modifie l’entrepôt de la tâche du répartiteur, puis lance un travail qui modifie de manière asynchrone l’entrepôt de toutes les tâches workers.
Note
La logique de cette procédure est déjà utilisée dans Mise à jour de l’entrepôt, il n’est donc pas nécessaire d’utiliser cette procédure dans le cadre de la mise à jour de l’entrepôt pour l’ensemble du connecteur.
La procédure se termine avec succès ainsi :
{
"response_code": "OK"
}
Les erreurs possibles incluent les suivantes :
INSTANCE_NOT_FOUND
- L’instance donnée n’existe pas.TASK_REACTOR_INSTANCE_IS_ACTIVE
- L’instance de Task Reactor n’a pas été mise en pause avant d’utiliser cette procédure.
Procédures internes¶
Toutes les procédures ci-dessous sont utilisées uniquement en interne dans le script d’installation de task_reactor
et ne doivent pas être utilisées en externe.
TASK_REACTOR.CREATE_INSTANCE_SCHEMA (INSTANCE_SCHEMA_NAME VARCHAR)¶
Cette procédure crée un nouveau schéma avec l’identificateur nommé instance_schema_name
, puis lève une nouvelle exception si le schéma n’a pas pu être créé.
Les erreurs possibles incluent les suivantes :
SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS
- Un schéma portant le même nom existe déjà.
TASK_REACTOR.VALIDATE_PROCEDURE_EXISTENCE¶
Paramètres d’entrée :
PROCEDURE_NAME
VARCHAR
PROCEDURE_TYPE
VARCHAR
Cette procédure vérifie si les procédures définies n’existent pas et lève une nouvelle exception.
Les erreurs possibles incluent les suivantes :
WORKER_PROCEDURE_NOT_FOUND_EXCEPTION
- La procédure pour les tâches worker n’a pas été trouvée.WORK_SELECTOR_PROCEDURE_NOT_FOUND_EXCEPTION
- La procédure de sélection des travaux n’a pas été trouvée.DEFAULT_PROCEDURE_VALIDATION_EXCEPTION
- Procédure stockée non trouvée.
TASK_REACTOR.CREATE_QUEUE¶
Paramètres d’entrée :
INSTANCE_SCHEMA_NAME
VARCHAR
TABLE_NAME
VARCHAR
STREAM_NAME
VARCHAR
La méthode d’aide pour Task Reactor
, elle propose de créer une table de file d’attente avec le nom instance_schema_name.table_name
et les colonnes suivantes :
ID
STRING
RESOURCE_ID
STRING
DISPATCHER_OPTIONS
VARIANT
WORKER_PAYLOAD
VARIANT
TIMESTAMP
DATETIME
Elle crée ensuite un flux portant le nom instance_schema_name.stream_name
s’il n’existe pas encore.
TASK_REACTOR.CREATE_WORKER_REGISTRY_SEQUENCE¶
Paramètres d’entrée :
INSTANCE_SCHEMA_NAME
VARCHAR
SEQUENCE_NAME
VARCHAR
La méthode d’aide pour Task Reactor
, qui propose la création d’une séquence pour le registre de la tâche worker avec le nom de séquence instance_schema_name.sequence_name
.
TASK_REACTOR.CREATE_WORKER_REGISTRY¶
Paramètres d’entrée :
INSTANCE_SCHEMA_NAME
VARCHAR
TABLE_NAME
VARCHAR
SEQUENCE_NAME
VARCHAR
La méthode d’aide pour Task Reactor
, qui permet de créer des registres de tâches worker, se compose d’une table portant le nom instance_schema_name.table_name
et les colonnes :
WORKER_ID NUMBER
avec la séquenceinstance_schema_name.sequence_name
par défautCREATED_AT
DATETIME
UPDATED_AT
DATETIME
STATUS
STRING
TASK_REACTOR.CREATE_WORKER_STATUS_TABLE¶
Paramètres d’entrée :
INSTANCE_SCHEMA_NAME
VARCHAR
TABLE_NAME
VARCHAR
Une méthode d’aide pour Task Reactor
, qui permet de créer une table de statut pour une tâche worker portant le nom instance_schema_name.table_name
et avec les colonnes :
WORKER_ID
NUMBER
TIMESTAMP
DATETIME
STATUS
STRING
TASK_REACTOR.CREATE_CONFIG_TABLE¶
Paramètres d’entrée :
INSTANCE_SCHEMA_NAME
VARCHAR
TABLE_NAME
VARCHAR
WORKER_PROCEDURE_NAME
VARCHAR
WORK_SELECTOR_TYPE
VARCHAR
WORK_SELECTOR_NAME
VARCHAR
EXPIRED_WORK_SELECTOR_NAME
VARCHAR
IS_INSTANCE_REGISTERED
BOOLEAN
La méthode d’aide pour Task Reactor
, propose de créer une table de configuration nommée instance_schema_name.table_name
avec des colonnes de clés et de valeurs. Elle insère ensuite les données de configuration dans la table si elles ne sont pas déjà enregistrées avec les valeurs suivantes :
WORKER_PROCEDURE
WORK_SELECTOR_TYPE
WORK_SELECTOR
SCHEMA