Apache Kafka com DLQ e metadados

Nota

O conector está sujeito aos termos do conector.

Este tópico descreve o Apache Kafka com DLQ e o conector de metadados. Esse é o conector completo que oferece paridade de recursos com o conector Snowflake legado para Kafka e inclui recursos avançados para casos de uso de produção.

Principais recursos

O Apache Kafka com DLQ e conector de metadados oferece funcionalidade abrangente:

  • Suporte à fila de mensagens não entregues (DLQ) para tratamento de mensagens com falha

  • Coluna RECORD_METADATA com metadados de mensagens do Kafka

  • Esquematização configurável – ativa ou desativa a detecção de esquemas

  • Suporte à tabela Iceberg com a evolução do esquema

  • Suporte a vários formatos de mensagem – JSON e AVRO

  • Integração do registro de esquema para mensagens AVRO

  • Mapeamento de tópico para tabela com padrões avançados

  • Suporte à autenticação SASL

Parâmetros específicos

Além dos parâmetros comuns descritos em Configure o Openflow Connector para Kafka, esse conector inclui contextos de parâmetro adicionais para recursos avançados.

Formato de mensagem e parâmetros de esquema

Parâmetro

Descrição

Obrigatório

Formato de mensagem

O formato das mensagens no Kafka. Uma das seguintes opções: JSON/AVRO. Padrão: JSON

Sim

Esquema AVRO

O esquema Avro, caso schema-text-property seja usado na estratégia de acesso ao esquema AVRO com o formato de mensagem AVRO. Observação: isso só deve ser usado se todas as mensagens consumidas do(s) tópico(s) Kafka configurado(s) tiverem o mesmo esquema.

Não

Estratégia de acesso ao esquema AVRO

O método de acessar o esquema AVRO de uma mensagem. Necessário para AVRO. Uma das seguintes opções: embedded-avro-schema/schema-reference-reader/schema-text-property. Padrão: embedded-avro-schema

Não

Parâmetros de registro de esquema

Parâmetro

Descrição

Obrigatório

Tipo de autenticação do registro de esquema

O método de autenticação no registro de esquema, se usado. Caso contrário, use NONE. Uma das seguintes opções: NONE/BASIC. Padrão: NONE

Sim

URL do registro de esquema

O URL do registro de esquema. Necessário para o formato de mensagem AVRO.

Não

Nome de usuário do registro de esquema

O nome de usuário do registro de esquema. Necessário para o formato de mensagem AVRO.

Não

Senha do registro de esquema

A senha do registro de esquema. Necessário para o formato de mensagem AVRO.

Não

DLQ e parâmetros de recursos avançados

Parâmetro

Descrição

Obrigatório

Tópico do DLQ do Kafka

DLQ para enviar mensagens com erros de análise para

Sim

Esquematização ativada

Determina se os dados serão inseridos em colunas individuais ou em um único campo RECORD_CONTENT. Uma das seguintes opções: verdadeiro/falso. Padrão: verdadeiro

Sim

Iceberg ativado

Especifica se o processador ingere dados em uma tabela Iceberg. O processador falhará se essa propriedade não corresponder ao tipo de tabela real. Padrão: falso

Sim

Comportamento de esquematização

O comportamento do conector muda com base no parâmetro Schematization Enabled:

Esquematização ativada

Quando a esquematização está ativada, o conector:

  • Cria colunas individuais para cada campo da mensagem

  • Inclui uma coluna RECORD_METADATA com metadados do Kafka

  • Evolui automaticamente o esquema da tabela quando novos campos são detectados

  • Nivela as estruturas JSON/AVRO aninhadas em colunas separadas

Exemplo de estrutura de tabela:

Linha

RECORD_METADATA

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

{«timestamp»:1669074170090, «headers»: {«current.iter…

ABC123

ZTEST

BUY

3572

2

{«timestamp»:1669074170400, «headers»: {«current.iter…

XYZ789

ZABX

SELL

3024

Esquematização desativada

Quando a esquematização está desativada, o conector:

  • Cria apenas duas colunas: RECORD_CONTENT e RECORD_METADATA

  • Armazena o conteúdo completo da mensagem como um OBJECT em RECORD_CONTENT

  • Não realiza a evolução automática do esquema

  • Oferece flexibilidade máxima para o processamento downstream

Exemplo de estrutura de tabela:

Linha

RECORD_METADATA

RECORD_CONTENT

1

{«timestamp»:1669074170090, «headers»: {«current.iter…

{«account»: «ABC123», «symbol»: «ZTEST», «side»:…

2

{«timestamp»:1669074170400, «headers»: {«current.iter…

{«account»: «XYZ789», «symbol»: «ZABX», «side»:…

Use a propriedade Schematization Enabled nas propriedades de configuração do conector para ativar ou desativar a detecção de esquema.

Detecção e evolução de esquema

O conector oferece suporte à detecção e à evolução de esquema. A estrutura de tabelas no Snowflake pode ser definida e desenvolvida automaticamente para oferecer suporte à estrutura de novos dados carregados pelo conector.

Sem a detecção e a evolução do esquema, a tabela Snowflake carregada pelo conector consiste apenas em duas colunas OBJECT: RECORD_CONTENT e RECORD_METADATA.

Com a detecção e evolução de esquema habilitadas, o Snowflake pode detectar o esquema dos dados de streaming e carregar dados em tabelas que correspondem automaticamente a qualquer esquema definido pelo usuário. Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL de colunas ausentes em novos arquivos de dados.

A detecção de esquema com o conector é compatível com ou sem um registro de esquema fornecido. Se estiver usando o registro de esquema (Avro), a coluna será criada com os tipos de dados definidos no registro de esquema fornecido. Se não houver registro de esquema (JSON), o tipo de dados será inferido com base nos dados fornecidos.

JSON ARRAY não é compatível com esquematização adicional.

Possibilitando a evolução do esquema

Se o conector criar a tabela de destino, a evolução do esquema será ativada por padrão.

Se você quiser ativar ou desativar a evolução do esquema na tabela existente, use o comando ALTER TABLE para definir o parâmetro ENABLE_SCHEMA_EVOLUTION. Você também deve usar uma função com o privilégio OWNERSHIP na tabela. Para obter mais informações, consulte Evolução do esquema da tabela.

No entanto, se a evolução do esquema estiver desativada para uma tabela existente, o conector tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens não entregues configuradas (DLQ).

Estrutura de RECORD_METADATA

A coluna RECORD_METADATA contém metadados importantes da mensagem Kafka:

Campo

Descrição

offset

O deslocamento da mensagem na partição do Kafka

topic

O nome do tópico do Kafka

partition

O número da partição do Kafka

key

A chave de mensagem (se houver)

timestamp

O carimbo de data/hora da mensagem

SnowflakeConnectorPushTime

Carimbo de data/hora em que o conector buscou a mensagem no Kafka

headers

Mapa de cabeçalhos de mensagem (se houver)

Fila de mensagens não entregues (DLQ)

A funcionalidade DLQ trata das mensagens que não podem ser processadas com êxito:

Comportamento de DLQ

  • Falhas de análise – Mensagens com formato inválido JSON/AVRO são enviadas para a DLQ

  • Incompatibilidades de esquema – Mensagens que não correspondem ao esquema esperado quando a evolução do esquema está desativada

  • Erros de processamento – Outras falhas de processamento durante a ingestão

Suporte de tabela Iceberg

O Openflow Connector para Kafka pode ingerir dados em uma tabela gerenciada pelo Snowflake Apache Iceberg™ quando Iceberg ativado estiver definido como verdadeiro.

Requisitos e limitações

Antes de configurar o conector Kafka do Openflow para a ingestão de tabelas Iceberg, observe os seguintes requisitos e limitações:

  • É necessário criar uma tabela Iceberg antes de executar o conector.

  • Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.

Configuração e instalação

Para configurar o Openflow Connector para Kafka para a ingestão de tabelas Iceberg, siga as etapas em Configure o Openflow Connector para Kafka com algumas diferenças observadas nas seções a seguir.

Habilite a ingestão na tabela Iceberg

Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled como true.

Criação de uma tabela Iceberg para ingestão

Antes de executar o conector, é necessário criar uma tabela Iceberg. O esquema inicial da tabela depende das configurações da propriedade Schematization Enabled do conector.

Se você ativar a esquematização, deverá criar uma tabela com uma coluna chamada record_metadata:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

O conector cria automaticamente as colunas para os campos de mensagem e altera o esquema de colunas do record_metadata.

Se você não ativar a esquematização, deverá criar uma tabela com uma coluna chamada record_content de um tipo que corresponda ao conteúdo real da mensagem do Kafka. O conector cria automaticamente a coluna record_metadata.

Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg ou tipos Snowflake compatíveis. O tipo VARIANT semiestruturado não é compatível. Em vez disso, use um OBJECT ou MAP estruturado.

Por exemplo, considere a seguinte mensagem:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Exemplos de criação de tabelas Iceberg

Com a esquematização ativada:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Com a esquematização desativada:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Nota

RECORD_METADATA deve sempre ser criado. Os nomes de campo dentro de estruturas aninhadas, como dogs ou cats, diferenciam maiúsculas de minúsculas.

Casos de uso

Esse conector é ideal para:

  • Ambientes de produção que exigem DLQ

  • Linhagem de dados e auditoria em que os metadados do Kafka são importantes

  • Processamento de mensagem complexa com requisitos de evolução do esquema

  • Integração da tabela Iceberg

Se você precisar de uma ingestão mais simples, sem metadados ou recursos de DLQ, considere os conectores Apache Kafka para o formato de dados JSON/AVRO.