Migrer du connecteur Kafka v3 vers v4

Cette rubrique décrit comment migrer du connecteur Kafka classique (v3 et antérieur) vers le connecteur Snowflake Connector for Kafka (v4).

Vue d’ensemble

Le connecteur Snowflake Connector for Kafka (v4) est une réécriture de base qui utilise exclusivement l’architecture hautes performances Snowpipe Streaming. Vous devez créer manuellement une nouvelle configuration de connecteur pour migrer vers v4.

Important

Le connecteur v4 ne peut pas être utilisé en remplacement de v3. Il utilise une classe de connecteur différente, des comportements par défaut différents et un ensemble de fonctionnalités différent. Consultez les changements importants et les chemins de migration ci-dessous avant la migration.

Modifications de la tarification

Le connecteur v4 utilise une tarification forfaitaire basée sur le débit et basée sur le volume de données ingérées (GB). Il s’agit du même modèle de tarification que l’Architecture hautes performances Snowpipe Streaming. Pour estimer les coûts, multipliez votre taux d’ingestion de données par le prix par GB indiqué sur la page de coûts Snowpipe Streaming.

Cela remplace le modèle de tarification v3, qui était basé sur un calcul sans serveur et des notifications de fichiers.

Validation de compatibilité

Par défaut, v4 active un contrôle de compatibilité au démarrage (snowflake.streaming.validate.compatibility.with.classic=true) qui vous empêche d’exécuter accidentellement v4 avec une configuration v3 copiée. Lorsque cette option est activée, le connecteur vérifie au démarrage que vous avez explicitement configuré les paramètres de migration clés. Si l’une des paramètres est manquant ou incompatible, le connecteur échoue avec un message d’erreur descriptif indiquant exactement ce qu’il faut définir.

Le validateur vérifie les éléments suivants :

  • snowflake.validation est défini sur client_side

  • snowflake.compatibility.enable.column.identifier.normalization est défini sur true

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization est défini sur true

  • snowflake.enable.schematization est explicitement défini sur true ou false (la valeur par défaut est passé de false dans v3 à true dans v4, le validateur vous demande donc de confirmer votre choix)

  • snowflake.streaming.classic.offset.migration est explicitement défini

  • snowflake.streaming.classic.offset.migration.include.connector.name est explicitement défini (lorsque la migration de décalage est strict ou best_effort)

Après avoir examiné les modifications importantes et configuré explicitement ces paramètres, vous pouvez définir snowflake.streaming.validate.compatibility.with.classic=false pour ignorer la vérification des redémarrages suivants.

Pour une description complète de ces propriétés, voir Propriétés de schématisation, de validation et de compatibilité et Propriétés de migration de décalage.

Chemins de migration

Le chemin de migration dépend de la manière dont votre connecteur v3 a été configuré.

Avant la migration, assurez-vous que snowflake.metadata.topic, snowflake.metadata.offset.and.partition, et snowflake.metadata.createtime sont activés dans votre connecteur v3 (ils sont activés par défaut). Ceci garantit que RECORD_METADATA contient les champs du sujet, de la partition et du décalage nécessaires à la déduplication en cas de problème.

Migration à partir du mode Snowpipe v3

Si votre connecteur v3 utilisait Snowpipe classique (par défaut snowflake.ingestion.method=SNOWPIPE), v4 migre de manière transparente en utilisant les décalages du groupe de consommateurs Kafka.

  1. Arrêtez le connecteur v3.

  2. Attendez que toutes les données préparées soient ingérées dans Snowflake. Snowpipe classique met en zone de préparation les fichiers avant de les charger, et tous les fichiers encore dans la file d’attente lorsque vous arrêtez le connecteur seront chargés de manière asynchrone. Si vous lancez le connecteur v4 avant que cette opération ne soit terminée, les données risquent d’être enregistrées dans le désordre.

  3. Déployez la nouvelle configuration v4 en utilisant le même nom de connecteur que v3 (même groupe de consommateurs Kafka). Définissez la configuration de la migration du décalage pour ignorer la migration SSv1 :

    snowflake.streaming.classic.offset.migration=skip
    
  4. Démarrez le connecteur v4. Il hérite des décalages du groupe de consommateurs Kafka et reprend l’ingestion là où v3 s’est arrêté.

Effectuez le basculement dans offsets.retention.minutes (7 jours par défaut) pour éviter l’expiration du décalage.

Ce chemin de migration n’introduit pas de doublons ni d’écarts.

Migration à partir du mode v3 Snowpipe Streaming

Si votre connecteur v3 utilisait Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING), v4 peut automatiquement migrer les décalages validés depuis les canaux v3 Snowpipe Streaming (SSv1). Cela permet d’éviter les doublons ou les écarts.

  1. Arrêtez le connecteur v3.

  2. Déployez la nouvelle configuration v4 en utilisant le même nom de connecteur que v3. Configurez les paramètres de migration de décalage :

    # Use 'strict' to fail if SSv1 channels aren't found, or 'best_effort' to fall
    # back to Kafka consumer group offsets if channels aren't found.
    snowflake.streaming.classic.offset.migration=best_effort
    
    # Must match your v3 setting for snowflake.streaming.channel.name.include.connector.name.
    # Set to 'true' if your v3 connector included the connector name in channel names.
    snowflake.streaming.classic.offset.migration.include.connector.name=false
    
  3. Démarrez le connecteur v4. Il récupère les décalages validés à partir des canaux SSv1 existants et reprend l’ingestion là où v3 s’est arrêté.

Effectuez le basculement dans offsets.retention.minutes (par défaut 7 jours).

Mise à niveau de v4 vers v3

Le retour de v4 à v3 est possible en inversant le processus de migration. Cependant, des enregistrements en double sont attendus après une mise à niveau vers une version inférieure, car v3 et v4 suivent les décalages différemment.

Pour passer à une version inférieure :

  1. Arrêtez le connecteur v4.

  2. Déployez votre configuration v3 en utilisant le même nom de connecteur.

  3. Démarrez le connecteur v3.

  4. Après la mise à niveau vers une version inférieure, dédupliquez vos données à l’aide de la colonne RECORD_METADATA. La requête suivante supprime les enregistrements en double à l’aide d’une fonction de fenêtre sur le sujet, la partition et le décalage :

    DELETE FROM my_table
    WHERE RECORD_METADATA IS NOT NULL
      AND (RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset)
          IN (
            SELECT RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
            FROM (
              SELECT RECORD_METADATA,
                     ROW_NUMBER() OVER (
                       PARTITION BY RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
                       ORDER BY RECORD_METADATA:offset
                     ) AS rn
              FROM my_table
              WHERE RECORD_METADATA IS NOT NULL
            )
            WHERE rn > 1
          );
    

Important

La déduplication exige que RECORD_METADATA contienne des champs de sujet, de partition et de décalage. Assurez-vous que les paramètres snowflake.metadata.topic et``snowflake.metadata.offset.and.partition`` soient activés avant la migration vers v4.

Si vous rencontrez des problèmes pendant la mise à niveau vers une version inférieure, contactez le support Snowflake.

Changements importants

Nouvelle classe de connecteur

Changement

v3

v4

Classe de connecteur

com.snowflake.kafka.connector.SnowflakeSinkConnector

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

Méthodes d’ingestion

Snowpipe (lot) ou Snowpipe Streaming (facultatif)

Snowpipe Streaming uniquement

Version Java

Java 8+

Java 11+

Modification des comportements par défaut

Configuration

v3 par défaut

v4 par défaut

snowflake.enable.schematization

false (enregistrements stockés dans les colonnes RECORD_CONTENT etRECORD_METADATAVARIANT)

true (champs d’enregistrement mappés à des colonnes de tables individuelles)

snowflake.validation

Équivalent côté client

server_side (validation effectuée par le backend Snowflake)

snowflake.compatibility.enable.autogenerated.table.name.sanitization

true équivalent (caractères non valides remplacés, noms en majuscules)

false (noms de sujets utilisés tels quels pour les noms de tables, préservant la casse et les caractères spéciaux)

snowflake.compatibility.enable.column.identifier.normalization

true équivalent (noms de colonnes en majuscules)

false (les identificateurs de colonne préservent la casse)

Configurations supprimées

Les propriétés de configuration suivantes de v3 ne sont pas acceptées dans v4 :

  • snowflake.ingestion.method (v4 utilise exclusivement Snowpipe Streaming)

  • buffer.flush.time, buffer.size.bytes, buffer.count.records (géré par le SDK Snowpipe Streaming)

  • snowflake.streaming.max.client.lag (géré par le SDK)

  • snowflake.streaming.enable.single.buffer

  • snowflake.streaming.max.memory.limit.bytes

  • snowflake.streaming.closeChannelsInParallel.enabled (toujours parallèle dans v4)

  • snowflake.streaming.iceberg.enabled (auto-détection dans v4)

  • snowflake.snowpipe.* (Snowpipe non streaming non pris en charge)

  • enable.streaming.client.optimization

  • enable.streaming.channel.offset.migration (migration du format de nom du canal interne v3, inutile dans v4)

  • snowflake.streaming.channel.name.include.connector.name

  • enable.streaming.channel.offset.verification

  • snowflake.authenticator (prise en charge de l’authentification par paire de clés)

  • snowflake.oauth.* (OAuth non pris en charge dans v4)

  • provider

Suppression des convertisseurs personnalisés

Les convertisseurs personnalisés suivants fournis par Snowflake ne sont pas disponibles dans v4 :

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

Utilisez plutôt des convertisseurs communautaires standard :

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

Authentification

v4 ne prend en charge que l’authentification par paire de clés. Si vous utilisez OAuth avec v3, vous devez passer à l’authentification par paire de clés avant de pouvoir effectuer la migration.

Étapes de migration

  1. Revoir les modifications importantes : Examinez les modifications importantes ci-dessus et déterminez comment elles affectent votre déploiement actuel.

  2. Vérifier les paramètres de métadonnées : Avant la migration, vérifiez que snowflake.metadata.topic et``snowflake.metadata.offset.and.partition`` sont activés dans votre connecteur v3 (ils sont activés par défaut). Cela garantit que la déduplication est possible si nécessaire.

  3. Créer une nouvelle configuration de connecteur : Créez un nouveau fichier de configuration à l’aide de la classe SnowflakeStreamingSinkConnector. Vous ne pouvez pas copier directement votre configuration v3, car v4 a des valeurs par défaut différentes pour la schématisation, la validation et la gestion des identificateurs. Voir Snowflake Connector for Kafka : installer et configurer pour la référence de configuration complète.

  4. Configurer les paramètres de compatibilité et de migration des décalages : Le connecteur v4 valide ces paramètres au démarrage. Vous devez définir explicitement les éléments suivants :

    • snowflake.enable.schematization : Défini sur true (nouveau comportement v4) ou false (comportement v3).

    • snowflake.validation : Défini sur client_side pour la compatibilité v3 ou server_side pour les valeurs par défaut v4.

    • snowflake.compatibility.enable.autogenerated.table.name.sanitization : Défini sur true pour la compatibilité v3.

    • snowflake.compatibility.enable.column.identifier.normalization : Défini sur true pour la compatibilité v3.

    • snowflake.streaming.classic.offset.migration : Défini sur skip si vous migrez depuis le mode Snowpipe, ou best_effort/strict si vous migrez depuis le mode Snowpipe Streaming.

    Pour plus d’informations, voir Validation de compatibilité.

  5. Remplacer les convertisseurs personnalisés : Si vous utilisez des convertisseurs fournis par Snowflake, remplacez-les par les équivalents communautaires énumérés ci-dessus.

  6. Suivre le chemin de migration de votre mode d’ingestion : Voir Migration à partir du mode Snowpipe ou Migration à partir du mode Snowpipe Streaming ci-dessus.

  7. Test avec des données d’échantillon : Déployez la nouvelle configuration du connecteur dans un environnement de test et vérifiez que les données circulent correctement avant de migrer les charges de travail de production.

  8. Adopter les valeurs par défaut v4 de manière incrémentielle : Une fois votre migration validée, envisagez d’adopter de manière incrémentielle les valeurs par défaut v4 (validation côté serveur, identificateurs sensibles à la casse) pour améliorer les performances et l’alignement sur les conventions de Snowflake.