Migrar do conector Kafka v3 para v4

Este tópico descreve como migrar do conector Kafka clássico (v3 e versões anteriores) para o Snowflake Connector for Kafka (v4).

Visão geral

O Snowflake Connector for Kafka (v4) é uma reformulação do zero que usa exclusivamente a arquitetura de alto desempenho do Snowpipe Streaming. Você deve criar uma nova configuração do conector manualmente para migrar para o v4.

Importante

O conector v4 não pode ser usado como substituto direto do v3. Ele usa uma classe de conector diferente, comportamentos padrão distintos e outro conjunto de recursos. Revise as mudanças interruptivas e os caminhos de migração abaixo antes de migrar.

Alterações da precificação

O conector v4 usa precificação fixa baseada em taxa de transferência de acordo com o volume de dados ingeridos (GB). Trata-se do mesmo modelo de precificação da arquitetura de alto desempenho do Snowpipe Streaming. Para estimar os custos, multiplique sua taxa de ingestão de dados pelo preço por GB apresentado na página de custo do Snowpipe Streaming.

Ele substitui o modelo de precificação do v3, que era baseado na computação sem servidor e nas notificações de arquivo.

Validação de compatibilidade

Por padrão, o v4 habilita uma verificação de compatibilidade na inicialização (snowflake.streaming.validate.compatibility.with.classic=true), que impede você de executar acidentalmente o v4 com uma configuração copiada do v3. Quando habilitado, o conector valida na inicialização que você definiu explicitamente as principais configurações de migração. Se alguma estiver ausente ou for incompatível, o conector falhará com uma mensagem de erro descritiva informando exatamente o que definir.

O validador verifica o seguinte:

  • snowflake.validation está definido como client_side

  • snowflake.compatibility.enable.column.identifier.normalization está definido como true

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization está definido como true

  • snowflake.enable.schematization é explicitamente definido como true ou false (o padrão mudou de false no v3 para true no v4, portanto o validador exige que você confirme sua escolha)

  • snowflake.streaming.classic.offset.migration foi explicitamente definido

  • snowflake.streaming.classic.offset.migration.include.connector.name foi explicitamente definido (quando a migração de deslocamento é strict ou best_effort)

Depois de revisar as mudanças interruptivas e definir explicitamente essas configurações, você poderá definir snowflake.streaming.validate.compatibility.with.classic=false para ignorar a verificação nas reinicializações seguintes.

Para obter as descrições completas dessas propriedades, consulte Propriedades de esquematização, validação e compatibilidade e Propriedades da migração de deslocamento.

Caminhos de migração

O caminho de migração depende de como seu conector v3 foi configurado.

Antes de migrar, certifique-se de que snowflake.metadata.topic, snowflake.metadata.offset.and.partition e snowflake.metadata.createtime estão habilitados no seu conector v3 (eles estão habilitados por padrão). Isto assegura que RECORD_METADATA contenha os campos de tópico, partição e deslocamento necessários para desduplicação, caso ocorra algum problema.

Migrando do modo Snowpipe v3

Se o seu conector v3 usava o Snowpipe clássico (o snowflake.ingestion.method=SNOWPIPE padrão), o v4 migrará sem problemas usando deslocamentos de grupo de consumidores Kafka.

  1. Pare o conector v3.

  2. Aguarde a ingestão de todos os dados preparados no Snowflake. O Snowpipe clássico prepara os arquivos antes de carregá-los, e se ainda houver arquivos na fila quando você parar o conector, eles serão carregados de forma assíncrona. Iniciar o conector v4 antes da conclusão desse processo pode fazer com que os dados fiquem fora de ordem.

  3. Implante a nova configuração do v4 usando o mesmo nome de conector que no v3 (o mesmo grupo de consumidores Kafka). Defina a configuração de migração de deslocamento para ignorar a migração do SSv1:

    snowflake.streaming.classic.offset.migration=skip
    
  4. Inicie o conector v4. Ele herda os deslocamentos do grupo de consumidores Kafka e retoma a ingestão de onde a v3 parou.

Conclua a alternância dentro de offsets.retention.minutes (padrão de 7 dias) para evitar a expiração do deslocamento.

Esse caminho de migração não introduz duplicatas ou lacunas.

Migrando do modo Snowpipe Streaming v3

Se o seu conector v3 usava o Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING), o v4 poderá migrar automaticamente os deslocamentos confirmados dos canais do Snowpipe Streaming v3 (SSv1). Isso impede duplicatas ou lacunas.

  1. Pare o conector v3.

  2. Implante a nova configuração do v4 usando o mesmo nome de conector que no v3. Defina as configurações da migração de deslocamento:

    # Use 'strict' to fail if SSv1 channels aren't found, or 'best_effort' to fall
    # back to Kafka consumer group offsets if channels aren't found.
    snowflake.streaming.classic.offset.migration=best_effort
    
    # Must match your v3 setting for snowflake.streaming.channel.name.include.connector.name.
    # Set to 'true' if your v3 connector included the connector name in channel names.
    snowflake.streaming.classic.offset.migration.include.connector.name=false
    
  3. Inicie o conector v4. Ele recupera os deslocamentos confirmados dos canais do SSv1 existentes e retoma a ingestão de onde o v3 parou.

Conclua a alternância dentro de offsets.retention.minutes (padrão de 7 dias).

Fazendo downgrade de v4 para v3

O downgrade do v4 para o v3 é possível revertendo o processo de migração. No entanto, registros duplicados são esperados após um downgrade porque o v3 e o v4 rastreiam os deslocamentos de maneira diferente.

Para fazer downgrade:

  1. Pare o conector v4.

  2. Implante sua configuração do v3 usando o mesmo nome de conector.

  3. Inicie o conector v3.

  4. Após o downgrade, desduplique seus dados usando a coluna RECORD_METADATA. A consulta a seguir remove os registros duplicados usando uma função de janela com base em tópico, partição e deslocamento:

    DELETE FROM my_table
    WHERE RECORD_METADATA IS NOT NULL
      AND (RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset)
          IN (
            SELECT RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
            FROM (
              SELECT RECORD_METADATA,
                     ROW_NUMBER() OVER (
                       PARTITION BY RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
                       ORDER BY RECORD_METADATA:offset
                     ) AS rn
              FROM my_table
              WHERE RECORD_METADATA IS NOT NULL
            )
            WHERE rn > 1
          );
    

Importante

A desduplicação exige que RECORD_METADATA contenha os campos de tópico, partição e deslocamento. Garanta que as configurações snowflake.metadata.topic e snowflake.metadata.offset.and.partition estejam habilitadas antes de migrar para o v4.

Se você encontrar problemas durante o downgrade, entre em contato com o suporte Snowflake.

Alterações interruptivas

Nova classe do conector

Mudança

v3

v4

Classe do conector

com.snowflake.kafka.connector.SnowflakeSinkConnector

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

Métodos de ingestão

Snowpipe (lote) ou Snowpipe Streaming (opcional)

Somente Snowpipe Streaming

Versão Java

Java 8+

Java 11+

Comportamentos padrão alterados

Configuração

Padrão do v3

Padrão do v4

snowflake.enable.schematization

false (registros armazenados nas colunas RECORD_CONTENT e RECORD_METADATA VARIANT)

true (campos de registro mapeados para colunas de tabela individuais)

snowflake.validation

Equivalente do lado do cliente

server_side (validação realizada pelo back-end do Snowflake)

snowflake.compatibility.enable.autogenerated.table.name.sanitization

Equivalente a true (caracteres inválidos substituídos, nomes em maiúsculas)

false (nomes de tópicos usados como estão para nomes de tabelas, preservando maiúsculas e minúsculas e caracteres especiais)

snowflake.compatibility.enable.column.identifier.normalization

Equivalente de true (nomes de coluna em maiúsculas)

false (identificadores de coluna preservam maiúsculas e minúsculas)

Configurações removidas

As seguintes propriedades de configuração do v3 não são aceitas no v4:

  • snowflake.ingestion.method (v4 usa exclusivamente o Snowpipe Streaming)

  • buffer.flush.time, buffer.size.bytes, buffer.count.records (gerenciados pelo SDK do Snowpipe Streaming)

  • snowflake.streaming.max.client.lag (gerenciado pelo SDK)

  • snowflake.streaming.enable.single.buffer

  • snowflake.streaming.max.memory.limit.bytes

  • snowflake.streaming.closeChannelsInParallel.enabled (sempre paralelo no v4)

  • snowflake.streaming.iceberg.enabled (detecção automática no v4)

  • snowflake.snowpipe.* (Snowpipe sem streaming incompatível)

  • enable.streaming.client.optimization

  • enable.streaming.channel.offset.migration (migração do formato de nome do canal interno do v3, não necessário no v4)

  • snowflake.streaming.channel.name.include.connector.name

  • enable.streaming.channel.offset.verification

  • snowflake.authenticator (compatível apenas com autenticação por par de chaves)

  • snowflake.oauth.* (OAuth incompatível com v4)

  • provider

Conversores personalizados removidos

Os seguintes conversores personalizados fornecidos pelo Snowflake não estão disponíveis no v4:

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

Em vez disso, use os conversores padrão da comunidade:

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

Autenticação

O v4 oferece suporte apenas à autenticação por par de chaves. Se você usar OAuth com v3, é necessário mudar para autenticação por par de chaves antes de migrar.

Etapas de migração

  1. Revisar as mudanças interruptivas: revise as mudanças interruptivas acima e determine como elas afetam sua implantação atual.

  2. Verificar as configurações de metadados: antes de migrar, confirme que snowflake.metadata.topic e snowflake.metadata.offset.and.partition estejam habilitados no seu conector v3 (eles estão habilitados por padrão). Isso garante a desduplicação, se necessário.

  3. Criar uma nova configuração do conector: crie um novo arquivo de configuração usando a classe SnowflakeStreamingSinkConnector. Não é possível copiar a configuração do v3 diretamente porque o v4 tem padrões diferentes para esquematização, validação e tratamento de identificadores. Consulte Snowflake Connector for Kafka: Install and configure para conferir a referência completa da configuração.

  4. Definir as configurações de compatibilidade e da migração de deslocamento: o conector v4 valida essas configurações na inicialização. Você deve definir explicitamente o seguinte:

    • snowflake.enable.schematization: defina como true (novo comportamento do v4) ou false (comportamento do v3).

    • snowflake.validation: defina como client_side para compatibilidade com o v3 ou server_side para padrões do v4.

    • snowflake.compatibility.enable.autogenerated.table.name.sanitization: defina como true para compatibilidade com o v3.

    • snowflake.compatibility.enable.column.identifier.normalization: defina como true para compatibilidade com o v3.

    • snowflake.streaming.classic.offset.migration: Defina como skip se estiver migrando do modo Snowpipe, ou como best_effort/strict se estiver migrando do modo Snowpipe Streaming.

    Para obter mais informações, consulte Validação de compatibilidade.

  5. Substituir os conversores personalizados: se você usa os conversores fornecidos pelo Snowflake, substitua-os pelos equivalentes da comunidade listados acima.

  6. Seguir o caminho de migração para seu modo de ingestão: consulte Migrando do modo Snowpipe ou Migrando do modo Snowpipe Streaming acima.

  7. Testar com dados de amostra: implante a nova configuração do conector em um ambiente de teste e verifique se os dados fluem corretamente antes de migrar as cargas de trabalho da produção.

  8. Adotar os padrões do v4 de forma incremental: depois que sua migração for validada, considere adotar gradativamente os padrões do v4 (validação do lado do servidor, identificadores que diferenciam maiúsculas de minúsculas) para melhorar o desempenho e o alinhamento com as convenções do Snowflake.