ConsumeKafka 2025.5.31.15¶
Bundle¶
com.snowflake.openflow.runtime | runtime-kafka-nar
Description¶
Consomme les messages de l’API Apache Kafka Consumer. Le processeur NiFi complémentaire pour l’envoi de messages est PublishKafka. Le processeur prend en charge la consommation de messages Kafka, éventuellement interprétés comme des enregistrements NiFi. Veuillez noter qu’à ce stade (en mode lecture d’enregistrements), le processeur suppose que tous les enregistrements récupérés dans une partition donnée ont le même schéma. Pour ce mode, si l’un des messages Kafka est extrait mais ne peut pas être analysé ou écrit avec le Record Reader ou le Record Writer configurés, le contenu du message sera écrit dans un FlowFile distinct, et ce FlowFile sera transféré à la relation « parse.failure ». Dans le cas contraire, chaque FlowFile est envoyé à la relation « success » et de nombreux messages distincts peuvent être contenus dans le même FlowFile. Un attribut « record.count » est ajouté pour indiquer le nombre de messages contenus dans le FlowFile. Deux messages Kafka ne seront pas placés dans le même FlowFile s’ils ont des schémas différents ou s’ils ont des valeurs différentes pour un en-tête de message qui est inclus par la propriété <Headers to Add as Attributes>.
Exigences en matière d’entrées¶
FORBIDDEN
Prend en charge les propriétés dynamiques sensibles¶
false
Propriétés¶
Propriété |
Description |
---|---|
Commit Offsets |
Spécifie si ce processeur doit valider les décalages auprès de Kafka après avoir reçu les messages. En règle générale, cette valeur doit être définie sur TRUE afin que les messages reçus ne soient pas dupliqués. Toutefois, dans certains scénarios, nous pouvons vouloir éviter de valider les décalages, afin que les données puissent être traitées et reconnues plus tard par PublishKafka de façon à fournir la sémantique « Exactly Once ». |
Content Field |
Spécifie sous quel champ de l’enregistrement le contenu sera ajouté. Si ce paramètre n’est pas défini, le contenu se trouvera à la racine de l’enregistrement |
ID de groupe |
Identificateur du groupe de consommateurs Kafka correspondant à la propriété group.id de Kafka |
Header Encoding |
Codage des caractères appliqué lors de la lecture des valeurs de l’en-tête de l’enregistrement Kafka et de l’écriture des attributs de FlowFile |
Header Name Pattern |
Modèle d’expression régulière appliqué aux noms d’en-tête d’enregistrement Kafka pour sélectionner les valeurs d’en-tête à écrire en tant qu’attributs de FlowFile |
Headers Field Parent |
Spécifie sous quel champ de l’enregistrement le champ de l’en-tête sera ajouté. Si ce paramètre n’est pas défini, le champ des en-têtes se trouve à la racine de l’enregistrement |
Kafka Connection Service |
Fournit des connexions au courtier Kafka pour la publication d’enregistrements Kafka |
Key Attribute Encoding |
Codage de la valeur de l’attribut FlowFile configuré contenant la clé d’enregistrement Kafka. |
Key Field Parent |
Indique sous quel champ de l’enregistrement le champ clé sera ajouté. Si ce paramètre n’est pas défini, le champ clé se trouve à la racine de l’enregistrement |
Key Format |
Spécifie comment représenter la clé d’enregistrement Kafka dans le FlowFile de sortie |
Key Record Reader |
Le Record Reader à utiliser pour analyser la clé d’enregistrement Kafka afin de la représenter sous la forme d’un enregistrement |
Max Uncommitted Time |
Spécifie la durée maximale pendant laquelle le processeur peut consommer à partir de Kafka avant de devoir transférer des FlowFiles dans le flux et valider les décalages auprès de Kafka (le cas échéant). Une période plus longue peut se traduire par une latence plus importante |
Message Demarcator |
Comme KafkaConsumer reçoit les messages par lots, ce processeur a une option pour générer en un seul lot les FlowFiles de sortie contenant tous les messages Kafka relatifs à un sujet et à une partition donnés, et cette propriété vous permet de fournir une chaîne (interprétée comme de l’UTF-8) à utiliser pour délimiter plusieurs messages Kafka. Il s’agit d’une propriété facultative. Si elle n’est pas fournie, chaque message Kafka reçu donnera lieu à un seul FlowFile à chaque déclenchement. Pour saisir un caractère spécial tel que « nouvelle ligne », utilisez CTRL+Entrée ou Maj+Entrée en fonction de l’OS |
Metadata Field |
Spécifie sous quel champ de l’enregistrement les métadonnées seront ajoutées. Si ce paramètre n’est pas fixé, les métadonnées se trouveront à la racine de l’enregistrement |
Metadata Received Timestamp Field |
Si cette propriété est définie, un horodatage sera placé sous le champ spécifié dans les métadonnées de l’enregistrement dans le FlowFile de sortie |
Output Strategy |
Le format utilisé pour convertir l’enregistrement Kafka en un enregistrement FlowFile. |
Processing Strategy |
Stratégie de traitement des enregistrements Kafka et d’écriture de la sortie sérialisée dans les FlowFiles |
Record Reader |
Le Record Reader à utiliser pour les messages Kafka entrants |
Record Writer |
Le Record Writer à utiliser pour sérialiser les FlowFiles sortants |
Separate By Key |
Lorsque cette propriété est activée, deux messages ne seront ajoutés au même FlowFile que si les deux messages Kafka ont des clés identiques. |
Topic Format |
Spécifie si les sujets fournis sont une liste de noms séparés par des virgules ou une expression régulière unique |
Chapitres |
Le nom ou le modèle des sujets Kafka à partir desquels le processeur consomme des enregistrements Kafka. Il est possible d’en fournir plusieurs s’ils sont séparés par des virgules. |
auto.offset.reset |
Configuration automatique des décalages appliquée lorsqu’aucun décalage de consommateur précédent n’a été trouvé, selon la propriété Kafka auto.offset.reset |
Relations¶
Nom |
Description |
---|---|
success |
FlowFiles contenant un ou plusieurs enregistrements Kafka sérialisés |
Écrit les attributs¶
Nom |
Description |
---|---|
record.count |
Le nombre d’enregistrements reçus |
mime.type |
Le type MIME fourni par le Record Writer configuré |
kafka.count |
Le nombre de messages écrits si plusieurs messages sont écrits |
kafka.key |
La clé du message si elle est présente et s’il s’agit d’un message unique. La façon dont la clé est encodée dépend de la valeur de la propriété « Key Attribute Encoding ». |
kafka.offset |
Le décalage du message dans la partition du sujet. |
kafka.timestamp |
L’horodatage du message dans la partition du sujet. |
kafka.partition |
La partition du sujet dont provient le message ou le bundle de messages |
kafka.topic |
Le sujet dont provient le message ou le bundle de messages |
kafka.tombstone |
Définissez ce paramètre sur TRUE si le message consommé est un message tombstone |