ConsumeKafka 2025.5.31.15¶
Bundle¶
com.snowflake.openflow.runtime | runtime-kafka-nar
Beschreibung¶
Verarbeitet Meldungen von der Apache Kafka Consumer-API. Der komplementäre NiFi-Prozessor zum Senden von Meldungen ist PublishKafka. Der Prozessor unterstützt die Verarbeitung von Kafka-Meldungen, die optional als NiFi-Datensätze interpretiert werden können. Bitte beachten Sie, dass der Prozessor zu diesem Zeitpunkt (im Lesemodus) davon ausgeht, dass alle Datensätze, die aus einer bestimmten Partition abgerufen werden, dasselbe Schema haben. Wenn in diesem Modus eine Kafka-Meldung zwar abgerufen, aber nicht erfolgreich mit dem konfigurierten Record Reader oder Record Writer verarbeitet werden kann, wird ihr Inhalt in ein separates FlowFile geschrieben. Dieses FlowFile wird anschließend der Beziehung „parse.failure“ zugewiesen. Andernfalls wird jedes FlowFile an die Beziehung „success“ gesendet und kann viele einzelne Meldungen innerhalb des einzelnen FlowFile enthalten. Es wird ein Attribut „record.count“ hinzugefügt, das angibt, wie viele Meldungen im FlowFile enthalten sind. Zwei Kafka-Meldungen werden nicht in selben FlowFile abgelegt, wenn sie unterschiedliche Schemas haben oder wenn sie unterschiedliche Werte für einen Meldungsheader haben, der in der Eigenschaft <Headers to Add as Attributes> enthalten ist.
Eingabeanforderung¶
FORBIDDEN
Unterstützt sensible dynamische Eigenschaften¶
false
Eigenschaften¶
Eigenschaft |
Beschreibung |
---|---|
Commit Offsets |
Gibt an, ob dieser Prozessor die Offsets nach dem Empfang von Meldungen an Kafka bestätigen soll. Normalerweise sollte dieser Wert auf „true“ gesetzt werden, damit empfangene Meldungen nicht dupliziert werden. In bestimmten Szenarien kann es jedoch sinnvoll sein, die Offsets nicht zu bestätigen, damit die Daten verarbeitet und später von PublishKafka bestätigt werden können, um eine „Exactly Once“-Semantik zu gewährleisten. |
Content Field |
Gibt an, unter welchem Feld des Datensatzes der Inhalt hinzugefügt werden soll. Wenn diese Option nicht gesetzt ist, befindet sich der Inhalt in der Wurzel des Datensatzes |
Gruppen-ID |
Bezeichner Kafka-Verbrauchergruppen, der der Kafka-Eigenschaft „group.id“ entspricht |
Header Encoding |
Zeichencodierung, die beim Lesen der Header-Werte eines Kafka-Datensatzes und beim Schreiben von FlowFile-Attributen angewendet wird |
Header Name Pattern |
Reguläres Ausdrucksmuster, das auf die Header-Namen von Kafka-Datensäten angewendet wird, um Header-Werte auszuwählen, die als FlowFile-Attribute geschrieben werden sollen. |
Headers Field Parent |
Gibt an, unter welchem Feld des Datensatzes das Feld „Headers“ hinzugefügt werden soll. Wenn nicht festgelegt, befindet sich das Feld „Headers“ auf der obersten Ebene des Datensatzes. |
Kafka Connection Service |
Stellt Verbindungen zu Kafka Broker für die Veröffentlichung von Kafka-Datensätzen bereit. |
Key Attribute Encoding |
Codierung für den Wert des konfigurierten FlowFile-Attributs, das den Kafka-Datensatzschlüssel enthält. |
Key Field Parent |
Gibt an, unter welchem Feld des Datensatzes das Schlüsselfeld hinzugefügt werden soll. Wenn nicht festgelegt, befindet sich das Schlüsselfeld auf der obersten Ebene des Datensatzes |
Key Format |
Legt fest, wie der Kafka-Datensatzschlüssel im Ausgabe-FlowFile dargestellt werden soll |
Key Record Reader |
Der Record Reader, der zum Einlesen des Kafka-Datensatzschlüssels in einen Datensatz verwendet wird |
Max Uncommitted Time |
Gibt die maximale Zeitspanne an, während der der Prozessor Daten aus Kafka verarbeiten darf, bevor er die FlowFiles im Ablauf weiterleiten und – falls erforderlich – die Offsets an Kafka bestätigen muss. Eine größere Zeitspanne kann zu einer längeren Latenzzeit führen. |
Message Demarcator |
Da KafkaConsumer Meldungen in Batches empfängt, verfügt dieser Prozessor über eine Option zur Ausgabe von FlowFiles, die alle Kafka-Meldungen für ein bestimmtes Thema und eine bestimmte Partition in einem einzigen Batch enthalten. Mit dieser Eigenschaft können Sie eine Zeichenfolge (interpretiert als UTF-8) angeben, die zur Abgrenzung mehrerer Kafka-Meldungen verwendet wird. Dies ist eine optionale Eigenschaft. Wenn sie nicht angegeben wird, resultiert jede empfangene Kafka-Meldung zu einem einzelnen FlowFile zum jeweiligen Auslösezeitpunkt. Zur Eingabe von Sonderzeichen wie z. B. einen Zeilenumbruch verwenden Sie CTRL+Enter oder Shift+Enter, je nach OS |
Metadata Field |
Gibt an, unter welchem Feld des Datensatzes die Metadaten hinzugefügt werden sollen. Wenn nicht festgelegt, befinden sich die Metadaten auf der obersten Ebene des Datensatzes. |
Metadata Received Timestamp Field |
Wenn angegeben, wird ein Zeitstempel unter dem angegebenen Feld in den Metadaten des Datensatzes im Ausgabe-FlowFile abgelegt. |
Output Strategy |
Das Format, das zur Ausgabe des Kafka-Datensatzes in einen FlowFile-Datensatz verwendet wird. |
Processing Strategy |
Strategie für die Verarbeitung von Kafka-Datensätzen und das Schreiben der serialisierten Ausgabe in dieFlowFiles |
Record Reader |
Der Record Reader, der für eingehende Kafka-Meldungen verwendet werden soll |
Record Writer |
Der Record Writer, der für die Serialisierung der ausgehenden FlowFiles verwendet werden soll |
Separate By Key |
Wenn diese Eigenschaft aktiviert ist, werden zwei Meldungen nur dann zum selben FlowFile hinzugefügt, wenn beide Kafka-Meldungen identische Schlüssel haben. |
Topic Format |
Gibt an, ob die angegebenen Themen eine durch Kommas getrennte Liste von Namen oder ein einzelner regulärer Ausdruck sind |
Themen |
Der Name oder das Muster der Kafka-Themen, aus denen der Prozessor Kafka-Datensätze verarbeitet. Es können auch mehrere angegeben werden, wenn sie durch Kommas getrennt sind. |
auto.offset.reset |
Automatische Offset-Konfiguration, die angewendet wird, wenn kein vorheriger Verbraucher-Offset gefunden wurde, der der Eigenschaft „Kafka auto.offset.reset“ entspricht. |
Beziehungen¶
Name |
Beschreibung |
---|---|
success |
FlowFiles mit einem oder mehreren serialisierten Kafka-Datensätzen |
Schreibt Attribute¶
Name |
Beschreibung |
---|---|
record.count |
Die Anzahl der empfangenen Datensätze |
mime.type |
Der MIME-Typ, der von dem konfigurierten Record Writer bereitgestellt wird |
kafka.count |
Die Anzahl der geschriebenen Meldungen, wenn mehr als eine |
kafka.key |
Der Schlüssel der Meldung, falls vorhanden und falls es sich um eine einzelne Meldung handelt. Wie der Schlüssel codiert wird, hängt von dem Wert der Eigenschaft „Key Attribute Encoding“ ab. |
kafka.offset |
Der Offset der Meldung in der Partition des Themas. |
kafka.timestamp |
Der Zeitstempel der Meldung in der Partition des Themas. |
kafka.partition |
Die Partition des Themas, aus der die Meldung oder das Meldungs-Bundle stammt |
kafka.topic |
Das Thema, aus dem die Meldung oder das Meldungs-Bundle stammt |
kafka.tombstone |
Auf „true“ setzen, wenn die empfangene Meldung eine Tombstone-Meldung ist |