Task reactor SQL reference

Database objects and procedures

The following database objects are created through the file task_reactor.sql.

TASK_REACTOR SCHEMA

Versioned schema that contains Task Reactor database objects in the connector.

TASK_REACTOR_INSTANCES SCHEMA

Non-versioned schema that contains Task Reactor instance database objects in the connector.

TASK_REACTOR_INSTANCES.INSTANCE_REGISTRY

This table stores data about Task Reactor instances so that existing instances can be tracked and managed during application runtime. The table is created in the TASK_REACTOR_INSTANCES schema and has the following columns:

  • instance_name VARCHAR

  • is_initialized BOOLEAN

  • is_active BOOLEAN
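As a minimal sketch, the registry can be inspected with an ordinary query. It uses only the schema, table, and column names listed above and assumes the caller has been granted access to the table.

```sql
-- List every registered Task Reactor instance with its state flags.
SELECT instance_name,
       is_initialized,
       is_active
FROM TASK_REACTOR_INSTANCES.INSTANCE_REGISTRY
ORDER BY instance_name;
```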

TASK_REACTOR.DISPATCHER(INSTANCE_SCHEMA_NAME VARCHAR)

This procedure invokes the Java DispatcherHandler.dispatchWorkItems handler, which dispatches work items.

TASK_REACTOR.SET_WORKERS_NUMBER (WORKERS_NUMBER NUMBER, INSTANCE_SCHEMA_NAME VARCHAR)

This procedure invokes the Java SetWorkersNumberHandler.setWorkersNumber handler, which sets the number of workers.
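A call might look like the following sketch. The argument order follows the signature shown above. The instance name EXAMPLE_INSTANCE and the worker count are hypothetical placeholders.

```sql
-- Set the number of workers for one Task Reactor instance.
-- 'EXAMPLE_INSTANCE' and 5 are illustrative values only.
CALL TASK_REACTOR.SET_WORKERS_NUMBER(5, 'EXAMPLE_INSTANCE');
```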

TASK_REACTOR.CREATE_INSTANCE_OBJECTS

Input parameters:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • WORKER_PROCEDURE_NAME VARCHAR

  • WORK_SELECTOR_TYPE VARCHAR

  • WORK_SELECTOR_NAME VARCHAR

  • EXPIRED_WORK_SELECTOR_NAME VARCHAR

This procedure creates all instance objects required for the Task Reactor flow and validates that the objects which must not exist yet are not already initialized. At the end of the process, it inserts a new instance registry record into the table.

Possible errors include:

  • INSTANCE_NOT_FOUND - Instance with this name does not exist.

  • INSTANCE_ALREADY_INITIALIZED - Instance with this name is already initialized.

  • DEFAULT_PROCEDURE_VALIDATION_EXCEPTION - Procedure not found.

  • SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS - Schema with the same name already exists.

  • CREATING_TR_INSTANCE_EXCEPTION - Something unexpected went wrong while creating a new instance of task reactor. No instance has been created.
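The following sketch shows what a call could look like. It assumes that the arguments are positional and follow the order in which the input parameters are listed above. Every value is a hypothetical placeholder, and 'PROCEDURE' as a work selector type is an assumption that is not confirmed by this reference.

```sql
-- Create the objects for a new Task Reactor instance.
-- All names below are illustrative; replace them with objects
-- that exist in your connector.
CALL TASK_REACTOR.CREATE_INSTANCE_OBJECTS(
    'EXAMPLE_INSTANCE',                      -- INSTANCE_SCHEMA_NAME
    'PUBLIC.EXAMPLE_WORKER',                 -- WORKER_PROCEDURE_NAME
    'PROCEDURE',                             -- WORK_SELECTOR_TYPE (assumed value)
    'PUBLIC.EXAMPLE_WORK_SELECTOR',          -- WORK_SELECTOR_NAME
    'PUBLIC.EXAMPLE_EXPIRED_WORK_SELECTOR'   -- EXPIRED_WORK_SELECTOR_NAME
);
```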

TASK_REACTOR.INITIALIZE_INSTANCE

Input parameters:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • WAREHOUSE_NAME VARCHAR

  • DT_SHOULD_BE_STARTED BOOLEAN

  • DT_TASK_SCHEDULE VARCHAR

  • DT_ALLOW_OVERLAPPING_EXECUTION BOOLEAN

  • DT_USER_TASK_TIMEOUT_MS VARCHAR

This procedure starts non-initialized instances within the same database instance. It checks that the instance exists and that it is not already initialized, then creates the dispatcher task and starts that task if required.

Procedure ends successfully with:

{
    "response_code": "OK",
    "message": "Instance has been initialized successfully."
}

Possible errors include:

  • INSTANCE_NOT_FOUND - Instance does not exist.

  • INSTANCE_ALREADY_INITIALIZED - Instance with this name is already initialized.
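A call might look like the sketch below. It assumes positional arguments in the order listed above. The instance and warehouse names are hypothetical, and the schedule and timeout values assume that these parameters accept the same formats as the corresponding Snowflake task settings, which this reference does not state.

```sql
-- Initialize an instance and start its dispatcher task right away.
-- All values are illustrative placeholders.
CALL TASK_REACTOR.INITIALIZE_INSTANCE(
    'EXAMPLE_INSTANCE',    -- INSTANCE_SCHEMA_NAME
    'EXAMPLE_WAREHOUSE',   -- WAREHOUSE_NAME
    TRUE,                  -- DT_SHOULD_BE_STARTED
    '1 MINUTE',            -- DT_TASK_SCHEDULE (assumed task schedule format)
    FALSE,                 -- DT_ALLOW_OVERLAPPING_EXECUTION
    '3600000'              -- DT_USER_TASK_TIMEOUT_MS (VARCHAR, assumed milliseconds)
);
```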

TASK_REACTOR.PAUSE_INSTANCE

Input parameters:

  • INSTANCE_SCHEMA VARCHAR

This procedure starts the process of pausing a given Task Reactor instance and returns an OK response. It starts a job that asynchronously stops all worker tasks and the dispatcher task. If a worker task is already performing an ingestion, it is not stopped right away; it is stopped after the ingestion finishes.

Note

The logic of this procedure is already used in Pause Connector, so you do not need to call this procedure when pausing the whole connector.

Procedure ends successfully with:

{
    "response_code": "OK"
}
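As a sketch, pausing a single instance is one call. The instance name is a hypothetical placeholder. Because the pause runs asynchronously, an OK response only means that the process has started, not that every task has stopped.

```sql
-- Request an asynchronous pause of one Task Reactor instance.
CALL TASK_REACTOR.PAUSE_INSTANCE('EXAMPLE_INSTANCE');
```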

TASK_REACTOR.RESUME_INSTANCE

Input parameters:

  • INSTANCE_SCHEMA VARCHAR

This procedure starts the process of resuming a given Task Reactor instance and returns an OK response. It resumes the dispatcher task and starts a job that asynchronously resumes all worker tasks that already have assigned work.

Note

The logic of this procedure is already used in Resume Connector, so you do not need to call this procedure when resuming the whole connector.

Procedure ends successfully with:

{
    "response_code": "OK"
}

TASK_REACTOR.UPDATE_WAREHOUSE_INSTANCE

Input parameters:

  • WAREHOUSE_NAME VARCHAR

  • INSTANCE_SCHEMA VARCHAR

This procedure starts the process of changing the warehouse for a given Task Reactor instance. It changes the warehouse of the dispatcher task and then starts a job that asynchronously changes the warehouse of all worker tasks.

Note

The logic of this procedure is already used in Update Warehouse, so you do not need to call this procedure when updating the warehouse for the whole connector.

Procedure ends successfully with:

{
    "response_code": "OK"
}

Possible errors include:

  • INSTANCE_NOT_FOUND - Given instance does not exist.

  • TASK_REACTOR_INSTANCE_IS_ACTIVE - Given Task Reactor instance has not been paused before using this procedure.
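Because TASK_REACTOR_INSTANCE_IS_ACTIVE is raised for an instance that has not been paused, a warehouse change could be sequenced as in the sketch below. The instance and warehouse names are hypothetical, the arguments are assumed to be positional in the order listed above, and reading is_active from the registry as a signal that the pause has completed is an assumption.

```sql
-- 1. Request the pause (asynchronous).
CALL TASK_REACTOR.PAUSE_INSTANCE('EXAMPLE_INSTANCE');

-- 2. Check the registry before continuing (assumes is_active
--    turns FALSE once the instance is paused).
SELECT is_active
FROM TASK_REACTOR_INSTANCES.INSTANCE_REGISTRY
WHERE instance_name = 'EXAMPLE_INSTANCE';

-- 3. Change the warehouse: WAREHOUSE_NAME first, then INSTANCE_SCHEMA.
CALL TASK_REACTOR.UPDATE_WAREHOUSE_INSTANCE('EXAMPLE_NEW_WAREHOUSE', 'EXAMPLE_INSTANCE');

-- 4. Resume the instance.
CALL TASK_REACTOR.RESUME_INSTANCE('EXAMPLE_INSTANCE');
```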

Internal procedures

All procedures below are intended only for internal use in the task_reactor setup script and should not be called externally.

TASK_REACTOR.CREATE_INSTANCE_SCHEMA (INSTANCE_SCHEMA_NAME VARCHAR)

This procedure creates a new schema with the identifier given in instance_schema_name and throws an exception if the schema cannot be created.

Possible errors include:

  • SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS - Schema with the same name already exists.

TASK_REACTOR.VALIDATE_PROCEDURE_EXISTENCE

Input parameters:

  • PROCEDURE_NAME VARCHAR

  • PROCEDURE_TYPE VARCHAR

This procedure validates that the specified procedure exists and throws an exception if it does not.

Possible errors include:

  • WORKER_PROCEDURE_NOT_FOUND_EXCEPTION - Worker procedure not found.

  • WORK_SELECTOR_PROCEDURE_NOT_FOUND_EXCEPTION - Work selector procedure not found.

  • DEFAULT_PROCEDURE_VALIDATION_EXCEPTION - Procedure not found.

TASK_REACTOR.CREATE_QUEUE

Input parameters:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • STREAM_NAME VARCHAR

A helper procedure for Task Reactor that creates a queue table named instance_schema_name.table_name with the following columns:

  • ID STRING

  • RESOURCE_ID STRING

  • DISPATCHER_OPTIONS VARIANT

  • WORKER_PAYLOAD VARIANT

  • TIMESTAMP DATETIME

It then creates a stream named instance_schema_name.stream_name if it does not exist yet.
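To make the resulting objects concrete, the following sketch shows roughly equivalent DDL. It is not the procedure's actual implementation. The schema, table, and stream names are hypothetical, the schema is assumed to exist already, and the use of IF NOT EXISTS for the table is an assumption.

```sql
-- Queue table with the columns listed above.
CREATE TABLE IF NOT EXISTS EXAMPLE_INSTANCE.QUEUE (
    ID                 STRING,
    RESOURCE_ID        STRING,
    DISPATCHER_OPTIONS VARIANT,
    WORKER_PAYLOAD     VARIANT,
    TIMESTAMP          DATETIME
);

-- Stream over the queue table, created only if it is missing.
CREATE STREAM IF NOT EXISTS EXAMPLE_INSTANCE.QUEUE_STREAM
    ON TABLE EXAMPLE_INSTANCE.QUEUE;
```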

TASK_REACTOR.CREATE_WORKER_REGISTRY_SEQUENCE

Input parameters:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • SEQUENCE_NAME VARCHAR

A helper procedure for Task Reactor that creates the worker registry sequence named instance_schema_name.sequence_name.

TASK_REACTOR.CREATE_WORKER_REGISTRY

Input parameters:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • SEQUENCE_NAME VARCHAR

A helper procedure for Task Reactor that creates the worker registry, which consists of a table named instance_schema_name.table_name with the following columns:

  • WORKER_ID NUMBER with default instance_schema_name.sequence_name sequence

  • CREATED_AT DATETIME

  • UPDATED_AT DATETIME

  • STATUS STRING
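As an illustration only, the registry described above corresponds to DDL along the following lines. This is a sketch rather than the procedure's actual implementation. All object names are hypothetical, the schema is assumed to exist, and the sequence is the one that TASK_REACTOR.CREATE_WORKER_REGISTRY_SEQUENCE would create.

```sql
-- Sequence that supplies worker identifiers.
CREATE SEQUENCE IF NOT EXISTS EXAMPLE_INSTANCE.WORKER_ID_SEQ;

-- Worker registry table; WORKER_ID defaults to the next sequence value.
CREATE TABLE IF NOT EXISTS EXAMPLE_INSTANCE.WORKER_REGISTRY (
    WORKER_ID  NUMBER DEFAULT EXAMPLE_INSTANCE.WORKER_ID_SEQ.NEXTVAL,
    CREATED_AT DATETIME,
    UPDATED_AT DATETIME,
    STATUS     STRING
);
```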

TASK_REACTOR.CREATE_WORKER_STATUS_TABLE

Input parameters:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

A helper procedure for Task Reactor that creates a worker status table named instance_schema_name.table_name with the following columns:

  • WORKER_ID NUMBER

  • TIMESTAMP DATETIME

  • STATUS STRING

TASK_REACTOR.CREATE_CONFIG_TABLE

Input parameters:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • WORKER_PROCEDURE_NAME VARCHAR

  • WORK_SELECTOR_TYPE VARCHAR

  • WORK_SELECTOR_NAME VARCHAR

  • EXPIRED_WORK_SELECTOR_NAME VARCHAR

  • IS_INSTANCE_REGISTERED BOOLEAN

A helper procedure for Task Reactor that creates a configuration table named instance_schema_name.table_name with key and value columns. If the instance is not already registered, it then inserts configuration data into the table with the following entries:

  • WORKER_PROCEDURE

  • WORK_SELECTOR_TYPE

  • WORK_SELECTOR

  • SCHEMA