Detecção e evolução do esquema para conector Kafka com Snowpipe Streaming¶
O conector Kafka com Snowpipe Streaming oferece suporte à detecção e evolução de esquema. A estrutura das tabelas no Snowflake pode ser definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados de streaming do Snowpipe carregados pelo conector Kafka.
Sem detecção e evolução de esquema, a tabela Snowflake carregada pelo conector Kafka consiste apenas em duas colunas VARIANT, RECORD_CONTENT e RECORD_METADATA. Com a detecção e evolução de esquema habilitadas, o Snowflake pode detectar o esquema dos dados de streaming e carregar dados em tabelas que correspondem automaticamente a qualquer esquema definido pelo usuário. Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL de colunas ausentes em novos arquivos de dados.
Nota
Este recurso em versão preliminar funciona apenas com o conector Kafka com Snowpipe Streaming. Atualmente ele não oferece suporte ao conector Kafka com Snowpipe baseado em arquivo.
Neste tópico:
Pré-requisitos¶
Antes de ativar esse recurso, configure os seguintes pré-requisitos.
Baixar a versão do conector Kafka 2.0.0 ou posterior. Para obter mais informações, consulte Instalação e configuração do conector Kafka.
Use o comando ALTER TABLE para definir o parâmetro
ENABLE_SCHEMA_EVOLUTION
como TRUE na tabela. Você deve usar uma função com o privilégio EVOLVE SCHEMA na tabela. Para obter mais informações, consulte Evolução do esquema da tabela.
Configuração das propriedades necessárias do Kafka¶
Configure as propriedades necessárias a seguir no arquivo de propriedades do conector Kafka:
snowflake.ingestion.method
Especifique o uso de
SNOWPIPE_STREAMING
para carregar os dados do tópico Kafka. Observe que este recurso em versão preliminar atualmente não é compatível comSNOWPIPE
.snowflake.enable.schematization
Especifique
TRUE
para ativar a detecção e evolução de esquema para o conector Kafka com Snowpipe Streaming. O valor padrão éFALSE
.schema.registry.url
Especifique o URL do serviço de registro de esquema. O valor padrão é vazio.
Dependendo do formato do arquivo,
schema.registry.url
é obrigatório ou opcional. A detecção de esquema com o conector Kafka é suportada em qualquer um dos cenários abaixo:O registro de esquema é necessário para Avro e Protobuf. A coluna é criada com os tipos de dados definidos no registro de esquema fornecido.
O registro de esquema é opcional para JSON. Se não houver registro de esquema, o tipo de dados será inferido com base nos dados fornecidos.
Configure propriedades adicionais em seu arquivo de propriedades do conector Kafka como de costume. Para obter mais informações, consulte Configuração do conector Kafka.
Conversores¶
Todos os conversores de dados estruturados, como Json, Avro e Protobuf, são suportados. Observe que testamos apenas os seguintes conversores de dados estruturados:
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.json.JsonSchemaConverter
Quaisquer conversores de dados não estruturados não são suportados. Por exemplo,
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.storage.StringConverter
Alguns conversores de dados personalizados podem não ser suportados. Entre em contato com o suporte Snowflake se precisar de ajuda.
Notas de uso¶
A detecção de esquema com o conector Kafka é suportada com ou sem um registro de esquema fornecido. Se estiver usando o registro de esquema (Avro e Protobuf), a coluna será criada com os tipos de dados definidos no registro de esquema fornecido. Se não houver registro de esquema (JSON), o tipo de dados será inferido com base nos dados fornecidos.
A evolução de esquema com conector Kafka oferece suporte às seguintes modificações nas colunas da tabela:
Adição de novas colunas
Descarte a restrição NOT NULL se a coluna de dados de origem estiver ausente.
Se o conector Kafka criar a tabela de destino, a evolução de esquema será ativada por padrão. No entanto, se a evolução de esquema estiver desativada para uma tabela existente, o conector Kafka tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens mortas configuradas (DLQ).
Exemplos¶
Os exemplos a seguir demonstram as tabelas criadas antes e depois da detecção e evolução do esquema serem ativadas para o conector Kafka com Snowpipe Streaming.
-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates. +------+---------------------------------------------------------+---------------------------------------------------+ | Row | RECORD_METADATA | RECORD_CONTENT | |------+---------------------------------------------------------+---------------------------------------------------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...| | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...| | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...| | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| +------+---------------------------------------------------------+---------------------------------------------------| -- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector. +------+---------------------------------------------------------+---------+--------+-------+----------+ | Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY | |------+---------------------------------------------------------+---------+--------+-------+----------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 3572 | | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789 | ZABZX | SELL | 3024 | | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789 | ZTEST | SELL | 799 | | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123 | ZABZX | BUY | 2033 | | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 1558 | +------+---------------------------------------------------------+---------+--------+-------+----------|