ConsumeKafka 2025.5.31.15

Bundle

com.snowflake.openflow.runtime | runtime-kafka-nar

Beschreibung

Verarbeitet Meldungen von der Apache Kafka Consumer-API. Der komplementäre NiFi-Prozessor zum Senden von Meldungen ist PublishKafka. Der Prozessor unterstützt die Verarbeitung von Kafka-Meldungen, die optional als NiFi-Datensätze interpretiert werden können. Bitte beachten Sie, dass der Prozessor zu diesem Zeitpunkt (im Lesemodus) davon ausgeht, dass alle Datensätze, die aus einer bestimmten Partition abgerufen werden, dasselbe Schema haben. Wenn in diesem Modus eine Kafka-Meldung zwar abgerufen, aber nicht erfolgreich mit dem konfigurierten Record Reader oder Record Writer verarbeitet werden kann, wird ihr Inhalt in ein separates FlowFile geschrieben. Dieses FlowFile wird anschließend der Beziehung „parse.failure“ zugewiesen. Andernfalls wird jedes FlowFile an die Beziehung „success“ gesendet und kann viele einzelne Meldungen innerhalb des einzelnen FlowFile enthalten. Es wird ein Attribut „record.count“ hinzugefügt, das angibt, wie viele Meldungen im FlowFile enthalten sind. Zwei Kafka-Meldungen werden nicht in selben FlowFile abgelegt, wenn sie unterschiedliche Schemas haben oder wenn sie unterschiedliche Werte für einen Meldungsheader haben, der in der Eigenschaft <Headers to Add as Attributes> enthalten ist.

Tags

avro, consume, csv, get, ingest, ingress, json, kafka, openflow, pubsub, record, topic

Eingabeanforderung

FORBIDDEN

Unterstützt sensible dynamische Eigenschaften

false

Eigenschaften

Eigenschaft

Beschreibung

Commit Offsets

Gibt an, ob dieser Prozessor die Offsets nach dem Empfang von Meldungen an Kafka bestätigen soll. Normalerweise sollte dieser Wert auf „true“ gesetzt werden, damit empfangene Meldungen nicht dupliziert werden. In bestimmten Szenarien kann es jedoch sinnvoll sein, die Offsets nicht zu bestätigen, damit die Daten verarbeitet und später von PublishKafka bestätigt werden können, um eine „Exactly Once“-Semantik zu gewährleisten.

Content Field

Gibt an, unter welchem Feld des Datensatzes der Inhalt hinzugefügt werden soll. Wenn diese Option nicht gesetzt ist, befindet sich der Inhalt in der Wurzel des Datensatzes

Gruppen-ID

Bezeichner Kafka-Verbrauchergruppen, der der Kafka-Eigenschaft „group.id“ entspricht

Header Encoding

Zeichencodierung, die beim Lesen der Header-Werte eines Kafka-Datensatzes und beim Schreiben von FlowFile-Attributen angewendet wird

Header Name Pattern

Reguläres Ausdrucksmuster, das auf die Header-Namen von Kafka-Datensäten angewendet wird, um Header-Werte auszuwählen, die als FlowFile-Attribute geschrieben werden sollen.

Headers Field Parent

Gibt an, unter welchem Feld des Datensatzes das Feld „Headers“ hinzugefügt werden soll. Wenn nicht festgelegt, befindet sich das Feld „Headers“ auf der obersten Ebene des Datensatzes.

Kafka Connection Service

Stellt Verbindungen zu Kafka Broker für die Veröffentlichung von Kafka-Datensätzen bereit.

Key Attribute Encoding

Codierung für den Wert des konfigurierten FlowFile-Attributs, das den Kafka-Datensatzschlüssel enthält.

Key Field Parent

Gibt an, unter welchem Feld des Datensatzes das Schlüsselfeld hinzugefügt werden soll. Wenn nicht festgelegt, befindet sich das Schlüsselfeld auf der obersten Ebene des Datensatzes

Key Format

Legt fest, wie der Kafka-Datensatzschlüssel im Ausgabe-FlowFile dargestellt werden soll

Key Record Reader

Der Record Reader, der zum Einlesen des Kafka-Datensatzschlüssels in einen Datensatz verwendet wird

Max Uncommitted Time

Gibt die maximale Zeitspanne an, während der der Prozessor Daten aus Kafka verarbeiten darf, bevor er die FlowFiles im Ablauf weiterleiten und – falls erforderlich – die Offsets an Kafka bestätigen muss. Eine größere Zeitspanne kann zu einer längeren Latenzzeit führen.

Message Demarcator

Da KafkaConsumer Meldungen in Batches empfängt, verfügt dieser Prozessor über eine Option zur Ausgabe von FlowFiles, die alle Kafka-Meldungen für ein bestimmtes Thema und eine bestimmte Partition in einem einzigen Batch enthalten. Mit dieser Eigenschaft können Sie eine Zeichenfolge (interpretiert als UTF-8) angeben, die zur Abgrenzung mehrerer Kafka-Meldungen verwendet wird. Dies ist eine optionale Eigenschaft. Wenn sie nicht angegeben wird, resultiert jede empfangene Kafka-Meldung zu einem einzelnen FlowFile zum jeweiligen Auslösezeitpunkt. Zur Eingabe von Sonderzeichen wie z. B. einen Zeilenumbruch verwenden Sie CTRL+Enter oder Shift+Enter, je nach OS

Metadata Field

Gibt an, unter welchem Feld des Datensatzes die Metadaten hinzugefügt werden sollen. Wenn nicht festgelegt, befinden sich die Metadaten auf der obersten Ebene des Datensatzes.

Metadata Received Timestamp Field

Wenn angegeben, wird ein Zeitstempel unter dem angegebenen Feld in den Metadaten des Datensatzes im Ausgabe-FlowFile abgelegt.

Output Strategy

Das Format, das zur Ausgabe des Kafka-Datensatzes in einen FlowFile-Datensatz verwendet wird.

Processing Strategy

Strategie für die Verarbeitung von Kafka-Datensätzen und das Schreiben der serialisierten Ausgabe in dieFlowFiles

Record Reader

Der Record Reader, der für eingehende Kafka-Meldungen verwendet werden soll

Record Writer

Der Record Writer, der für die Serialisierung der ausgehenden FlowFiles verwendet werden soll

Separate By Key

Wenn diese Eigenschaft aktiviert ist, werden zwei Meldungen nur dann zum selben FlowFile hinzugefügt, wenn beide Kafka-Meldungen identische Schlüssel haben.

Topic Format

Gibt an, ob die angegebenen Themen eine durch Kommas getrennte Liste von Namen oder ein einzelner regulärer Ausdruck sind

Themen

Der Name oder das Muster der Kafka-Themen, aus denen der Prozessor Kafka-Datensätze verarbeitet. Es können auch mehrere angegeben werden, wenn sie durch Kommas getrennt sind.

auto.offset.reset

Automatische Offset-Konfiguration, die angewendet wird, wenn kein vorheriger Verbraucher-Offset gefunden wurde, der der Eigenschaft „Kafka auto.offset.reset“ entspricht.

Beziehungen

Name

Beschreibung

success

FlowFiles mit einem oder mehreren serialisierten Kafka-Datensätzen

Schreibt Attribute

Name

Beschreibung

record.count

Die Anzahl der empfangenen Datensätze

mime.type

Der MIME-Typ, der von dem konfigurierten Record Writer bereitgestellt wird

kafka.count

Die Anzahl der geschriebenen Meldungen, wenn mehr als eine

kafka.key

Der Schlüssel der Meldung, falls vorhanden und falls es sich um eine einzelne Meldung handelt. Wie der Schlüssel codiert wird, hängt von dem Wert der Eigenschaft „Key Attribute Encoding“ ab.

kafka.offset

Der Offset der Meldung in der Partition des Themas.

kafka.timestamp

Der Zeitstempel der Meldung in der Partition des Themas.

kafka.partition

Die Partition des Themas, aus der die Meldung oder das Meldungs-Bundle stammt

kafka.topic

Das Thema, aus dem die Meldung oder das Meldungs-Bundle stammt

kafka.tombstone

Auf „true“ setzen, wenn die empfangene Meldung eine Tombstone-Meldung ist

Siehe auch: