Detecção e evolução do esquema para conector Kafka com Snowpipe Streaming

O conector Kafka com Snowpipe Streaming oferece suporte à detecção e evolução de esquema. A estrutura das tabelas no Snowflake pode ser definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados de streaming do Snowpipe carregados pelo conector Kafka.

Sem detecção e evolução de esquema, a tabela Snowflake carregada pelo conector Kafka consiste apenas em duas colunas VARIANT, RECORD_CONTENT e RECORD_METADATA. Com a detecção e evolução de esquema habilitadas, o Snowflake pode detectar o esquema dos dados de streaming e carregar dados em tabelas que correspondem automaticamente a qualquer esquema definido pelo usuário. Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL de colunas ausentes em novos arquivos de dados.

Nota

Este recurso funciona apenas com o conector Kafka com Snowpipe Streaming. Ele não oferece suporte ao conector Kafka com Snowpipe baseado em arquivo.

Neste tópico:

Pré-requisitos

Antes de ativar esse recurso, configure os seguintes pré-requisitos.

Configuração das propriedades necessárias do Kafka

Configure as propriedades necessárias a seguir no arquivo de propriedades do conector Kafka:

snowflake.ingestion.method

Especifique o uso de SNOWPIPE_STREAMING para carregar os dados do tópico Kafka. Observe que esse recurso não oferece suporte para SNOWPIPE.

snowflake.enable.schematization

Especifique TRUE para ativar a detecção e evolução de esquema para o conector Kafka com Snowpipe Streaming. O valor padrão é FALSE.

Quando esta propriedade é definida como TRUE,

  • Para quaisquer novas tabelas criadas pelo conector Kafka, o parâmetro de tabela ENABLE_SCHEMA_EVOLUTION é definido automaticamente para TRUE.

  • Para qualquer tabela existente, você ainda precisa definir manualmente o parâmetro de tabela ENABLE_SCHEMA_EVOLUTION como TRUE.

schema.registry.url

Especifique o URL do serviço de registro de esquema. O valor padrão é vazio.

Dependendo do formato do arquivo, schema.registry.url é obrigatório ou opcional. A detecção de esquema com o conector Kafka é suportada em qualquer um dos cenários abaixo:

  • O registro de esquema é necessário para Avro e Protobuf. A coluna é criada com os tipos de dados definidos no registro de esquema fornecido.

  • O registro de esquema é opcional para JSON. Se não houver registro de esquema, o tipo de dados será inferido com base nos dados fornecidos.

Configure propriedades adicionais em seu arquivo de propriedades do conector Kafka como de costume. Para obter mais informações, consulte Configuração do conector Kafka.

Conversores

Os conversores de dados estruturados, como Json, Avro e Protobuf, são compatíveis. Observe que testamos apenas os seguintes conversores de dados estruturados:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

Quaisquer conversores de dados não estruturados não são compatíveis com esquematização Por exemplo,

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

Os conversores Snowflake não são compatíveis com o Snowpipe Streaming. Alguns conversores de dados personalizados não foram testados e podem não ser compatíveis.

Notas de uso

  • A detecção de esquema com o conector Kafka é suportada com ou sem um registro de esquema fornecido. Se estiver usando o registro de esquema (Avro e Protobuf), a coluna será criada com os tipos de dados definidos no registro de esquema fornecido. Se não houver registro de esquema (JSON), o tipo de dados será inferido com base nos dados fornecidos.

  • A evolução de esquema com conector Kafka oferece suporte às seguintes modificações nas colunas da tabela:

    • Adição de novas colunas

    • Descarte a restrição NOT NULL se a coluna de dados de origem estiver ausente.

  • Se o conector Kafka criar a tabela de destino, a evolução de esquema será ativada por padrão. No entanto, se a evolução de esquema estiver desativada para uma tabela existente, o conector Kafka tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens mortas configuradas (DLQ).

  • JSON ARRAY não é compatível com esquematização adicional.

  • Para o conector Kafka com o Snowpipe Streaming, a evolução do esquema não é rastreada pela saída SchemaEvolutionRecord nas seguintes exibições e comandos: Exibição INFORMATION_SCHEMA COLUMNS, Exibição ACCOUNT_USAGE COLUMNS, comando DESCRIBE TABLE e comando SHOW COLUMNS. A saída SchemaEvolutionRecord sempre mostra NULL.

Exemplos

Os exemplos a seguir demonstram as tabelas criadas antes e depois da detecção e evolução do esquema serem ativadas para o conector Kafka com Snowpipe Streaming.

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy