ConsumeKafka 2025.5.31.15¶
バンドル¶
com.snowflake.openflow.runtime | runtime-kafka-nar
説明¶
Apache Kafka Consumer API からメッセージを消費します。メッセージを送信するための補完的な NiFi プロセッサーは PublishKafka です。Processor は Kafka メッセージの消費をサポートし、オプションで NiFi 記録として解釈されます。この時点では(読み取り記録モードでは)、プロセッサーは指定されたパーティションから取得されるすべての記録が同じスキーマを持っていると仮定していることに注意してください。このモードでは、Kafka メッセージがプルされたにもかかわらず、構成されたRecord ReaderまたはRecord Writerで解析または書き込みができない場合、メッセージの内容は別の FlowFile に書き込まれ、その FlowFile は 'parse.failure' リレーションシップに転送されます。そうでない場合、各 FlowFile は'success'リレーションシップに送られ、1つの FlowFile の中に多くの個別メッセージが含まれる可能性があります。'record.count'属性が追加され、 FlowFile に含まれるメッセージの数を示します。スキーマが異なったり、<Headers to Add as Attributes> プロパティに含まれるメッセージヘッダーの値が異なったりすると、2 つの Kafka メッセージが同じ FlowFile に配置されることはありません。
入力要件¶
FORBIDDEN
機密動的プロパティをサポート¶
false
プロパティ¶
プロパティ |
説明 |
---|---|
オフセット・コミット |
メッセージ受信後に Kafka にオフセットをコミットするかどうかを指定します。通常、受信したメッセージが重複しないように、この値はtrueにセットされるべきです。しかし、特定のシナリオでは、オフセットのコミットを避け、データを処理し、後で PublishKafka、正確なOnceセマンティクスを提供するために確認することができます。 |
コンテンツ フィールド |
記録のどのフィールドの基になるコンテンツを追加するかを指定します。セットされていない場合、内容は記録のルートになります。 |
グループ ID |
Kafka group.idプロパティに対応するKafkaコンシューマーグループ識別子。 |
ヘッダー エンコーディング |
Kafka 記録ヘッダー値の読み取りおよび FlowFile 属性の書き込み時に適用される文字エンコーディング。 |
ヘッダー 名 パターン |
FlowFile 属性として記述するヘッダー値を選択するために、Kafka 記録のヘッダー名に適用される正規表現パターン。 |
ヘッダー フィールド 親 |
ヘッダーフィールドが記録のどのフィールドの基になるかを指定します。セットされていない場合、ヘッダーフィールドは記録のルートになります。 |
Kafka接続サービス |
Kafka 記録を公開するための Kafka Broker への接続を提供します。 |
キー 属性 エンコーディング |
Kafka 記録キーを含む構成 FlowFile 属性の値のエンコード。 |
キー フィールド 親 |
記録のどのフィールドの下にキーフィールドを追加するかを指定します。セットされていない場合、キーフィールドは記録のルートになります。 |
キー 形式 |
FlowFile 出力におけるKafka記録キーの表現方法を指定します。 |
キー Record Reader |
Kafka 記録キーを記録にパースするために使用するRecord Reader。 |
最大 未約定 時間 |
プロセッサーが Kafka から FlowFiles をフローに転送し、オフセットを Kafka にコミットするまでに消費できる最大時間を指定します (適切な場合)。期間を長くすると、待ち時間が長くなります。 |
メッセージ デマケーター |
KafkaConsumer はバッチでメッセージを受信するため、この Processor には、指定されたトピックとパーティションのすべての Kafka メッセージを 1 つのバッチにまとめた FlowFiles を出力するオプションがあります。このプロパティでは、複数の Kafka メッセージを区切るために使用する文字列 (UTF-8 と解釈されます) を指定できます。これはオプションのプロパティで、プロバイダーが指定されていない場合、受信したKafkaメッセージは、トリガーされるたびに FlowFile 。「改行」などの特殊文字を入力するには、 OS による CTRL +Enter、またはShift +Enterを使用します。 |
メタデータ フィールド |
記録のどのフィールドの下にメタデータを追加するかを指定します。セットされていない場合、メタデータは記録のルートになります。 |
メタデータ受信タイムスタンプフィールド |
指定された場合、タイムスタンプは出力 FlowFile の記録のメタデータの指定されたフィールドの基になります。 |
出力ストラテジー |
Kafka レコードを FlowFile レコードに出力する際に使用する形式。 |
加工ストラテジー |
Kafka 記録を処理し、シリアライズされた出力を FlowFiles に書き出すためのストラテジー。 |
Record Reader |
Kafka メッセージの受信に使用するRecord Reader。 |
Record Writer |
FlowFiles 送信をシリアライズするために使用するRecord Writer。 |
キーで分ける |
このプロパティを有効にすると、Kafka メッセージの両方のキーが同じ場合にのみ、2 つのメッセージが同じ FlowFile に追加されます。 |
トピック形式 |
提供されるトピックがカンマで区切られた名前のリストであるか、単一の正規表現であるかを指定します。 |
トピック |
Processor が Kafka 記録を消費する Kafka トピックの名前またはパターン。カンマ区切りなら複数指定可能。 |
auto.offset.reset |
Kafka auto.offset.resetプロパティに対応する以前のコンシューマーオフセットが見つからない場合に適用される自動オフセット構成 |
リレーションシップ¶
名前 |
説明 |
---|---|
success |
FlowFiles Kafka記録を1つ以上シリアライズしたコンテナー |
属性の書き込み¶
名前 |
説明 |
---|---|
record.count |
受信した記録数 |
mime.type |
構成されたRecord Writerが提供する MIME タイプ。 |
kafka.count |
複数のメッセージが書き込まれた場合、書き込まれたメッセージの数。 |
kafka.key |
単一メッセージの場合はメッセージのキー。キーがどのようにエンコードされるかは、「Key Attribute Encoding」プロパティの値に依存します。 |
kafka.offset |
トピックのパーティション内のメッセージのオフセット。 |
kafka.timestamp |
トピックのパーティション内のメッセージのタイムスタンプ。 |
kafka.partition |
メッセージまたはメッセージバンドルが属するトピックのパーティション。 |
kafka.topic |
メッセージまたはメッセージバンドルからのトピック |
kafka.tombstone |
消費されたメッセージがtombstoneメッセージの場合、trueにセットします。 |