ConsumeKafka 2025.5.31.15¶
번들¶
com.snowflake.openflow.runtime | runtime-kafka-nar
설명¶
Apache Kafka Consumer API 의 메시지를 소비합니다. 메시지 전송을 위한 보완적인 NiFi 프로세서는 PublishKafka 입니다. 프로세서는 선택적으로 NiFi 레코드로 해석되는 Kafka 메시지 소비를 지원합니다. 이때(레코드 읽기 모드에서) 프로세서는 주어진 파티션에서 검색되는 모든 레코드가 동일한 스키마를 가지고 있다고 가정합니다. 이 모드의 경우, Kafka 메시지를 가져왔지만 구성된 Record Reader 또는 Record Writer로 구문 분석하거나 기록할 수 없는 경우 메시지 내용은 별도의 FlowFile 에 레코드되며, 해당 FlowFile 은 ‘parse.failure’ 관계로 전송됩니다. 그렇지 않으면, 각각의 FlowFile 은 ‘성공’ 관계로 전송되며 단일 FlowFile 내에 많은 개별 메시지를 포함할 수 있습니다. FlowFile 에 포함된 메시지 수를 나타내기 위해 ‘record.count’ 특성이 추가됩니다. 스키마가 다르거나 <Headers to Add as Attributes> 속성에 포함된 메시지 헤더의 값이 다른 경우 2개의 Kafka 메시지가 동일한 FlowFile 에 배치되지 않습니다.
입력 요구 사항¶
FORBIDDEN
민감한 동적 속성 지원¶
false
속성¶
속성 |
설명 |
---|---|
Commit Offsets |
이 프로세서가 메시지를 수신한 후 오프셋을 Kafka에 커밋할지 여부를 지정합니다. 일반적으로 이 값은 수신되는 메시지가 중복되지 않도록 true로 설정해야 합니다. 그러나 특정 시나리오에서는 오프셋을 커밋하지 않고 데이터를 처리한 후 나중에 PublishKafka 에서 확인하여 정확히 한 번 의미 체계를 제공하고자 할 수 있습니다. |
Content Field |
레코드의 어느 필드 아래에 내용을 추가할지 지정합니다. 설정하지 않으면 내용이 레코드의 루트로 설정됩니다. |
그룹 ID |
Kafka group.id 속성에 해당하는 Kafka 컨슈머 그룹 식별자 |
Header Encoding |
Kafka 레코드 헤더 값을 읽고 FlowFile 특성을 쓸 때 문자 인코딩이 적용됩니다 |
Header Name Pattern |
FlowFile 특성으로 쓸 헤더 값을 선택하기 위한 정규식 패턴이 Kafka 레코드 헤더 이름에 적용되었습니다 |
Headers Field Parent |
레코드의 어느 필드 아래에 헤더 필드를 추가할지 지정합니다. 설정하지 않으면 헤더 필드가 레코드의 루트에 위치합니다. |
Kafka Connection Service |
Kafka 레코드를 게시하기 위한 Kafka Broker에 대한 연결을 제공합니다 |
Key Attribute Encoding |
Kafka 레코드 키가 포함된 구성된 FlowFile 특성의 값에 대한 인코딩입니다. |
Key Field Parent |
레코드의 어느 필드 아래에 키 필드를 추가할지 지정합니다. 설정하지 않으면 키 필드는 레코드의 루트에 위치합니다. |
Key Format |
FlowFile 출력에서 Kafka 레코드 키를 표현하는 방법을 지정합니다. |
Key Record Reader |
Kafka 레코드 키를 레코드로 구문 분석하는 데 사용할 Record Reader |
Max Uncommitted Time |
프로세서가 플로우를 통해 FlowFiles 을 전송하고 (적절한 경우) 오프셋을 Kafka에 커밋해야 하기 전에 Kafka에서 소비할 수 있는 최대 시간을 지정합니다. 기간이 길면 지연 시간이 길어질 수 있습니다 |
Message Demarcator |
KafkaConsumer 는 메시지를 일괄적으로 수신하므로 이 프로세서에는 주어진 항목과 파티션에 대해 모든 Kafka 메시지를 단일 배치로 포함하는 FlowFiles 을 출력하는 옵션이 있으며, 이 속성을 사용하면 여러 Kafka 메시지를 구분하는 데 사용할 문자열(UTF-8로 해석됨)을 제공할 수 있습니다. 이 속성은 선택적 속성이며 제공하지 않으면 수신되는 각 Kafka 메시지가 단일 FlowFile 로 트리거됩니다. ‘new line’과 같은 특수 문자를 입력하려면 OS 에 따라 CTRL+Enter 또는 Shift+Enter를 사용하십시오. |
Metadata Field |
레코드의 어느 필드 아래에 메타데이터를 추가할지 지정합니다. 설정하지 않으면 메타데이터는 레코드의 루트에 위치합니다. |
Metadata Received Timestamp Field |
지정하면 타임스탬프는 FlowFile 출력의 레코드 메타데이터에서 지정된 필드 아래에 배치됩니다. |
Output Strategy |
Kafka 레코드를 FlowFile 레코드로 출력하는 데 사용되는 형식입니다. |
Processing Strategy |
Kafka 레코드 처리 및 직렬화된 출력을 FlowFiles 에 쓰기 전략 |
Record Reader |
수신 Kafka 메시지에 사용할 Record Reader |
Record Writer |
전송 FlowFiles 을 직렬화하기 위해 사용할 Record Writer입니다. |
Separate By Key |
이 속성을 활성화하면 두 Kafka 메시지의 키가 모두 동일한 경우에만 두 메시지가 동일한 FlowFile 에 추가됩니다. |
Topic Format |
제공된 항목이 쉼표로 구분된 이름 목록인지 아니면 단일 정규식인지 지정합니다 |
항목 |
프로세서가 Kafka 레코드를 소비하는 Kafka 항목의 이름 또는 패턴입니다. 쉼표로 구분된 경우 둘 이상을 입력할 수 있습니다. |
auto.offset.reset |
Kafka auto.offset.reset 속성에 해당하는 이전 컨슈머 오프셋이 없는 경우 자동 오프셋 구성 적용 |
관계¶
이름 |
설명 |
---|---|
성공 |
1개 이상의 직렬화된 Kafka 레코드를 포함하는 FlowFiles |
Writes 특성¶
이름 |
설명 |
---|---|
record.count |
수신된 레코드 수 |
mime.type |
구성된 Record Writer에서 제공하는 MIME 유형입니다 |
kafka.count |
둘 이상의 메시지가 작성된 경우 작성된 메시지 수입니다 |
kafka.key |
메시지가 있는 경우 및 단일 메시지인 경우 메시지의 키입니다. 키가 인코딩되는 방법은 ‘Key Attribute Encoding’ 속성의 값에 따라 달라집니다. |
kafka.offset |
항목의 파티션에 있는 메시지의 오프셋입니다. |
kafka.timestamp |
항목의 파티션에 있는 메시지의 타임스탬프입니다. |
kafka.partition |
메시지 또는 메시지 번들의 출처인 항목의 파티션입니다 |
kafka.topic |
메시지 또는 메시지 번들이 보낸 항목 |
kafka.tombstone |
소비된 메시지가 툼스톤 메시지인 경우 true로 설정합니다 |