PublishKafka 2025.5.31.15

번들

com.snowflake.openflow.runtime | runtime-kafka-nar

설명

FlowFile 의 내용을 메시지 또는 개별 레코드로서 Kafka Producer API 를 사용하여 Apache Kafka로 전송합니다. 전송할 메시지는 개별 FlowFiles 일 수도 있고, 사용자가 지정한 구분 기호(예: 새 줄)를 사용하여 구분할 수도 있으며, 구성된 Record Reader에서 읽을 수 있는 레코드 지향 데이터일 수도 있습니다. 메시지를 가져오기 위한 보완적인 NiFi 프로세서는 ConsumeKafka 입니다.

태그

apache, avro, csv, json, kafka, logs, message, openflow, pubsub, put, record, send

입력 요구 사항

REQUIRED

민감한 동적 속성 지원

false

속성

속성

설명

Failure Strategy

프로세서가 데이터를 Kafka에 게시할 수 없는 경우 FlowFile 을 처리하는 방법을 지정합니다

FlowFile 특성 헤더 패턴

모든 FlowFile 특성 이름에 대해 일치하는 정규식입니다. 이름이 패턴과 일치하는 모든 특성은 헤더로 Kafka 메시지에 추가됩니다. 지정하지 않으면 FlowFile 특성이 헤더로 추가되지 않습니다.

Header Encoding

Kafka Record Header로 추가되는 특성의 경우 이 속성은 헤더를 직렬화할 때 사용할 문자 인코딩을 나타냅니다.

Kafka Connection Service

Kafka 레코드를 게시하기 위한 Kafka Broker에 대한 연결을 제공합니다

Kafka 키

메시지에 사용할 키입니다. 지정하지 않으면 FlowFile 특성이 있는 경우 ‘kafka.key’가 메시지 키로 사용됩니다. Kafka 키를 설정하고 동시에 경계를 지정하면 잠재적으로 동일한 키를 가진 많은 Kafka 메시지가 발생할 수 있습니다. 일반적으로 Kafka는 메시지 및 키 고유성을 강제하거나 가정하지 않으므로 이는 문제가 되지 않습니다. 하지만 구분 기호과 Kafka 키를 동시에 설정하면 Kafka에서 데이터가 손실될 위험이 있습니다. Kafka에서 항목 압축을 하는 동안 이 키를 기준으로 메시지가 중복 제거됩니다.

Kafka 키 특성 인코딩

FlowFiles 은 ‘kafka.key’라는 특성을 가집니다. 이 속성은 특성 값을 인코딩하는 방법을 지정합니다.

Message Demarcator

단일 FlowFile 내에서 여러 메시지를 구분하는 데 사용할 문자열(UTF-8로 해석됨)을 지정합니다. 지정하지 않으면 FlowFile 의 전체 내용이 1개의 메시지로 사용됩니다. 지정하면 FlowFile 의 내용이 이 구분 기호로 분할되어 각 섹션이 별도의 Kafka 메시지로 전송됩니다. ‘new line’과 같은 특수 문자를 입력하려면 OS 에 따라 CTRL+Enter 또는 Shift+Enter를 사용하십시오.

메시지 키 필드

입력 레코드에 있는 필드 중 Kafka 메시지의 키로 사용해야 하는 필드의 이름입니다.

Publish Strategy

수신 FlowFile 레코드를 Kafka에 게시하는 데 사용되는 형식입니다.

Record Key Writer

발신 FlowFiles 에 사용할 Record Key Writer

Record Metadata Strategy

레코드의 메타데이터(항목 및 파티션)를 레코드의 메타데이터 필드에서 가져올지 아니면 구성된 항목 이름 및 파티션/파티셔너 클래스 속성에서 가져올지 여부를 지정합니다

Record Reader

수신 FlowFiles 에 사용할 Record Reader

Record Writer

Kafka로 보내기 전에 데이터를 직렬화하기 위해 사용할 Record Writer입니다

항목 이름

프로세서가 Kafka 레코드를 게시하는 Kafka 항목의 이름입니다

트랜잭션 ID 접두사

KafkaProducer config transactional.id가 UUID 로 생성되고 구성된 문자열이 접두사로 붙도록 지정합니다.

트랜잭션 활성화됨

Kafka와 통신할 때 트랜잭션 보증을 제공할지 여부를 지정합니다. Kafka로 데이터를 전송하는 데 문제가 있고 이 속성이 false로 설정되어 있으면 이미 Kafka로 전송된 메시지가 계속 진행되어 컨슈머에게 전달됩니다. 이 값을 true로 설정하면 Kafka 트랜잭션이 롤백되어 해당 메시지를 컨슈머가 사용할 수 없게 됩니다. 이를 true로 설정하려면 [Delivery Guarantee] 속성을 [Guarantee Replicated Delivery]로 설정해야 합니다

acks

메시지가 Kafka로 전송되도록 보장하기 위한 요구 사항을 지정합니다. Kafka 클라이언트 acks 속성에 해당합니다.

compression.type

Kafka로 전송되는 레코드에 대한 압축 전략을 지정합니다. Kafka 클라이언트 compression.type 속성에 해당합니다.

max.request.size

요청의 최대 크기(바이트)입니다. Kafka 클라이언트 max.request.size 속성에 해당합니다.

파티션

레코드에 대한 Kafka 파티션 대상을 지정합니다.

partitioner.class

메시지의 파티션 ID를 계산하는 데 사용할 클래스를 지정합니다. Kafka 클라이언트 partitioner.class 속성에 해당합니다.

관계

이름

설명

실패

Kafka로 전송할 수 없는 FlowFile 은 이 관계로 라우팅됩니다

성공

모든 내용이 Kafka로 전송된 FlowFiles 입니다.

Writes 특성

이름

설명

msg.count

이 FlowFile 을 위해 Kafka로 전송된 메시지 수입니다. 이 특성은 성공으로 라우팅되는 FlowFiles 에만 추가됩니다.

참고 항목