ConsumeElasticsearch 2025.10.9.21

Bundle

org.apache.nifi | nifi-elasticsearch-restapi-nar

Description

Un processeur qui exécute de manière répétée une requête paginée sur un champ à l’aide d’une requête Range pour consommer de nouveaux documents à partir d’un index/d’une requête Elasticsearch. Le processeur récupère plusieurs pages de résultats jusqu’à ce qu’il n’y ait plus de résultats disponibles ou que l’expiration du délai de conservation de la pagination soit atteinte, après quoi la requête Range met automatiquement à jour la contrainte de champ sur la base de la dernière valeur de document récupérée.

Balises

elasticsearch, elasticsearch7, elasticsearch8, elasticsearch9, json, page, query, scroll, search

Exigences en matière d’entrées

FORBIDDEN

Prend en charge les propriétés dynamiques sensibles

false

Propriétés

Propriété

Description

Additional Filters

Un ou plusieurs filtres de requête exprimé dans la syntaxe JSON, et non dans la syntaxe Lucene. Exemple : [{« match »:{« somefield »: »somevalue »}}, {« match »:{« anotherfield »: »anothervalue »}}]. Ces filtres seront utilisés dans le cadre du filtre d’une requête Bool.

Aggregation Results Format

Format de la sortie des agrégations.

Aggregation Results Split

Générer en sortie un FlowFile contenant toutes les agrégations ou un FlowFile pour chaque agrégation.

Aggregations

Une ou plusieurs agrégations de requêtes (ou « aggs »), en syntaxe JSON. Exemple : {« items »: {« terms »: {« field »: « product », « size »: 10}}}

Client Service

Un service client Elasticsearch à utiliser pour l’exécution des requêtes.

Champs

Champs des documents indexés à récupérer, dans la syntaxe JSON. Exemple : [« user.id », « http.response.* », {« field »: « @timestamp », « format »: « epoch_millis »}]

Index

Le nom de l’index à utiliser.

Initial Value

Valeur initiale à utiliser pour la requête si le processeur n’a pas été exécuté précédemment. Si le processeur s’est déjà exécuté et a stocké une valeur dans son état, cette propriété sera ignorée. Si aucune valeur n’est fournie et que le processeur n’a pas encore été exécuté, aucune limite de requête ne sera utilisée, c’est-à-dire que tous les documents seront récupérés dans l’ordre de tri spécifié.

Format de date de la valeur initiale

Si le champ « Range Query Field » est de type date, convertissez la valeur « Initial Value » en date avec ce format. Si cette propriété n’est pas spécifiée, Elasticsearch utilisera le format de date fourni par le mappage du champ « Range Query Field ». Pour connaître la syntaxe valide, voir https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html

Fuseau horaire de date de la valeur initiale

Si le champ « Range Query Field » est de type date, convertissez la valeur « Initial Value » en UTC avec ce fuseau horaire. Les valeurs valides sont les décalages UTC ISO 8601, tels que « +01:00 » ou « -08:00 », et les ID de fuseau horaire IANA, tels qu”« Europe/Londres ».

Max JSON Field String Length

Longueur maximale autorisée pour une valeur de chaîne lors de l’analyse d’un document ou d’un attribut JSON.

Output No Hits

Générer un FlowFile « hits » en sortie même si aucun hit n’a été trouvé pour la requête. Si cette propriété est définie sur true, un FlowFile « hits » vide sera généré en sortie même si des « agrégations » sont générées en sortie.

Pagination Keep Alive

Période de persistance (« keep_alive ») de la pagination. Période pendant laquelle Elasticsearch maintiendra le curseur de scroll/PIT en vie entre deux requêtes (il ne s’agit pas du temps attendu pour que toutes les pages soient récupérées, mais du temps maximum autorisé pour les requêtes entre les récupérations de pages).

Pagination Type

Méthode de pagination à utiliser. Tous les types ne sont pas disponibles pour toutes les versions d’Elasticsearch. Consultez la documentation d’Elasticsearch pour savoir lesquels sont applicables et recommandés pour votre service.

Query Attribute

Si l’option est activée, la requête exécutée sera définie sur chaque FlowFile de résultats dans l’attribut spécifié.

Champ de requête de plage

Champ à suivre dans le cadre d’une requête Elasticsearch Range à l’aide d’une correspondance liée « gt ». Ce champ doit exister dans le document Elasticsearch pour que celui-ci soit récupéré.

Script Fields

Champs à créer en utilisant l’évaluation du script lors de l’exécution de la requête, dans la syntaxe JSON. Ex: {« test1 »: {« script »: {« lang »: « painless », « source »: « doc[ “price”].value * 2 »}}, « test2 »: {« script »: {« lang »: « painless », « source »: « doc[ “price”].value * params.factor », « params »: {« factor »: 2.0}}}}

Format des résultats de recherche

Format de la sortie des Hits.

Fractionnement des résultats de recherche

Générer en sortie un FlowFile contenant tous les résultats ou un FlowFile pour chaque résultat, ou encore un FlowFile contenant tous les résultats de toutes les réponses paginées.

Taille

Le nombre maximum de documents à récupérer dans la requête. Si la requête est paginée, cette taille s’applique à chaque page de la requête, et non à la taille du jeu de résultats.

Trier

Trier les résultats en fonction d’un ou plusieurs champs, dans la syntaxe JSON. Exemple : [{« price » : {« order » : « asc », « mode » : « avg »}}, {« post_date » : {« format »: « strict_date_optional_time_nanos »}}]

Sort Order

Ordre dans lequel trier le champ « Range Query Field ». Une clause « SORT » pour le champ « Range Query Field » sera ajoutée en tête de toutes les clauses « SORT » fournies. Si une clause « SORT » existe déjà pour le champ « Range Query Field », elle ne sera pas mise à jour.

Type

Le type de ce document (utilisé par Elasticsearch pour l’indexation et la recherche).

Gestion de l’État

Champs d’application

Description

CLUSTER

L’état de la pagination (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp, trackingRangeValue) est conservé entre les invocations de ce processeur jusqu’à l’expiration de l’intervalle Scroll/PiT (lorsque l’heure actuelle est postérieure à la dernière exécution de la requête plus l’intervalle de pagination Keep Alive).

Relations

Nom

Description

aggregations

Les agrégations sont routées vers cette relation.

failure

Tous les FlowFiles qui échouent pour des raisons indépendantes de la disponibilité du serveur sont dirigés vers cette relation.

hits

Les résultats des recherches sont routés vers cette relation.

retry

Tous les FlowFiles qui échouent en raison de la disponibilité du serveur/cluster sont dirigés vers cette relation.

Écrit les attributs

Nom

Description

mime.type

application/json

page.number

Le numéro de la page (requête), à partir de 1, dans laquelle ont été renvoyés les résultats qui se trouvent dans le FlowFile de sortie

hit.count

Le nombre d’occurrences dans le FlowFile de sortie

elasticsearch.requête.erreur

Le message d’erreur fourni par Elasticsearch en cas d’erreur de requête sur l’index.

Voir aussi :