Detecção e evolução do esquema para conector Kafka com Snowpipe Streaming

O conector Kafka com Snowpipe Streaming oferece suporte à detecção e evolução de esquema. A estrutura das tabelas no Snowflake pode ser definida e evoluída automaticamente para oferecer suporte à estrutura dos novos dados de streaming do Snowpipe carregados pelo conector Kafka.

Sem detecção e evolução de esquema, a tabela Snowflake carregada pelo conector Kafka consiste apenas em duas colunas VARIANT, RECORD_CONTENT e RECORD_METADATA. Com a detecção e evolução de esquema habilitadas, o Snowflake pode detectar o esquema dos dados de streaming e carregar dados em tabelas que correspondem automaticamente a qualquer esquema definido pelo usuário. Snowflake também permite adicionar novas colunas ou descartar a restrição NOT NULL de colunas ausentes em novos arquivos de dados.

Nota

Este recurso em versão preliminar funciona apenas com o conector Kafka com Snowpipe Streaming. Atualmente ele não oferece suporte ao conector Kafka com Snowpipe baseado em arquivo.

Neste tópico:

Pré-requisitos

Antes de ativar esse recurso, configure os seguintes pré-requisitos.

Configuração das propriedades necessárias do Kafka

Configure as propriedades necessárias a seguir no arquivo de propriedades do conector Kafka:

snowflake.ingestion.method

Especifique o uso de SNOWPIPE_STREAMING para carregar os dados do tópico Kafka. Observe que este recurso em versão preliminar atualmente não é compatível com SNOWPIPE.

snowflake.enable.schematization

Especifique TRUE para ativar a detecção e evolução de esquema para o conector Kafka com Snowpipe Streaming. O valor padrão é FALSE.

schema.registry.url

Especifique o URL do serviço de registro de esquema. O valor padrão é vazio.

Dependendo do formato do arquivo, schema.registry.url é obrigatório ou opcional. A detecção de esquema com o conector Kafka é suportada em qualquer um dos cenários abaixo:

  • O registro de esquema é necessário para Avro e Protobuf. A coluna é criada com os tipos de dados definidos no registro de esquema fornecido.

  • O registro de esquema é opcional para JSON. Se não houver registro de esquema, o tipo de dados será inferido com base nos dados fornecidos.

Configure propriedades adicionais em seu arquivo de propriedades do conector Kafka como de costume. Para obter mais informações, consulte Configuração do conector Kafka.

Conversores

Todos os conversores de dados estruturados, como Json, Avro e Protobuf, são suportados. Observe que testamos apenas os seguintes conversores de dados estruturados:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

Quaisquer conversores de dados não estruturados não são suportados. Por exemplo,

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

Alguns conversores de dados personalizados podem não ser suportados. Entre em contato com o suporte Snowflake se precisar de ajuda.

Notas de uso

  • A detecção de esquema com o conector Kafka é suportada com ou sem um registro de esquema fornecido. Se estiver usando o registro de esquema (Avro e Protobuf), a coluna será criada com os tipos de dados definidos no registro de esquema fornecido. Se não houver registro de esquema (JSON), o tipo de dados será inferido com base nos dados fornecidos.

  • A evolução de esquema com conector Kafka oferece suporte às seguintes modificações nas colunas da tabela:

    • Adição de novas colunas

    • Descarte a restrição NOT NULL se a coluna de dados de origem estiver ausente.

  • Se o conector Kafka criar a tabela de destino, a evolução de esquema será ativada por padrão. No entanto, se a evolução de esquema estiver desativada para uma tabela existente, o conector Kafka tentará enviar as linhas com esquemas incompatíveis para as filas de mensagens mortas configuradas (DLQ).

Exemplos

Os exemplos a seguir demonstram as tabelas criadas antes e depois da detecção e evolução do esquema serem ativadas para o conector Kafka com Snowpipe Streaming.

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy