Apache Kafka com DLQ e metadados¶
Nota
O conector está sujeito aos termos do conector.
Este tópico descreve o Apache Kafka com DLQ e o conector de metadados. Esse é o conector completo que oferece paridade de recursos com o conector Snowflake legado para Kafka e inclui recursos avançados para casos de uso de produção.
Principais recursos¶
O Apache Kafka com DLQ e conector de metadados oferece funcionalidade abrangente:
Suporte à fila de mensagens não entregues (DLQ) para tratamento de mensagens com falha
Coluna RECORD_METADATA com metadados de mensagens do Kafka
Esquematização configurável – ativa ou desativa a detecção de esquemas
Suporte à tabela Iceberg com a evolução do esquema
Suporte a vários formatos de mensagem – JSON e AVRO
Integração do registro de esquema para mensagens AVRO
Mapeamento de tópico para tabela com padrões avançados
Suporte à autenticação SASL
Parâmetros específicos¶
Além dos parâmetros comuns descritos em Configure o Openflow Connector para Kafka, esse conector inclui contextos de parâmetro adicionais para recursos avançados.
Formato de mensagem e parâmetros de esquema¶
Parâmetro |
Descrição |
Obrigatório |
---|---|---|
Formato de mensagem |
O formato das mensagens no Kafka. Uma das seguintes opções: JSON/AVRO. Padrão: JSON |
Sim |
Esquema AVRO |
O esquema Avro, caso schema-text-property seja usado na estratégia de acesso ao esquema AVRO com o formato de mensagem AVRO. Observação: isso só deve ser usado se todas as mensagens consumidas do(s) tópico(s) Kafka configurado(s) tiverem o mesmo esquema. |
Não |
Estratégia de acesso ao esquema AVRO |
O método de acessar o esquema AVRO de uma mensagem. Necessário para AVRO. Uma das seguintes opções: embedded-avro-schema/schema-reference-reader/schema-text-property. Padrão: embedded-avro-schema |
Não |
Parâmetros de registro de esquema¶
Parâmetro |
Descrição |
Obrigatório |
---|---|---|
Tipo de autenticação do registro de esquema |
O método de autenticação no registro de esquema, se usado. Caso contrário, use NONE. Uma das seguintes opções: NONE/BASIC. Padrão: NONE |
Sim |
URL do registro de esquema |
O URL do registro de esquema. Necessário para o formato de mensagem AVRO. |
Não |
Nome de usuário do registro de esquema |
O nome de usuário do registro de esquema. Necessário para o formato de mensagem AVRO. |
Não |
Senha do registro de esquema |
A senha do registro de esquema. Necessário para o formato de mensagem AVRO. |
Não |
DLQ e parâmetros de recursos avançados¶
Parâmetro |
Descrição |
Obrigatório |
---|---|---|
Tópico do DLQ do Kafka |
DLQ para enviar mensagens com erros de análise para |
Sim |
Esquematização ativada |
Determina se os dados serão inseridos em colunas individuais ou em um único campo RECORD_CONTENT. Uma das seguintes opções: verdadeiro/falso. Padrão: verdadeiro |
Sim |
Iceberg ativado |
Especifica se o processador ingere dados em uma tabela Iceberg. O processador falhará se essa propriedade não corresponder ao tipo de tabela real. Padrão: falso |
Sim |
Comportamento de esquematização¶
O comportamento do conector muda com base no parâmetro Schematization Enabled:
Esquematização ativada¶
Quando a esquematização está ativada, o conector:
Cria colunas individuais para cada campo da mensagem
Inclui uma coluna RECORD_METADATA com metadados do Kafka
Evolui automaticamente o esquema da tabela quando novos campos são detectados
Nivela as estruturas JSON/AVRO aninhadas em colunas separadas
Exemplo de estrutura de tabela:
Linha |
RECORD_METADATA |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|---|
1 |
{«timestamp»:1669074170090, «headers»: {«current.iter… |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
{«timestamp»:1669074170400, «headers»: {«current.iter… |
XYZ789 |
ZABX |
SELL |
3024 |
Esquematização desativada¶
Quando a esquematização está desativada, o conector:
Cria apenas duas colunas: RECORD_CONTENT e RECORD_METADATA
Armazena o conteúdo completo da mensagem como um OBJECT em RECORD_CONTENT
Não realiza a evolução automática do esquema
Oferece flexibilidade máxima para o processamento downstream
Exemplo de estrutura de tabela:
Linha |
RECORD_METADATA |
RECORD_CONTENT |
---|---|---|
1 |
{«timestamp»:1669074170090, «headers»: {«current.iter… |
{«account»: «ABC123», «symbol»: «ZTEST», «side»:… |
2 |
{«timestamp»:1669074170400, «headers»: {«current.iter… |
{«account»: «XYZ789», «symbol»: «ZABX», «side»:… |
Use a propriedade Schematization Enabled
nas propriedades de configuração do conector para ativar ou desativar a detecção de esquema.
Detecção e evolução de esquema¶
O conector oferece suporte à detecção e à evolução de esquema. A estrutura de tabelas no Snowflake pode ser definida e desenvolvida automaticamente para oferecer suporte à estrutura de novos dados carregados pelo conector.
Sem a detecção e a evolução do esquema, a tabela Snowflake carregada pelo conector consiste apenas em duas colunas OBJECT
: RECORD_CONTENT
e RECORD_METADATA
.
Com a detecção e evolução de esquema habilitadas, o Snowflake pode detectar o esquema dos dados de streaming e carregar dados em tabelas que correspondem automaticamente a qualquer esquema definido pelo usuário. Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL
de colunas ausentes em novos arquivos de dados.
A detecção de esquema com o conector é compatível com ou sem um registro de esquema fornecido. Se estiver usando o registro de esquema (Avro), a coluna será criada com os tipos de dados definidos no registro de esquema fornecido. Se não houver registro de esquema (JSON), o tipo de dados será inferido com base nos dados fornecidos.
JSON ARRAY não é compatível com esquematização adicional.
Possibilitando a evolução do esquema¶
Se o conector criar a tabela de destino, a evolução do esquema será ativada por padrão.
Se você quiser ativar ou desativar a evolução do esquema na tabela existente, use o comando ALTER TABLE para definir o parâmetro ENABLE_SCHEMA_EVOLUTION
. Você também deve usar uma função com o privilégio OWNERSHIP
na tabela. Para obter mais informações, consulte Evolução do esquema da tabela.
No entanto, se a evolução do esquema estiver desativada para uma tabela existente, o conector tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens não entregues configuradas (DLQ).
Estrutura de RECORD_METADATA¶
A coluna RECORD_METADATA contém metadados importantes da mensagem Kafka:
Campo |
Descrição |
---|---|
offset |
O deslocamento da mensagem na partição do Kafka |
topic |
O nome do tópico do Kafka |
partition |
O número da partição do Kafka |
key |
A chave de mensagem (se houver) |
timestamp |
O carimbo de data/hora da mensagem |
SnowflakeConnectorPushTime |
Carimbo de data/hora em que o conector buscou a mensagem no Kafka |
headers |
Mapa de cabeçalhos de mensagem (se houver) |
Fila de mensagens não entregues (DLQ)¶
A funcionalidade DLQ trata das mensagens que não podem ser processadas com êxito:
Comportamento de DLQ¶
Falhas de análise – Mensagens com formato inválido JSON/AVRO são enviadas para a DLQ
Incompatibilidades de esquema – Mensagens que não correspondem ao esquema esperado quando a evolução do esquema está desativada
Erros de processamento – Outras falhas de processamento durante a ingestão
Suporte de tabela Iceberg¶
O Openflow Connector para Kafka pode ingerir dados em uma tabela gerenciada pelo Snowflake Apache Iceberg™ quando Iceberg ativado estiver definido como verdadeiro.
Requisitos e limitações¶
Antes de configurar o conector Kafka do Openflow para a ingestão de tabelas Iceberg, observe os seguintes requisitos e limitações:
É necessário criar uma tabela Iceberg antes de executar o conector.
Certifique-se de que o usuário tenha acesso à inserção de dados nas tabelas criadas.
Configuração e instalação¶
Para configurar o Openflow Connector para Kafka para a ingestão de tabelas Iceberg, siga as etapas em Configure o Openflow Connector para Kafka com algumas diferenças observadas nas seções a seguir.
Habilite a ingestão na tabela Iceberg¶
Para permitir a ingestão em uma tabela Iceberg, você deve definir o parâmetro Iceberg Enabled
como true
.
Criação de uma tabela Iceberg para ingestão¶
Antes de executar o conector, é necessário criar uma tabela Iceberg. O esquema inicial da tabela depende das configurações da propriedade Schematization Enabled
do conector.
Se você ativar a esquematização, deverá criar uma tabela com uma coluna chamada record_metadata
:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
O conector cria automaticamente as colunas para os campos de mensagem e altera o esquema de colunas do record_metadata
.
Se você não ativar a esquematização, deverá criar uma tabela com uma coluna chamada record_content
de um tipo que corresponda ao conteúdo real da mensagem do Kafka. O conector cria automaticamente a coluna record_metadata
.
Ao criar uma tabela Iceberg, é possível usar tipos de dados Iceberg ou tipos Snowflake compatíveis. O tipo VARIANT semiestruturado não é compatível. Em vez disso, use um OBJECT ou MAP estruturado.
Por exemplo, considere a seguinte mensagem:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Exemplos de criação de tabelas Iceberg¶
Com a esquematização ativada:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Com a esquematização desativada:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
RECORD_CONTENT OBJECT(
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Nota
RECORD_METADATA deve sempre ser criado. Os nomes de campo dentro de estruturas aninhadas, como dogs
ou cats
, diferenciam maiúsculas de minúsculas.
Casos de uso¶
Esse conector é ideal para:
Ambientes de produção que exigem DLQ
Linhagem de dados e auditoria em que os metadados do Kafka são importantes
Processamento de mensagem complexa com requisitos de evolução do esquema
Integração da tabela Iceberg
Se você precisar de uma ingestão mais simples, sem metadados ou recursos de DLQ, considere os conectores Apache Kafka para o formato de dados JSON/AVRO.