PublishKafka 2025.5.31.15

バンドル

com.snowflake.openflow.runtime | runtime-kafka-nar

説明

Kafka Producer API を使用して、 FlowFile のコンテンツをメッセージまたは個々の記録として Apache Kafka に送信します。送信するメッセージは、個々の FlowFiles、ユーザー指定の区切り文字(改行など)で区切られていてもよいし、構成されたRecord Readerで読み取れる記録指向のデータであってもよいです。メッセージをフェッチするための補完的な NiFi プロセッサーは ConsumeKafka です。

タグ

apache, avro, csv, json, kafka, logs, message, openflow, pubsub, put, record, send

入力要件

REQUIRED

機密動的プロパティをサポート

false

プロパティ

プロパティ

説明

失敗ストラテジー

Kafka にデータをパブリッシュできない場合に、プロセッサーが FlowFile をどのように処理するかを指定します。

FlowFile 属性ヘッダーパターン

すべての FlowFile 属性名にマッチする正規表現。このパターンに一致する属性名は、KafkaメッセージにHeaderとして追加されます。指定しない場合、 FlowFile 属性はヘッダーとして追加されません。

ヘッダー エンコーディング

Kafka 記録ヘッダーとして追加される属性について、このプロパティはヘッダーのシリアライズに使用する文字エンコーディングを示します。

Kafka接続サービス

Kafka 記録を公開するための Kafka Broker への接続を提供します。

Kafkaキー

メッセージに使用するキー。指定しない場合、 FlowFile 属性「kafka.key」が存在すれば、それがメッセージキーとして使用されます。Kafka キーのセットとデマケーションを同時に行うと、同じキーを持つ Kafka メッセージが多数存在する可能性があることに注意してください。Kafka はメッセージとキーの一意性を強制したり仮定したりしないため、通常は問題ありません。それでも、デマケーターとKafkaキーを同時にセットすると、Kafkaのデータが失われる危険性があります。Kafkaのトピック コンパクションでは、メッセージはこのキーに基づいて重複排除されます。

Kafkaキー属性エンコーディング

「kafka.key」という属性がある FlowFiles。このプロパティは、属性の値がどのようにエンコードされるべきかを指示します。

メッセージ デマケーター

一つの FlowFile 内で複数のメッセージを区切るために使う文字列 (UTF-8 と解釈されます) を指定します。指定しない場合、 FlowFile の内容全体が1つのメッセージとして使用されます。指定すると、 FlowFile の内容がこの区切り文字で分割され、各セクションが個別の Kafka メッセージとして送信されます。「改行」などの特殊文字を入力するには、 OS によって CTRL+Enter、またはShift +Enterを使用します。

メッセージキーフィールド

Kafkaメッセージのキーとして使用する入力記録のフィールド名。

公開ストラテジー

受信した FlowFile 記録を Kafka に公開する際に使用する形式。

記録キー・ライター

発信 FlowFiles に使用する記録キーライター

記録メタデータストラテジー

記録のメタデータ (トピックとパーティション) を記録のメタデータフィールドから取得するか、構成されたトピック名とパーティション/パーティショナクラスのプロパティから取得するかを指定します。

Record Reader

受信 FlowFiles に使用するRecord Reader

Record Writer

Kafka に送信する前にデータをシリアライズするために使用する Record Writer。

トピック名

Processor が Kafka記録を発行する Kafka トピックの名前。

トランザクション ID プレフィックス

KafkaProducer config transactional.idが生成された UUID になることと設定された文字列がプレフィックスされることを指定します。

トランザクションの有効化

Kafka との通信時にトランザクション保証を提供するかどうかを指定します。Kafka へのデータ送信に問題があり、このプロパティが false にセットされている場合、Kafka に送信済みのメッセージは続行され、コンシューマーに配信されます。これをtrueにセットすると、Kafkaトランザクションはロールバックされ、コンシューマーはそれらのメッセージを利用できなくなります。これをtrueに設定するには、 [Delivery Guarantee]プロパティを [Guarantee Replicated Delivery.]にセットする必要があります。

acks

メッセージが Kafka に送信されることを保証するための要件を指定します。Kafka Client acks プロパティに対応します。

compression.type

Kafka に送信する記録の圧縮ストラテジーを指定します。Kafka Client compression.typeプロパティに対応します。

max.request.size

リクエストの最大サイズ(バイト)。Kafka Client max.request.sizeプロパティに対応します。

partition

記録のKafkaパーティション先を指定します。

パーティショナー・クラス

メッセージのパーティション ID を計算するために使用するクラスを指定します。Kafka Client partitioner.classプロパティに対応します。

リレーションシップ

名前

説明

failure

Kafka に送信できない FlowFile は、この Relationship にルーティングされます。

success

すべてのコンテンツが Kafka に送信された FlowFiles。

属性の書き込み

名前

説明

msg.count

この FlowFile に対して Kafka に送信されたメッセージの数。この属性は、サクセスにルーティングされる FlowFiles にのみ付加されます。

こちらもご覧ください