Réacteur de tâches¶
Bibliothèque qui fournit des éléments et fonctions communs à tous les connecteurs Snowflake.
Exigences¶
Task Reactor nécessite au moins les fichiers SQL suivants pour être exécutés lors de l’installation de l’application native :
task_reactor.sql
(voir : Référence SQL de réacteur de tâches)
Vue d’ensemble¶
Task Reactor est un module distinct qui fournit un mécanisme d’orchestration pour les morceaux de travail stockés dans une file d’attente avec un ensemble limité de tâches. La file d’attente et le répartiteur de Task Reactor sont basés sur Snowflake Streams avec Snowflake Tasks et seront déclenchés toutes les minutes en raison de la limitation du temps d’actualisation. Task Reactor ne sera actif que lorsqu’il y aura des données dans la file d’attente d’entrée, afin de permettre à l’entrepôt d’économiser des crédits.
Task Reactor se compose de trois éléments principaux : la file d’attente, le répartiteur et les tâches worker :
Votre application de connecteur ajoute QueueItems à la file d’attente.
Toutes les minutes, le répartiteur (une tâche Snowflake) récupère les QueueItems en attente dans la file d’attente et les transmet aux tâches worker.
Chaque minute, les tâches worker (tâches Snowflake) travaillent en parallèle sur la QueueItems attribuée.
Une fois la configuration du connecteur finalisée, la configuration de Task Reactor est limitée à 3 étapes :
Création de tous les composants de Task Reactor
Initialisation de l’instance
(facultatif) Modification du numéro des tâches worker
Création de tous les composants de Task Reactor¶
Pour créer un objet d’instance, l’utilisateur doit d’abord créer les implémentations worker
, selector
et éventuellement expired selector
, puis les intégrer à l’aide de la procédure TASK_REACTOR.CREATE_INSTANCE_OBJECTS.
Mise en œuvre de la tâche worker¶
La tâche worker est chargée d’effectuer une performance attribuée par le répartiteur, telle que l’extraction et l’ingestion de certaines données. La seule partie obligatoire est d’avoir une méthode de tâches worker spécifique qui initie le travail. Cette méthode doit pouvoir être appelée à partir de la procédure Snowpark, renvoyer une chaîne et contenir les paramètres suivants :
session
- Un objet de session Snowparkworker_id
- Numéro, identifiant unique de la tâche workertask_reactor_schema
- Nom du schéma dans lequel les objets Task Reactor sont créés. Il peut être utilisé comme nom d’instance de Task Reactor.
La tâche worker est chargée d’exécuter la tâche attribuée par le répartiteur, par exemple d’extraire et d’ingérer des données spécifiques. Nous vous recommandons d’utiliser les classes Java (com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorker
et com.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion
) ou, pour les tâches plus simples (com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorker
et com.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask
), toutefois, votre tâche worker peut être créée dans n’importe quel langage de programmation pris en charge pour l’écriture de gestionnaires (handlers) de procédures stockées.
Exemple de méthode de tâches worker Java :
public static String executeWork(Session session, int workerId, String taskReactorSchema) {
FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
WorkerId workerIdentifier = new WorkerId(workerId);
Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
try {
IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
} catch (WorkerException e) {
// handle the exception...
throw new RuntimeException(e);
}
return "Worker procedure executed.";
}
Avec une méthode de tâches worker déjà créée, l’utilisateur doit l’intégrer dans CONNECTOR.WORKER_PROCEDURE
. La procédure doit appeler sa propre méthode de tâches worker. Elle doit être créée dans le schéma de votre application, renvoyer une STRING et contenir les paramètres suivants :
worker_id
- nombretask_reactor_schema
- chaîne
Un exemple de procédure, appelant l’implémentation Java de la tâche worker :
CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
RETURNS STRING
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
La bibliothèque de télémétrie est nécessaire pour collecter les métriques qui sont enregistrées dans la table d’événements.
Mise en œuvre du sélecteur¶
Le rôle du sélecteur est de décider quelles tâches en attente doivent être traitées par Task Reactor. Comme pour l’implémentation de la tâche worker, elle peut être créée dans n’importe quel langage pris en charge par Snowpark. Le sélecteur de tâches peut être mis en œuvre sous la forme d’une procédure de base de données ou d’une vue de base de données. Le sélecteur (procédure ou vue) doit être transmis comme argument dans la procédure TASK_REACTOR.CREATE_NEW_INSTANCE
.
La procédure doit pouvoir être appelée à partir d’une procédure Snowpark, renvoyer une chaîne et contenir les paramètres suivants :
session
- Session SnowparkqueueItems
- String[] (un tableau de chaînes JSON individuelles, chacune décrivant un seul QueueItem)
Exemple de méthode de sélection Java :
public static String selectWork(Session session, String[] queueItems) {
Variant[] sorted =
Arrays.stream(queueItems)
.map(Variant::new)
.filter(
queueItem ->
!queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
.sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
.toArray(Variant[]::new);
return new Variant(sorted).asJsonString();
}
Au lieu de la méthode du sélecteur, il est toujours possible de créer une vue qui filtrera et triera les tâches de la file d’attente existante. Le répartiteur peut récupérer les nouvelles tâches dans la vue nouvellement créée à l’aide d’une requête donnée en exemple :
CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
Avec une méthode de sélection déjà créée, l’utilisateur doit l’intégrer dans CONNECTOR.WORK_SELECTOR
. La procédure doit appeler votre méthode obligatoire de sélection du travail. Elle doit être créée dans le schéma de votre application, renvoyer un ARRAY et contenir le paramètre suivant :
work_items - array
Un exemple de procédure, appelant l’implémentation Java du sélecteur de travail :
CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
RETURNS ARRAY
LANGUAGE JAVA
RUNTIME_VERSION = '11'
PACKAGES = ('com.snowflake:snowpark:1.11.0')
IMPORTS = ('@jars/myconnector-1.0.0.jar')
HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
Mise en œuvre du sélecteur expiré¶
Le rôle du sélecteur expiré est de décider quels éléments de la file d’attente doivent être retirés de la file d’attente de Task Reactor. Il peut être nécessaire de supprimer des éléments parce que le sélecteur ne peut jamais atteindre certains éléments et que ceux-ci resteraient éternellement dans la file d’attente. En outre, certains éléments qui attendent dans la file d’attente peuvent avoir été créés longtemps auparavant et il n’est plus utile de les traiter. Le sélecteur expiré peut être mis en œuvre sous la forme d’une vue de la base de données. La vue du sélecteur doit être transmise en tant qu’argument dans la procédure TASK_REACTOR.CREATE_NEW_INSTANCE
. S’il n’est pas nécessaire de retirer des éléments de la file d’attente, l’implémentation par défaut peut être utilisée TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
.
En utilisant la requête suivante, il est possible de créer une vue avec sélecteur expiré qui sélectionne les éléments qui ont été créés il y a plus de 3 jours :
CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
AS SELECT * FROM TASK_REACTOR.QUEUE q
WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
Intégrer des objets d’instance¶
Le TASK_REACTOR.CREATE_INSTANCE_OBJECTS permet à l’utilisateur de configurer toutes les instances ensemble avant d’initialiser les instances créées. La procédure ne peut être exécutée qu’une seule fois par schéma, de sorte que tout appel ultérieur n’entraînera aucune modification. Nous vous recommandons de placer l’appel d’initialisation dans le fichier setup.sql
, afin d’éviter que la procédure ne soit exécutée plusieurs fois ou qu’elle ne soit pas appelée du tout.
Paramètres requis :
instance_schema_name VARCHAR
- Un schéma unique par instance qui stocke les objets de la base de données sur lesquels l’instance travaille.worker_procedure_name VARCHAR
- Nom de la procédure de la tâche worker décrite dans la partieWorker Implementation
.work_selector_type VARCHAR
- Valeurs indiquant si les nouvelles tâches doivent utiliser la vue ou la procédure. Valeurs possibles : VIEW, PROCEDURE.work_selector_name VARCHAR
- Nom de la procédure/vue de sélection décrite dans la partieSelector Implementation
.
Paramètres facultatifs :
expired_work_selector_name VARCHAR
- Nom de la vue du sélecteur expirée décrite dans la partieExpired Selector Implementation
. Si la valeur n’est pas fournie,TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR
est utilisé comme implémentation par défaut qui ne renvoie rien.
Initialisation de l’instance¶
Pour initialiser et exécuter toutes les configurations dans Task Reactor, l’utilisateur doit appeler INITIALIZE_INSTANCE
. La procédure prend en compte les paramètres suivants :
instance_schema_name
- (obligatoire) Nom du schéma qui stocke les objets de la base de données sur lesquels l’instance travaille.warehouse_name
(obligatoire) Nom de l’entrepôt sur lequel l’instance sera exécutée.dt_should_be_started
(facultatif) - par défaut :TRUE
. La tâche du répartiteur doit être lancée lors de la création d’une nouvelle instance ou non.dt_task_schedule
(facultatif) - par défaut :1 MINUTE
. Fréquence d’exécution de la tâche du répartiteur.dt_allow_overlapping_execution
(facultatif) - par défaut :FALSE
. Permet à DAG de fonctionner simultanément.dt_user_task_timeout_ms
(facultatif) - Limite de temps sur une seule exécution de la tâche avant son expiration (en millisecondes).
Note
Si la procédure de la tâche worker prend plus de temps que le délai fixé pour la tâche des workers (USER_TASK_TIMEOUT_MS), la procédure s’interrompt avec une erreur de délai. Il est important de planifier les tâches de manière à ne pas dépasser le délai d’attente de la tâche Snowflake.
Après avoir fourni le nombre minimum de paramètres requis, le Task Reactor
est initialisé avec la configuration fournie et répartit les tâches worker à l’aide de la procédure TASK_REACTOR.DISPATCHER
.
Définition du nombre de tâches worker¶
Le nombre de tâches worker peut être modifié manuellement en appelant la procédure TASK_REACTOR.SET_WORKERS_NUMBER avec les paramètres suivants :
WORKERS_NUMBER
- nouveau nombre de tâches worker.TR_INSTANCE_SCHEMA_NAME
- nom du schéma de l’instance
Métriques¶
Task Reactor contient un mécanisme de métrique. Il se base sur les événements de trace Snowflake. Les métriques sont enregistrées dans la table d’événements, qui doit donc être activée pour que les mesures fonctionnent.
Actuellement, les métriques suivantes sont introduites :
worker working time
(TASK_REACTOR_WORKER_WORKING_TIME
) - Indique le temps pendant lequel une tâche worker a effectivement traité des ressources. La minuterie démarre lorsqu’une tâche worker commence et se termine lorsque la tâche worker se termine.worker idle time
(TASK_REACTOR_WORKER_IDLE_TIME
) - C’est le contraire deworker working time
. Indique le temps pendant lequel une tâche worker était en veille : soit en attente d’un nouveau travail, soit en attente de la prochaine planification de sa tâche. La minuterie commence lorsqu’une tâche worker termine sa tâche et se termine lorsque la tâche worker recommence.
Pour afficher tous les événements de métriques enregistrés, vous pouvez utiliser la requête suivante :
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
ORDER BY event.timestamp DESC;
> `` Pour ne sélectionner qu’un seul type de métriques, ajoutez le nom de la métrique `` event.record:name = <à la clause where
de la requête.
SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';
SELECT
event.record:name::string AS EVENT_NAME,
span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
span.record_attributes:worker_id AS WORKER_ID,
event.record_attributes:value AS DURATION
FROM IDENTIFIER($EVENT_TABLE) event
JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
WHERE
event.resource_attributes:"snow.database.name" = $APP_NAME
AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
ORDER BY event.timestamp DESC;