Detecção e evolução do esquema para conector Kafka com Snowpipe Streaming¶
O conector Kafka com Snowpipe Streaming oferece suporte à detecção e evolução de esquema. A estrutura das tabelas no Snowflake pode ser definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados de streaming do Snowpipe carregados pelo conector Kafka.
Sem detecção e evolução de esquema, a tabela Snowflake carregada pelo conector Kafka consiste apenas em duas colunas VARIANT, RECORD_CONTENT e RECORD_METADATA. Com a detecção e evolução de esquema habilitadas, o Snowflake pode detectar o esquema dos dados de streaming e carregar dados em tabelas que correspondem automaticamente a qualquer esquema definido pelo usuário. Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL de colunas ausentes em novos arquivos de dados.
Nota
Este recurso funciona apenas com o conector Kafka com Snowpipe Streaming. Ele não oferece suporte ao conector Kafka com Snowpipe baseado em arquivo.
Neste tópico:
Pré-requisitos¶
Antes de ativar esse recurso, configure os seguintes pré-requisitos.
Baixar a versão do conector Kafka 2.0.0 ou posterior. Para obter mais informações, consulte Instalação e configuração do conector Kafka.
Use o comando ALTER TABLE para definir o parâmetro
ENABLE_SCHEMA_EVOLUTION
como TRUE na tabela. Você também deve usar uma função com o privilégio OWNERSHIP na tabela. Para obter mais informações, consulte Evolução do esquema da tabela.
Configuração das propriedades necessárias do Kafka¶
Configure as propriedades necessárias a seguir no arquivo de propriedades do conector Kafka:
snowflake.ingestion.method
Especifique o uso de
SNOWPIPE_STREAMING
para carregar os dados do tópico Kafka. Observe que esse recurso não oferece suporte paraSNOWPIPE
.snowflake.enable.schematization
Especifique
TRUE
para ativar a detecção e evolução de esquema para o conector Kafka com Snowpipe Streaming. O valor padrão éFALSE
.Quando esta propriedade é definida como
TRUE
,Para quaisquer novas tabelas criadas pelo conector Kafka, o parâmetro de tabela
ENABLE_SCHEMA_EVOLUTION
é definido automaticamente paraTRUE
.Para qualquer tabela existente, você ainda precisa definir manualmente o parâmetro de tabela
ENABLE_SCHEMA_EVOLUTION
comoTRUE
.
schema.registry.url
Especifique o URL do serviço de registro de esquema. O valor padrão é vazio.
Dependendo do formato do arquivo,
schema.registry.url
é obrigatório ou opcional. A detecção de esquema com o conector Kafka é suportada em qualquer um dos cenários abaixo:O registro de esquema é necessário para Avro e Protobuf. A coluna é criada com os tipos de dados definidos no registro de esquema fornecido.
O registro de esquema é opcional para JSON. Se não houver registro de esquema, o tipo de dados será inferido com base nos dados fornecidos.
Configure propriedades adicionais em seu arquivo de propriedades do conector Kafka como de costume. Para obter mais informações, consulte Configuração do conector Kafka.
Conversores¶
Os conversores de dados estruturados, como Json, Avro e Protobuf, são compatíveis. Observe que testamos apenas os seguintes conversores de dados estruturados:
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.json.JsonSchemaConverter
Quaisquer conversores de dados não estruturados não são compatíveis com esquematização Por exemplo,
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.storage.StringConverter
Os conversores Snowflake não são compatíveis com o Snowpipe Streaming. Alguns conversores de dados personalizados não foram testados e podem não ser compatíveis.
Notas de uso¶
A detecção de esquema com o conector Kafka é suportada com ou sem um registro de esquema fornecido. Se estiver usando o registro de esquema (Avro e Protobuf), a coluna será criada com os tipos de dados definidos no registro de esquema fornecido. Se não houver registro de esquema (JSON), o tipo de dados será inferido com base nos dados fornecidos.
A evolução de esquema com conector Kafka oferece suporte às seguintes modificações nas colunas da tabela:
Adição de novas colunas
Descarte a restrição NOT NULL se a coluna de dados de origem estiver ausente.
Se o conector Kafka criar a tabela de destino, a evolução de esquema será ativada por padrão. No entanto, se a evolução de esquema estiver desativada para uma tabela existente, o conector Kafka tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens mortas configuradas (DLQ).
JSON ARRAY não é compatível com esquematização adicional.
Para o conector Kafka com o Snowpipe Streaming, a evolução do esquema não é rastreada pela saída
SchemaEvolutionRecord
nas seguintes exibições e comandos: Exibição INFORMATION_SCHEMA COLUMNS, Exibição ACCOUNT_USAGE COLUMNS, comando DESCRIBE TABLE e comando SHOW COLUMNS. A saídaSchemaEvolutionRecord
sempre mostra NULL.
Exemplos¶
Os exemplos a seguir demonstram as tabelas criadas antes e depois da detecção e evolução do esquema serem ativadas para o conector Kafka com Snowpipe Streaming.
-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates. +------+---------------------------------------------------------+---------------------------------------------------+ | Row | RECORD_METADATA | RECORD_CONTENT | |------+---------------------------------------------------------+---------------------------------------------------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...| | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...| | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...| | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| +------+---------------------------------------------------------+---------------------------------------------------| -- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector. +------+---------------------------------------------------------+---------+--------+-------+----------+ | Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY | |------+---------------------------------------------------------+---------+--------+-------+----------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 3572 | | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789 | ZABZX | SELL | 3024 | | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789 | ZTEST | SELL | 799 | | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123 | ZABZX | BUY | 2033 | | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 1558 | +------+---------------------------------------------------------+---------+--------+-------+----------|