ConsumeKafka 2025.5.31.15¶
Pacote¶
com.snowflake.openflow.runtime | runtime-kafka-nar
Descrição¶
Consome mensagens da Apache Kafka Consumer API. O processador NiFi complementar para enviar mensagens é PublishKafka. O processador oferece suporte ao consumo de mensagens Kafka, opcionalmente interpretadas como registros NiFi. Observe que, nesse momento (no modo de registro de leitura), o processador presume que todos os registros recuperados de uma determinada partição têm o mesmo esquema. Nesse modo, se alguma das mensagens do Kafka for extraída, mas não puder ser analisada ou gravada com o Record Reader ou Record Writer configurado, o conteúdo da mensagem será gravado em um FlowFile separado, e esse FlowFile será transferido para a relação “parse.failure”. Caso contrário, cada FlowFile é enviado à relação “success” e pode conter muitas mensagens individuais dentro de um único FlowFile. Um atributo “record.count” é adicionado para indicar quantas mensagens estão contidas no FlowFile. Duas mensagens do Kafka não serão colocadas no mesmo FlowFile se tiverem esquemas diferentes ou se tiverem valores diferentes para um cabeçalho de mensagem incluído pela propriedade <Headers to Add as Attributes>.
Requisito de entrada¶
FORBIDDEN
Oferece suporte a propriedades dinâmicas confidenciais¶
falso
Propriedades¶
Propriedade |
Descrição |
---|---|
Confirmação de deslocamentos |
Especifica se esse processador deve confirmar os deslocamentos no Kafka após receber mensagens. Normalmente, esse valor deve ser definido como verdadeiro para que as mensagens recebidas não sejam duplicadas. No entanto, em determinados cenários, talvez queiramos evitar o comprometimento dos deslocamentos, para que os dados possam ser processados e posteriormente reconhecidos pelo PublishKafka, a fim de fornecer a semântica “exatamente uma vez”. |
Campo de conteúdo |
Especifica em qual campo do registro o conteúdo será adicionado. Se não for definido, o conteúdo estará na raiz do registro |
ID do grupo |
Identificador do grupo de consumidores Kafka correspondente à propriedade group.id do Kafka |
Codificação de cabeçalho |
Codificação de caracteres aplicada na leitura de valores do cabeçalho de registro do Kafka e na gravação dos atributos do FlowFile |
Padrão de nome de cabeçalho |
Padrão de expressão regular aplicado aos nomes de cabeçalho de registro do Kafka para selecionar valores de cabeçalho a serem gravados como atributos FlowFile |
Campo pai dos cabeçalhos |
Especifica em qual campo do registro o campo de cabeçalhos será adicionado. Se não for definido, o campo de cabeçalhos estará na raiz do registro |
Serviço de conexão Kafka |
Fornece conexões com o corretor Kafka para a publicação de registros Kafka |
Codificação de atributos da chave |
Codificação do valor do atributo FlowFile configurado com a chave de registro do Kafka. |
Campo da chave pai |
Especifica em qual campo do registro o campo da chave será adicionado. Se não for definido, o campo da chave estará na raiz do registro |
Formato da chave |
Especifica como representar a chave de registro do Kafka na saída FlowFile |
Record Reader de chaves |
O Record Reader a ser usado para analisar a chave de registro do Kafka em um registro |
Tempo máximo não comprometido |
Especifica a quantidade máxima de tempo que o processador pode consumir do Kafka antes de transferir FlowFiles através do fluxo e confirmar os deslocamentos no Kafka (se apropriado). Um período de tempo maior pode resultar em uma latência mais longa |
Demarcador de mensagem |
Como o KafkaConsumer recebe mensagens em lotes, esse processador tem a opção de gerar FlowFiles, que contém todas as mensagens do Kafka em um único lote para um determinado tópico e partição, e essa propriedade permite que você forneça uma cadeia de caracteres (interpretada como UTF-8) a ser usada para demarcar várias mensagens do Kafka. Essa é uma propriedade opcional e, se não for fornecida, cada mensagem do Kafka recebida resultará em um único FlowFile quando for acionada. Para inserir um caractere especial, como “nova linha”, use CTRL + Enter ou Shift + Enter, dependendo do OS |
Campo de metadados |
Especifica em qual campo do registro os metadados serão adicionados. Se não for definido, os metadados ficarão na raiz do registro |
Campo de carimbo de data/hora de metadados recebidos |
Se especificado, um carimbo de data/hora será colocado sob o campo especificado nos metadados do registro no FlowFile de saída |
Estratégia de saída |
O formato usado para gerar o registro Kafka em um registro FlowFile. |
Estratégia de processamento |
Estratégia para processar registros do Kafka e gravar a saída serializada no FlowFiles |
Record Reader |
O Record Reader a ser usado para mensagens Kafka recebidas |
Record Writer |
O Record Writer a ser usado para serializar os FlowFiles de saída |
Separar por chave |
Quando essa propriedade estiver ativada, duas mensagens só serão adicionadas ao mesmo FlowFile se ambas as mensagens Kafka tiverem chaves idênticas. |
Formato do tópico |
Especifica se os tópicos fornecidos são uma lista de nomes separados por vírgula ou uma única expressão regular |
Tópicos |
O nome ou padrão dos tópicos Kafka dos quais o processador consome registros Kafka. Mais de um pode ser fornecido se for separado por vírgulas. |
auto.offset.reset |
Configuração automática de deslocamento aplicada quando nenhum deslocamento anterior do consumidor é encontrado, correspondendo à propriedade auto.offset.reset do Kafka |
Relações¶
Nome |
Descrição |
---|---|
success |
FlowFiles contendo um ou mais registros Kafka serializados |
Grava atributos¶
Nome |
Descrição |
---|---|
record.count |
O número de registros recebidos |
mime.type |
O tipo MIME fornecido pelo Record Writer configurado |
kafka.count |
O número de mensagens gravadas se houver mais de uma |
kafka.key |
A chave da mensagem se estiver presente e se for uma única mensagem. A forma como a chave é codificada depende do valor da propriedade de codificação de atributo da chave. |
kafka.offset |
O deslocamento da mensagem na partição do tópico. |
kafka.timestamp |
O carimbo de data/hora da mensagem na partição do tópico. |
kafka.partition |
A partição do tópico da qual a mensagem ou o pacote de mensagens é proveniente |
kafka.topic |
O tópico de onde vem a mensagem ou o pacote de mensagens |
kafka.tombstone |
Defina como verdadeiro se a mensagem consumida for uma mensagem tombstone |