PublishKafka 2025.5.31.15¶
번들¶
com.snowflake.openflow.runtime | runtime-kafka-nar
설명¶
FlowFile 의 내용을 메시지 또는 개별 레코드로서 Kafka Producer API 를 사용하여 Apache Kafka로 전송합니다. 전송할 메시지는 개별 FlowFiles 일 수도 있고, 사용자가 지정한 구분 기호(예: 새 줄)를 사용하여 구분할 수도 있으며, 구성된 Record Reader에서 읽을 수 있는 레코드 지향 데이터일 수도 있습니다. 메시지를 가져오기 위한 보완적인 NiFi 프로세서는 ConsumeKafka 입니다.
입력 요구 사항¶
REQUIRED
민감한 동적 속성 지원¶
false
속성¶
속성 |
설명 |
---|---|
Failure Strategy |
프로세서가 데이터를 Kafka에 게시할 수 없는 경우 FlowFile 을 처리하는 방법을 지정합니다 |
FlowFile 특성 헤더 패턴 |
모든 FlowFile 특성 이름에 대해 일치하는 정규식입니다. 이름이 패턴과 일치하는 모든 특성은 헤더로 Kafka 메시지에 추가됩니다. 지정하지 않으면 FlowFile 특성이 헤더로 추가되지 않습니다. |
Header Encoding |
Kafka Record Header로 추가되는 특성의 경우 이 속성은 헤더를 직렬화할 때 사용할 문자 인코딩을 나타냅니다. |
Kafka Connection Service |
Kafka 레코드를 게시하기 위한 Kafka Broker에 대한 연결을 제공합니다 |
Kafka 키 |
메시지에 사용할 키입니다. 지정하지 않으면 FlowFile 특성이 있는 경우 ‘kafka.key’가 메시지 키로 사용됩니다. Kafka 키를 설정하고 동시에 경계를 지정하면 잠재적으로 동일한 키를 가진 많은 Kafka 메시지가 발생할 수 있습니다. 일반적으로 Kafka는 메시지 및 키 고유성을 강제하거나 가정하지 않으므로 이는 문제가 되지 않습니다. 하지만 구분 기호과 Kafka 키를 동시에 설정하면 Kafka에서 데이터가 손실될 위험이 있습니다. Kafka에서 항목 압축을 하는 동안 이 키를 기준으로 메시지가 중복 제거됩니다. |
Kafka 키 특성 인코딩 |
FlowFiles 은 ‘kafka.key’라는 특성을 가집니다. 이 속성은 특성 값을 인코딩하는 방법을 지정합니다. |
Message Demarcator |
단일 FlowFile 내에서 여러 메시지를 구분하는 데 사용할 문자열(UTF-8로 해석됨)을 지정합니다. 지정하지 않으면 FlowFile 의 전체 내용이 1개의 메시지로 사용됩니다. 지정하면 FlowFile 의 내용이 이 구분 기호로 분할되어 각 섹션이 별도의 Kafka 메시지로 전송됩니다. ‘new line’과 같은 특수 문자를 입력하려면 OS 에 따라 CTRL+Enter 또는 Shift+Enter를 사용하십시오. |
메시지 키 필드 |
입력 레코드에 있는 필드 중 Kafka 메시지의 키로 사용해야 하는 필드의 이름입니다. |
Publish Strategy |
수신 FlowFile 레코드를 Kafka에 게시하는 데 사용되는 형식입니다. |
Record Key Writer |
발신 FlowFiles 에 사용할 Record Key Writer |
Record Metadata Strategy |
레코드의 메타데이터(항목 및 파티션)를 레코드의 메타데이터 필드에서 가져올지 아니면 구성된 항목 이름 및 파티션/파티셔너 클래스 속성에서 가져올지 여부를 지정합니다 |
Record Reader |
수신 FlowFiles 에 사용할 Record Reader |
Record Writer |
Kafka로 보내기 전에 데이터를 직렬화하기 위해 사용할 Record Writer입니다 |
항목 이름 |
프로세서가 Kafka 레코드를 게시하는 Kafka 항목의 이름입니다 |
트랜잭션 ID 접두사 |
KafkaProducer config transactional.id가 UUID 로 생성되고 구성된 문자열이 접두사로 붙도록 지정합니다. |
트랜잭션 활성화됨 |
Kafka와 통신할 때 트랜잭션 보증을 제공할지 여부를 지정합니다. Kafka로 데이터를 전송하는 데 문제가 있고 이 속성이 false로 설정되어 있으면 이미 Kafka로 전송된 메시지가 계속 진행되어 컨슈머에게 전달됩니다. 이 값을 true로 설정하면 Kafka 트랜잭션이 롤백되어 해당 메시지를 컨슈머가 사용할 수 없게 됩니다. 이를 true로 설정하려면 [Delivery Guarantee] 속성을 [Guarantee Replicated Delivery]로 설정해야 합니다 |
acks |
메시지가 Kafka로 전송되도록 보장하기 위한 요구 사항을 지정합니다. Kafka 클라이언트 acks 속성에 해당합니다. |
compression.type |
Kafka로 전송되는 레코드에 대한 압축 전략을 지정합니다. Kafka 클라이언트 compression.type 속성에 해당합니다. |
max.request.size |
요청의 최대 크기(바이트)입니다. Kafka 클라이언트 max.request.size 속성에 해당합니다. |
파티션 |
레코드에 대한 Kafka 파티션 대상을 지정합니다. |
partitioner.class |
메시지의 파티션 ID를 계산하는 데 사용할 클래스를 지정합니다. Kafka 클라이언트 partitioner.class 속성에 해당합니다. |
관계¶
이름 |
설명 |
---|---|
실패 |
Kafka로 전송할 수 없는 FlowFile 은 이 관계로 라우팅됩니다 |
성공 |
모든 내용이 Kafka로 전송된 FlowFiles 입니다. |
Writes 특성¶
이름 |
설명 |
---|---|
msg.count |
이 FlowFile 을 위해 Kafka로 전송된 메시지 수입니다. 이 특성은 성공으로 라우팅되는 FlowFiles 에만 추가됩니다. |