ConsumeKafka 2025.5.31.15

バンドル

com.snowflake.openflow.runtime | runtime-kafka-nar

説明

Apache Kafka Consumer API からメッセージを消費します。メッセージを送信するための補完的な NiFi プロセッサーは PublishKafka です。Processor は Kafka メッセージの消費をサポートし、オプションで NiFi 記録として解釈されます。この時点では(読み取り記録モードでは)、プロセッサーは指定されたパーティションから取得されるすべての記録が同じスキーマを持っていると仮定していることに注意してください。このモードでは、Kafka メッセージがプルされたにもかかわらず、構成されたRecord ReaderまたはRecord Writerで解析または書き込みができない場合、メッセージの内容は別の FlowFile に書き込まれ、その FlowFile は 'parse.failure' リレーションシップに転送されます。そうでない場合、各 FlowFile は'success'リレーションシップに送られ、1つの FlowFile の中に多くの個別メッセージが含まれる可能性があります。'record.count'属性が追加され、 FlowFile に含まれるメッセージの数を示します。スキーマが異なったり、<Headers to Add as Attributes> プロパティに含まれるメッセージヘッダーの値が異なったりすると、2 つの Kafka メッセージが同じ FlowFile に配置されることはありません。

タグ

avro, consume, csv, get, ingest, ingress, json, kafka, openflow, pubsub, record, topic

入力要件

FORBIDDEN

機密動的プロパティをサポート

false

プロパティ

プロパティ

説明

オフセット・コミット

メッセージ受信後に Kafka にオフセットをコミットするかどうかを指定します。通常、受信したメッセージが重複しないように、この値はtrueにセットされるべきです。しかし、特定のシナリオでは、オフセットのコミットを避け、データを処理し、後で PublishKafka、正確なOnceセマンティクスを提供するために確認することができます。

コンテンツ フィールド

記録のどのフィールドの基になるコンテンツを追加するかを指定します。セットされていない場合、内容は記録のルートになります。

グループ ID

Kafka group.idプロパティに対応するKafkaコンシューマーグループ識別子。

ヘッダー エンコーディング

Kafka 記録ヘッダー値の読み取りおよび FlowFile 属性の書き込み時に適用される文字エンコーディング。

ヘッダー 名 パターン

FlowFile 属性として記述するヘッダー値を選択するために、Kafka 記録のヘッダー名に適用される正規表現パターン。

ヘッダー フィールド 親

ヘッダーフィールドが記録のどのフィールドの基になるかを指定します。セットされていない場合、ヘッダーフィールドは記録のルートになります。

Kafka接続サービス

Kafka 記録を公開するための Kafka Broker への接続を提供します。

キー 属性 エンコーディング

Kafka 記録キーを含む構成 FlowFile 属性の値のエンコード。

キー フィールド 親

記録のどのフィールドの下にキーフィールドを追加するかを指定します。セットされていない場合、キーフィールドは記録のルートになります。

キー 形式

FlowFile 出力におけるKafka記録キーの表現方法を指定します。

キー Record Reader

Kafka 記録キーを記録にパースするために使用するRecord Reader。

最大 未約定 時間

プロセッサーが Kafka から FlowFiles をフローに転送し、オフセットを Kafka にコミットするまでに消費できる最大時間を指定します (適切な場合)。期間を長くすると、待ち時間が長くなります。

メッセージ デマケーター

KafkaConsumer はバッチでメッセージを受信するため、この Processor には、指定されたトピックとパーティションのすべての Kafka メッセージを 1 つのバッチにまとめた FlowFiles を出力するオプションがあります。このプロパティでは、複数の Kafka メッセージを区切るために使用する文字列 (UTF-8 と解釈されます) を指定できます。これはオプションのプロパティで、プロバイダーが指定されていない場合、受信したKafkaメッセージは、トリガーされるたびに FlowFile 。「改行」などの特殊文字を入力するには、 OS による CTRL +Enter、またはShift +Enterを使用します。

メタデータ フィールド

記録のどのフィールドの下にメタデータを追加するかを指定します。セットされていない場合、メタデータは記録のルートになります。

メタデータ受信タイムスタンプフィールド

指定された場合、タイムスタンプは出力 FlowFile の記録のメタデータの指定されたフィールドの基になります。

出力ストラテジー

Kafka レコードを FlowFile レコードに出力する際に使用する形式。

加工ストラテジー

Kafka 記録を処理し、シリアライズされた出力を FlowFiles に書き出すためのストラテジー。

Record Reader

Kafka メッセージの受信に使用するRecord Reader。

Record Writer

FlowFiles 送信をシリアライズするために使用するRecord Writer。

キーで分ける

このプロパティを有効にすると、Kafka メッセージの両方のキーが同じ場合にのみ、2 つのメッセージが同じ FlowFile に追加されます。

トピック形式

提供されるトピックがカンマで区切られた名前のリストであるか、単一の正規表現であるかを指定します。

トピック

Processor が Kafka 記録を消費する Kafka トピックの名前またはパターン。カンマ区切りなら複数指定可能。

auto.offset.reset

Kafka auto.offset.resetプロパティに対応する以前のコンシューマーオフセットが見つからない場合に適用される自動オフセット構成

リレーションシップ

名前

説明

success

FlowFiles Kafka記録を1つ以上シリアライズしたコンテナー

属性の書き込み

名前

説明

record.count

受信した記録数

mime.type

構成されたRecord Writerが提供する MIME タイプ。

kafka.count

複数のメッセージが書き込まれた場合、書き込まれたメッセージの数。

kafka.key

単一メッセージの場合はメッセージのキー。キーがどのようにエンコードされるかは、「Key Attribute Encoding」プロパティの値に依存します。

kafka.offset

トピックのパーティション内のメッセージのオフセット。

kafka.timestamp

トピックのパーティション内のメッセージのタイムスタンプ。

kafka.partition

メッセージまたはメッセージバンドルが属するトピックのパーティション。

kafka.topic

メッセージまたはメッセージバンドルからのトピック

kafka.tombstone

消費されたメッセージがtombstoneメッセージの場合、trueにセットします。

こちらもご覧ください