PublishKafka 2025.5.31.15¶
Bundle¶
com.snowflake.openflow.runtime | runtime-kafka-nar
Beschreibung¶
Sendet den Inhalt eines FlowFile entweder als Meldung oder als einzelne Datensätze an Apache Kafka unter Verwendung der Kafka Producer-API. Die zu sendenden Meldungen können einzelne FlowFiles sein, durch ein benutzerdefiniertes Trennzeichen (z. B. eine neue Zeile) getrennt sein oder datensatzorientierte Daten sein, die vom konfigurierten Record Reader gelesen werden können. Der komplementäre NiFi-Prozessor zum Abrufen von Meldungen ist ConsumeKafka.
Eingabeanforderung¶
REQUIRED
Unterstützt sensible dynamische Eigenschaften¶
false
Eigenschaften¶
Eigenschaft |
Beschreibung |
---|---|
Failure Strategy |
Legt fest, wie der Prozessor ein FlowFile behandelt, wenn er die Daten nicht in Kafka veröffentlichen kann. |
FlowFile Attribute Header Pattern |
Ein regulärer Ausdruck, der mit allen FlowFile-Attributnamen abgeglichen wird. Jedes Attribut, dessen Name mit dem Muster übereinstimmt, wird den Kafka-Meldungen als Header hinzugefügt. Wenn nicht angegeben, werden keine FlowFile-Attribute als Kopfzeilen hinzugefügt. |
Header Encoding |
Für jedes Attribut, das als Kafka-Datensatz-Header hinzugefügt wird, gibt diese Eigenschaft die Zeichencodierung an, die für die Serialisierung der Header verwendet werden soll. |
Kafka Connection Service |
Stellt Verbindungen zu Kafka Broker für die Veröffentlichung von Kafka-Datensätzen bereit. |
Kafka Key |
Der für die Meldung zu verwendende Schlüssel. Wenn nicht angegeben, wird das FlowFile-Attribut „kafka.key“ als Meldungsschlüssel verwendet, falls es vorhanden ist. Beachten Sie, dass das gleichzeitige Setzen des Kafka-Schlüssels und der Abgrenzung möglicherweise zu vielen Kafka-Meldungen mit demselben Schlüssel führen kann. Das gleichzeitige Festlegen von Trennzeichen und Kafka-Schlüssel kann jedoch zu Datenverlust in Kafka führen. Während einer Topic-Verdichtung in Kafka werden Meldungen auf Grundlage dieses Schlüssels dedupliziert. |
Kafka Key Attribute Encoding |
Ausgegebene FlowFiles verfügen über ein Attribut namens „kafka.key“. Diese Eigenschaft legt fest, wie der Wert des Attributs codiert werden soll. |
Message Demarcator |
Gibt die Zeichenkette (interpretiert als UTF-8) an, die zur Abgrenzung mehrerer Meldungen innerhalb eines einzigen FlowFile verwendet werden soll. Wenn nichts angegeben wird, wird der gesamte Inhalt des FlowFile als eine einzige Meldung verwendet. Falls angegeben, wird der Inhalt des FlowFile an diesem Trennzeichen aufgeteilt und jeder Abschnitt als separate Kafka-Meldung gesendet. Um Sonderzeichen wie „Zeilenumbruch“ einzugeben, verwenden Sie CTRL+Enter oder Shift+Enter, abhängig vom verwendeten OS. |
Message Key Field |
Der Name eines Feldes in den Eingabedatensätzen, das als Schlüssel für die Kafka-Meldung verwendet werden soll. |
Publish Strategy |
Das Format, das verwendet wird, um den eingehenden FlowFile-Datensatz in Kafka zu veröffentlichen. |
Record Key Writer |
Der für ausgehende FlowFiles zu verwendende Record Key Writer |
Record Metadata Strategy |
Gibt an, ob die Metadaten des Datensatzes (Topic und Partition) aus dem Metadatenfeld des Datensatzes stammen sollen oder aus den konfigurierten Eigenschaften „Topic Name“ und „Partition/Partitionierer Class“. |
Record Reader |
Der Record Reader, der für eingehende FlowFiles verwendet werden soll |
Record Writer |
Der Record Writer, der verwendet werden soll, um die Daten vor dem Senden an Kafka zu serialisieren |
Topic Name |
Name des Kafka-Topics, in das der Prozessor Kafka-Datensätze veröffentlicht. |
Transactional ID Prefix |
Legt fest, dass die KafkaProducer-Konfiguration „transactional.id“ eine generierte UUID ist, der die konfigurierte Zeichenfolge vorangestellt wird. |
Transactions Enabled |
Gibt an, ob beim Kommunizieren mit Kafka Transaktionsgarantien bereitgestellt werden sollen. Wenn ein Problem beim Senden von Daten an Kafka auftritt und diese Eigenschaft auf „false“ gesetzt ist, werden die Meldungen, die bereits an Kafka gesendet wurden, weiterhin an die Verbraucher zugestellt. Wenn dies auf „true“ gesetzt ist, wird die Kafka-Transaktion zurückgesetzt, sodass diese Meldungen den Verbrauchern nicht zur Verfügung stehen. Das Setzen dieses Werts auf „true“ erfordert, dass die Eigenschaft [Delivery Guarantee] auf [Guarantee Replicated Delivery] eingestellt ist. |
acks |
Legt die Anforderung fest, die garantiert, dass eine Meldung an Kafka gesendet wird. Entspricht der Eigenschaft „acks“ des Kafka-Clients. |
compression.type |
Legt die Komprimierungsstrategie für an Kafka gesendete Datensätze fest. Entspricht der Eigenschaft „compression.type“ des Kafka-Clients. |
max.request.size |
Die maximale Größe einer Anfrage in Bytes. Entspricht der Eigenschaft „max.request.size“ des Kafka-Clients. |
partition |
Gibt das Ziel der Kafka Partition für Datensätze an. |
partitioner.class |
Gibt an, welche Klasse für die Berechnung einer Partitions-ID für eine Meldung verwendet werden soll. Entspricht der Eigenschaft „partitioner.class“ des Kafka-Klients. |
Beziehungen¶
Name |
Beschreibung |
---|---|
failure |
Jedes FlowFile, das nicht an Kafka gesendet werden kann, wird an diese Beziehung weitergeleitet |
success |
FlowFiles, deren gesamter Inhalt an Kafka gesendet wurde. |
Schreibt Attribute¶
Name |
Beschreibung |
---|---|
msg.count |
Die Anzahl der Meldungen, die für dieses FlowFile an Kafka gesendet wurden. Dieses Attribut wird nur zu FlowFiles hinzugefügt, die an die Beziehung „success“ weitergeleitet werden. |