ConsumeKafka 2025.5.31.15

Bundle

com.snowflake.openflow.runtime | runtime-kafka-nar

Description

Consomme les messages de l’API Apache Kafka Consumer. Le processeur NiFi complémentaire pour l’envoi de messages est PublishKafka. Le processeur prend en charge la consommation de messages Kafka, éventuellement interprétés comme des enregistrements NiFi. Veuillez noter qu’à ce stade (en mode lecture d’enregistrements), le processeur suppose que tous les enregistrements récupérés dans une partition donnée ont le même schéma. Pour ce mode, si l’un des messages Kafka est extrait mais ne peut pas être analysé ou écrit avec le Record Reader ou le Record Writer configurés, le contenu du message sera écrit dans un FlowFile distinct, et ce FlowFile sera transféré à la relation « parse.failure ». Dans le cas contraire, chaque FlowFile est envoyé à la relation « success » et de nombreux messages distincts peuvent être contenus dans le même FlowFile. Un attribut « record.count » est ajouté pour indiquer le nombre de messages contenus dans le FlowFile. Deux messages Kafka ne seront pas placés dans le même FlowFile s’ils ont des schémas différents ou s’ils ont des valeurs différentes pour un en-tête de message qui est inclus par la propriété <Headers to Add as Attributes>.

Balises

avro, consume, csv, get, ingest, ingress, json, kafka, openflow, pubsub, record, topic

Exigences en matière d’entrées

FORBIDDEN

Prend en charge les propriétés dynamiques sensibles

false

Propriétés

Propriété

Description

Commit Offsets

Spécifie si ce processeur doit valider les décalages auprès de Kafka après avoir reçu les messages. En règle générale, cette valeur doit être définie sur TRUE afin que les messages reçus ne soient pas dupliqués. Toutefois, dans certains scénarios, nous pouvons vouloir éviter de valider les décalages, afin que les données puissent être traitées et reconnues plus tard par PublishKafka de façon à fournir la sémantique « Exactly Once ».

Content Field

Spécifie sous quel champ de l’enregistrement le contenu sera ajouté. Si ce paramètre n’est pas défini, le contenu se trouvera à la racine de l’enregistrement

ID de groupe

Identificateur du groupe de consommateurs Kafka correspondant à la propriété group.id de Kafka

Header Encoding

Codage des caractères appliqué lors de la lecture des valeurs de l’en-tête de l’enregistrement Kafka et de l’écriture des attributs de FlowFile

Header Name Pattern

Modèle d’expression régulière appliqué aux noms d’en-tête d’enregistrement Kafka pour sélectionner les valeurs d’en-tête à écrire en tant qu’attributs de FlowFile

Headers Field Parent

Spécifie sous quel champ de l’enregistrement le champ de l’en-tête sera ajouté. Si ce paramètre n’est pas défini, le champ des en-têtes se trouve à la racine de l’enregistrement

Kafka Connection Service

Fournit des connexions au courtier Kafka pour la publication d’enregistrements Kafka

Key Attribute Encoding

Codage de la valeur de l’attribut FlowFile configuré contenant la clé d’enregistrement Kafka.

Key Field Parent

Indique sous quel champ de l’enregistrement le champ clé sera ajouté. Si ce paramètre n’est pas défini, le champ clé se trouve à la racine de l’enregistrement

Key Format

Spécifie comment représenter la clé d’enregistrement Kafka dans le FlowFile de sortie

Key Record Reader

Le Record Reader à utiliser pour analyser la clé d’enregistrement Kafka afin de la représenter sous la forme d’un enregistrement

Max Uncommitted Time

Spécifie la durée maximale pendant laquelle le processeur peut consommer à partir de Kafka avant de devoir transférer des FlowFiles dans le flux et valider les décalages auprès de Kafka (le cas échéant). Une période plus longue peut se traduire par une latence plus importante

Message Demarcator

Comme KafkaConsumer reçoit les messages par lots, ce processeur a une option pour générer en un seul lot les FlowFiles de sortie contenant tous les messages Kafka relatifs à un sujet et à une partition donnés, et cette propriété vous permet de fournir une chaîne (interprétée comme de l’UTF-8) à utiliser pour délimiter plusieurs messages Kafka. Il s’agit d’une propriété facultative. Si elle n’est pas fournie, chaque message Kafka reçu donnera lieu à un seul FlowFile à chaque déclenchement. Pour saisir un caractère spécial tel que « nouvelle ligne », utilisez CTRL+Entrée ou Maj+Entrée en fonction de l’OS

Metadata Field

Spécifie sous quel champ de l’enregistrement les métadonnées seront ajoutées. Si ce paramètre n’est pas fixé, les métadonnées se trouveront à la racine de l’enregistrement

Metadata Received Timestamp Field

Si cette propriété est définie, un horodatage sera placé sous le champ spécifié dans les métadonnées de l’enregistrement dans le FlowFile de sortie

Output Strategy

Le format utilisé pour convertir l’enregistrement Kafka en un enregistrement FlowFile.

Processing Strategy

Stratégie de traitement des enregistrements Kafka et d’écriture de la sortie sérialisée dans les FlowFiles

Record Reader

Le Record Reader à utiliser pour les messages Kafka entrants

Record Writer

Le Record Writer à utiliser pour sérialiser les FlowFiles sortants

Separate By Key

Lorsque cette propriété est activée, deux messages ne seront ajoutés au même FlowFile que si les deux messages Kafka ont des clés identiques.

Topic Format

Spécifie si les sujets fournis sont une liste de noms séparés par des virgules ou une expression régulière unique

Chapitres

Le nom ou le modèle des sujets Kafka à partir desquels le processeur consomme des enregistrements Kafka. Il est possible d’en fournir plusieurs s’ils sont séparés par des virgules.

auto.offset.reset

Configuration automatique des décalages appliquée lorsqu’aucun décalage de consommateur précédent n’a été trouvé, selon la propriété Kafka auto.offset.reset

Relations

Nom

Description

success

FlowFiles contenant un ou plusieurs enregistrements Kafka sérialisés

Écrit les attributs

Nom

Description

record.count

Le nombre d’enregistrements reçus

mime.type

Le type MIME fourni par le Record Writer configuré

kafka.count

Le nombre de messages écrits si plusieurs messages sont écrits

kafka.key

La clé du message si elle est présente et s’il s’agit d’un message unique. La façon dont la clé est encodée dépend de la valeur de la propriété « Key Attribute Encoding ».

kafka.offset

Le décalage du message dans la partition du sujet.

kafka.timestamp

L’horodatage du message dans la partition du sujet.

kafka.partition

La partition du sujet dont provient le message ou le bundle de messages

kafka.topic

Le sujet dont provient le message ou le bundle de messages

kafka.tombstone

Définissez ce paramètre sur TRUE si le message consommé est un message tombstone

Voir aussi :