Visão geral do conector Kafka¶
Este tópico fornece uma visão geral do Apache Kafka e do conector Snowflake para Kafka.
Nota
O conector Kafka está sujeito aos Termos do conector.
Neste tópico:
Introdução ao Apache Kafka¶
O software Apache Kafka usa um modelo de publicação e assinatura para escrever e gravar fluxos de registros, semelhante a uma fila de mensagens ou sistema de mensagens empresariais. O Kafka permite que os processos leiam e gravem mensagens de forma assíncrona. Um assinante não precisa estar conectado diretamente a um editor. Uma editora pode colocar uma mensagem na fila no Kafka para que o assinante a receba mais tarde.
Um aplicativo publica mensagens para um tópico, e um aplicativo assina um tópico para receber essas mensagens. O Kafka pode processar, assim como transmitir, mensagens. Entretanto, isso está fora do escopo deste documento. Os tópicos podem ser divididos em partições para aumentar a escalabilidade.
O Kafka Connect é uma estrutura para conectar o Kafka a sistemas externos, incluindo bancos de dados. Um cluster Kafka Connect é um cluster separado do cluster Kafka. O cluster Kafka Connect oferece suporte para executar e escalonar conectores (componentes que permitem a leitura e/ou gravação entre sistemas externos).
O conector Kafka foi desenvolvido para funcionar em um cluster Kafka Connect, ler dados de tópicos Kafka e gravar os dados em tabelas Snowflake.
O Snowflake fornece duas versões do conector:
Uma versão para a versão do pacote Confluent do Kafka.
Para obter mais informações sobre o Kafka Connect, consulte https://docs.confluent.io/current/connect/.
Nota
Uma versão hospedada do conector Kafka está disponível na Confluent Cloud. Para obter mais informações, consulte https://docs.confluent.io/current/cloud/connectors/cc-snowflake-sink.html.
Uma versão do pacote de software de código aberto (OSS) Apache Kafka.
Para obter mais informações sobre o Apache Kafka, consulte https://kafka.apache.org/.
Da perspectiva do Snowflake, um tópico Kafka produz um fluxo de linhas a serem inseridas em uma tabela do Snowflake. Em geral, cada mensagem do Kafka contém uma linha.
O Kafka, como muitas plataformas de publicação/assinatura de mensagens, permite uma relação muitos-para-muitos entre editores e assinantes. Um único aplicativo pode publicar vários tópicos, e um único aplicativo pode assinar vários tópicos. Com o Snowflake, o padrão típico é que um tópico fornece mensagens (linhas) para uma tabela do Snowflake.
A versão atual do conector Kafka está limitada ao carregamento de dados no Snowflake. O conector Kafka oferece suporte a dois métodos de carregamento de dados:
Para obter mais informações, consulte Carregamento de dados para o Snowflake e Uso do conector Snowflake para Kafka com Snowpipe Streaming.
Tabelas de destino para tópicos do Kafka¶
Os tópicos do Kafka podem ser mapeados para tabelas existentes do Snowflake na configuração do Kafka. Se os tópicos não forem mapeados, então o conector Kafka cria uma nova tabela para cada tópico usando o nome do tópico.
O conector converte o nome do tópico para um nome válido de tabela do Snowflake usando as seguintes regras:
Os nomes de tópicos em minúsculas são convertidos em nomes de tabelas em maiúsculas.
Se o primeiro caractere no nome do tópico não for uma letra (
a-z
, ouA-Z
) ou um caractere de sublinhado (_
), então o conector preenche o nome da tabela com um sublinhado.Se qualquer caractere dentro do nome do tópico não for um caractere válido para um nome de tabela do Snowflake, então esse caractere é substituído pelo caractere de sublinhado. Para obter mais informações sobre quais caracteres são válidos nos nomes das tabelas, consulte Requisitos para identificadores.
Observe que se o conector Kafka precisar ajustar o nome da tabela criada para um tópico Kakfa, é possível que os nomes de duas tabelas no mesmo esquema fiquem idênticos. Por exemplo, se você estiver lendo dados dos tópicos numbers+x
e numbers-x
, as tabelas criadas para estes tópicos seriam ambas NUMBERS_X
. Para evitar duplicação acidental de nomes de tabelas, o conector acrescenta um sufixo ao nome da tabela. O sufixo é um sublinhado seguido por um código hash gerado.
Dica
O Snowflake recomenda que, quando possível, você escolha nomes de tópicos que sigam as regras dos nomes identificadores do Snowflake.
Esquema de tabelas para tópicos do Kafka¶
Com o Snowpipe Streaming, o conector Kafka oferece suporte opcional à detecção e evolução de esquema.
Por padrão, com o Snowpipe ou Snowpipe Streaming, cada tabela do Snowflake carregada pelo conector Kafka tem um esquema formado por duas colunas VARIANT:
RECORD_CONTENT. Essa contém a mensagem do Kafka.
RECORD_METADATA. Essa contém os metadados da mensagem, por exemplo, o tópico a partir do qual a mensagem foi lida.
Se o Snowflake criar a tabela, então a tabela vai conter apenas essas duas colunas. Se o usuário criar a tabela para que o conector Kafka adicione as linhas, então a tabela pode conter mais do que essas duas colunas (qualquer coluna adicional deve permitir valores NULL porque os dados do conector não incluem valores para essas colunas).
A coluna RECORD_CONTENT contém a mensagem do Kafka.
Uma mensagem do Kafka tem uma estrutura interna que depende da informação que está sendo enviada. Por exemplo, uma mensagem de um sensor meteorológico IoT (Internet das Coisas) pode incluir o carimbo de data/hora em que os dados foram registrados, a localização do sensor, a temperatura, a umidade, etc. Uma mensagem de um sistema de inventário pode incluir a ID do produto e o número de itens vendidos, talvez juntamente com um carimbo de data/hora indicando quando foi a venda ou o envio.
Tipicamente, cada mensagem de um tópico específico tem a mesma estrutura básica. Tópicos diferentes costumam ter estruturas diferentes.
Cada mensagem do Kafka é transmitida ao Snowflake no formato JSON ou Avro. O conector Kafka armazena essa informação formatada em uma única coluna do tipo VARIANT. Os dados não são analisados e não são separados em várias colunas na tabela do Snowflake.
A coluna RECORD_METADATA contém, por padrão, as seguintes informações:
Campo |
Tipo de dados . Java |
Tipo de dados . no SQL |
Obrigatório |
Descrição |
---|---|---|---|---|
tópico |
Cadeia de caracteres |
VARCHAR |
Sim |
O nome do tópico Kafka que deu origem ao registro. |
partição |
Cadeia de caracteres |
VARCHAR |
Sim |
O número da partição dentro do tópico. (Note que esta é a partição do Kafka, não a micropartição do Snowflake). |
offset |
long |
INTEGER |
Sim |
O offset dessa partição. |
CreateTime / . LogAppendTime |
long |
BIGINT |
Não |
Esse é o carimbo de data/hora associado à mensagem no tópico do Kafka. O valor é em milissegundos desde a meia-noite de 1.º de janeiro de 1970, UTC. Para obter mais informações, consulte: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html |
chave |
Cadeia de caracteres |
VARCHAR |
Não |
Se a mensagem for uma KeyedMessage do Kafka, esta é a chave da mensagem. Para que o conector possa armazenar a chave no RECORD_METADATA, o parâmetro key.converter no Propriedades de configuração do Kafka deve ser definido como “org.apache.kafka.connect.storage.StringConverter”. Caso contrário, o conector ignora as chaves. |
schema_id |
int |
INTEGER |
Não |
Ao utilizar Avro com um registro de esquema para especificar um esquema, este é a ID do esquema desse registro. |
cabeçalhos |
Objeto |
OBJECT |
Não |
Um cabeçalho é um par chave-valor associado ao registro e definido pelo usuário. Cada registro pode ter 0, 1 ou vários cabeçalhos. |
A quantidade de metadados gravados na coluna RECORD_METADATA é configurável usando propriedades opcionais de configuração do Kafka. Para obter mais informações, consulte Instalação e configuração do conector Kafka.
Os nomes e valores dos campos diferenciam maiúsculas e minúsculas.
Expressa em sintaxe JSON, uma mensagem de exemplo pode parecer semelhante à seguinte:
{
"meta":
{
"offset": 1,
"topic": "PressureOverloadWarning",
"partition": 12,
"key": "key name",
"schema_id": 123,
"CreateTime": 1234567890,
"headers":
{
"name1": "value1",
"name2": "value2"
}
},
"content":
{
"ID": 62,
"PSI": 451,
"etc": "..."
}
}
Você pode consultar as tabelas do Snowflake diretamente usando a sintaxe apropriada para consultar colunas VARIANT.
Aqui está um exemplo simples de extração de dados com base no tópico da RECORD_METADATA:
select
record_metadata:CreateTime,
record_content:ID
from table1
where record_metadata:topic = 'PressureOverloadWarning';
A saída seria semelhante a:
+------------+-----+
| CREATETIME | ID |
+------------+-----+
| 1234567890 | 62 |
+------------+-----+
Alternativamente, você pode extrair os dados dessas tabelas, nivelar os dados em colunas individuais e armazenar os dados em outras tabelas, que costumam ser mais fáceis de consultar.
Fluxo de trabalho do conector Kafka¶
O conector Kafka realiza o seguinte processo para assinar os tópicos do Kafka e criar objetos Snowflake:
O conector Kafka assina um ou mais tópicos do Kafka com base nas informações de configuração fornecidas no arquivo de configuração ou na linha de comando do Kafka (ou no Confluent Control Center; apenas para Confluent).
O conector cria os seguintes objetos para cada tópico:
Um estágio interno para armazenar temporariamente arquivos de dados para cada tópico.
Um canal para ingerir os arquivos de dados de cada partição do tópico.
Uma tabela para cada tópico. Se a tabela especificada para cada tópico não existir, o conector cria uma. Caso contrário, o conector cria as colunas RECORD_CONTENT e RECORD_METADATA na tabela existente e verifica se as outras colunas permitem valores nulos (e produz um erro se não permitirem).
O diagrama a seguir mostra o fluxo de ingestão do Kafka com o conector Kafka:
Um ou mais aplicativos publicam registros JSON ou Avro em um cluster Kafka. Os registros são divididos em uma ou mais partições temáticas.
O conector Kafka armazena em buffer as mensagens dos tópicos do Kafka. Quando um limite (tempo, memória ou número de mensagens) é atingido, o conector grava as mensagens em um arquivo temporário no estágio interno. O conector aciona o Snowpipe para ingerir o arquivo temporário. O Snowpipe copia um ponteiro para o arquivo de dados em uma fila.
Um warehouse fornecido pelo Snowflake carrega os dados do arquivo preparado na tabela de destino (por exemplo, a tabela especificada no arquivo de configuração do tópico) através do canal criado para a partição do tópico do Kafka.
(Não mostrado) O conector monitora o Snowpipe e exclui todos os arquivos do estágio interno depois de confirmar que os dados do arquivo foram carregados na tabela.
Se uma falha tiver impedido o carregamento dos dados, o conector move o arquivo para o estágio de tabela e produz uma mensagem de erro.
O conector repete os passos 2-4.
Atenção
O Snowflake pesquisa a API insertReport
durante uma hora. Se o status de um arquivo ingerido não for bem sucedido dentro desta hora, os arquivos que estão sendo ingeridos são movidos para um estágio de tabela.
Pode levar pelo menos uma hora para que estes arquivos estejam disponíveis no estágio de tabela. Os arquivos só são movidos para o estágio de tabela se seu status de ingestão não for encontrado dentro da hora anterior.
Tolerância a falhas¶
Tanto o Kafka quanto o conector Kafka são tolerantes a falhas. As mensagens não são duplicadas nem descartadas silenciosamente.
A lógica de deduplicação de dados no fluxo de trabalho do Snowpipe na cadeia de carregamento de dados elimina cópias duplicadas de dados repetidos, exceto em casos raros. Se um erro for detectado enquanto o Snowpipe carrega um registro (por exemplo, o registro não foi bem formado em JSON ou Avro), então o registro não é carregado. Em vez disso, o registro é movido para um estágio de tabela.
O conector Kafka com Snowpipe Streaming oferece suporte a filas de mensagens mortas (DLQ) para tratamento de erros. Para obter mais informações, consulte Tratamento de erros e Propriedades DLQ do conector Kafka com Snowpipe Streaming.
Limitações da tolerância a falhas com o conector¶
Os tópicos do Kafka podem ser configurados com um limite de espaço de armazenamento ou de tempo de retenção.
O tempo de retenção padrão é de 7 dias. Se o sistema ficar offline por mais do que o tempo de retenção, então os registros expirados não serão carregados. Da mesma forma, se o limite do espaço de armazenamento do Kafka for excedido, algumas mensagens não serão entregues.
Se as mensagens no tópico do Kafka forem excluídas ou atualizadas, essas mudanças podem não ser refletidas na tabela do Snowflake.
Atenção
As instâncias do conector Kafka não se comunicam umas com as outras. Se você iniciar várias instâncias do conector sobre os mesmos tópicos ou partições, então várias cópias da mesma linha poderão ser inseridas na tabela. Isto não é recomendado. Cada tópico deve ser processado por apenas uma instância do conector.
É teoricamente possível que as mensagens fluam do Kafka mais rapidamente do que o Snowflake pode ingeri-las. Na prática, porém, isso é improvável. Se isso ocorrer, então a solução do problema exigiria o ajuste do desempenho do cluster do Kafka Connect. Por exemplo:
Ajustando o número de nós no cluster do Connect.
Ajustando o número de tarefas atribuídas ao conector.
Compreendendo o impacto da largura de banda da rede entre o conector e a implantação do Snowflake.
Importante
Nada garante que as linhas serão inseridas na ordem em que foram originalmente publicadas.
Plataformas com suporte¶
O conector Kafka pode funcionar em qualquer cluster do Kafka Connect, e pode enviar dados para uma conta Snowflake em qualquer plataforma de nuvem com suporte.
Suporte a dados protobuf¶
O conector Kafka 1.5.0 (ou superior) suporta buffers de protocolo (protobuf) através de um conversor de protobuf. Para obter mais detalhes, consulte Carregamento de dados protobuf usando o conector Snowflake para Kafka.
Informações de faturamento¶
Não há cobrança direta pelo uso do conector Kafka. No entanto, há custos indiretos:
O Snowpipe é usado para carregar os dados que o conector lê do Kafka, e o tempo de processamento do Snowpipe é cobrado na sua conta.
O armazenamento de dados é debitado na sua conta.
Limitações do conector Kafka¶
As SMTs (Single Message Transformations) são aplicadas às mensagens conforme elas fluem através do Kafka Connect. Quando você configura o Propriedades de configuração do Kafka, se você definir ou key.converter
ou value.converter
para um dos seguintes valores, então as SMTs não são suportadas na chave ou valor correspondente:
com.snowflake.kafka.connector.records.SnowflakeJsonConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverter
com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
Quando nem key.converter
nem value.converter
é definido, então a maioria das SMTs são suportadas, com a exceção atual de regex.router
.
Embora os conversores do Snowflake não suportem SMTs, a versão 1.4.3 (ou superior) do conector Kafka oferece suporte a muitos conversores desenvolvidos pela comunidade, entre eles:
io.confluent.connect.avro.AvroConverter
org.apache.kafka.connect.json.JsonConverter
Para obter mais informações sobre as SMTs, consulte https://docs.confluent.io/current/connect/transforms/index.html.