Migration vom Kafka-Konnektor v3 auf v4

Unter diesem Thema wird beschrieben, wie Sie vom klassischen Kafka-Konnektor (v3 und früher) zum Snowflake Connector for Kafka (v4) migrieren.

Übersicht

Snowflake Connector for Kafka (v4) ist eine grundlegende Neufassung, die ausschließlich Snowpipe Streaming-Architektur mit hoher Performance verwendet. Sie müssen manuell eine neue Konnektorkonfiguration erstellen, um auf v4 zu migrieren.

Wichtig

Der v4-Konnektor kann nicht als Ersatz für v3 verwendet werden. Er verwendet eine andere Konnektorklasse, andere Standardverhaltensweisen und einen anderen Funktionsumfang. Prüfen Sie vor der Migration die unten aufgeführten grundlegenden Änderungen und Migrationspfade.

Preisänderungen

Der v4-Konnektor verwendet durchsatzabhängige Pauschalpreise, die auf der Menge der aufgenommenen Daten basieren (GB). Dies ist das gleiche Preismodell wie bei der:doc:Snowpipe Streaming-Architektur mit hoher Performance</user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-cost>. Um die Kosten abzuschätzen, multiplizieren Sie Ihre Datenaufnahmerate mit dem Pro-GB-Preis, der auf der Seite mit den Kosten für Snowpipe Streaming angegeben ist.

Dies ersetzt das v3-Preismodell, das auf serverlosem Computing und Dateibenachrichtigungen basierte.

Validierung der Kompatibilität

Standardmäßig aktiviert v4 eine Start-Kompatibilitätsprüfung (snowflake.streaming.validate.compatibility.with.classic=true), die verhindert, dass Sie versehentlich v4 mit einer kopierten v3-Konfiguration ausführen. Wenn diese Option aktiviert ist, überprüft der Konnektor beim Start, ob Sie die wichtigsten Migrationseinstellungen explizit konfiguriert haben. Wenn eine fehlt oder nicht kompatibel ist, schlägt der Konnektor mit einer beschreibenden Fehlermeldung fehl, die genau angibt, was eingestellt werden muss.

Der Validator prüft Folgendes:

  • snowflake.validation ist auf client_side gesetzt.

  • snowflake.compatibility.enable.column.identifier.normalization ist auf true gesetzt.

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization ist auf true gesetzt.

  • snowflake.enable.schematization ist explizit auf true oder false gesetzt (die Standardeinstellung wurde von false in v3 auf true in v4 geändert, sodass der Validator Sie auffordert, Ihre Auswahl zu bestätigen).

  • snowflake.streaming.classic.offset.migration ist explizit festgelegt.

  • snowflake.streaming.classic.offset.migration.include.connector.name ist explizit festgelegt ist (wenn die Offset-Migration strict oder best_effort ist).

Nachdem Sie die grundlegenden Änderungen überprüft und diese Einstellungen explizit konfiguriert haben, können Sie snowflake.streaming.validate.compatibility.with.classic=false einstellen, um die Überprüfung bei nachfolgenden Neustarts zu überspringen.

Eine vollständige Beschreibung dieser Eigenschaften finden Sie unter :ref:` Schematisierungs-, Validierungs- und Kompatibilitätseigenschaften<label-kafkahp_migration_properties>` und Eigenschaften der Offset-Migration.

Migrationspfade

Der Migrationspfad hängt davon ab, wie Ihr v3-Konnektor konfiguriert wurde.

Stellen Sie vor der Migration sicher, dass snowflake.metadata.topic, snowflake.metadata.offset.and.partition und snowflake.metadata.createtime in Ihrem v3-Konnektor aktiviert sind (standardmäßig aktiviert). Dies stellt sicher, dass RECORD_METADATA die Felder für das Thema, die Partition und den Offset enthält, die für die Deduplizierung benötigt werden, falls Probleme auftreten.

Migration vom v3-Snowpipe-Modus

Wenn Ihr v3-Konnektor klassisches Snowpipe verwendet (Standard snowflake.ingestion.method=SNOWPIPE), migriert v4 nahtlos unter Verwendung von Kafka-Verbrauchergruppen-Offsets.

  1. Stoppen Sie den v3-Konnektor.

  2. Warten Sie, bis alle Stagingdaten in Snowflake aufgenommen wurden. Dateien werden im klassischen Snowpipe zuerst im Stagingbereich bereitgestellt, bevor sie geladen werden, und alle Dateien, die sich noch in der Warteschlange befinden, wenn Sie den Konnektor stoppen, werden asynchron geladen. Das Starten des v4-Konnektors vor diesem Abschluss kann dazu führen, dass die Daten nicht in der richtigen Reihenfolge integriert werden.

  3. Stellen Sie die neue v4-Konfiguration mit demselben Konnektornamen wie v3 (dieselbe Kafka-Verbrauchergruppe) bereit. Stellen Sie die Offset-Migrationskonfiguration so ein, dass die SSv1-Migration übersprungen wird:

    snowflake.streaming.classic.offset.migration=skip
    
  4. Starten Sie den v4-Konnektor. Er erbt die Offsets der Kafka-Verbrauchergruppe und setzt die Datenaufnahme dort fort, wo v3 aufgehört hat.

Führen Sie den Wechsel innerhalb von offsets.retention.minutes durch (Standard 7 Tage), um das Ablaufen des Offsets zu vermeiden.

Dieser Migrationspfad führt nicht zu Duplikaten oder Lücken.

Migration vom v3 Snowpipe Streaming-Modus

Wenn Ihr v3-Konnektor Snowpipe Streaming verwendet hat (snowflake.ingestion.method=SNOWPIPE_STREAMING), kann v4 die bestätigten Offsets automatisch vom v3 Snowpipe Streaming (SSv1)-Kanälen migrieren. Dies verhindert Duplikate oder Lücken.

  1. Stoppen Sie den v3-Konnektor.

  2. Stellen Sie die neue v4-Konfiguration mit demselben Konnektornamen wie v3 bereit. Konfigurieren Sie die Einstellungen für die Offset-Migration:

    # Use 'strict' to fail if SSv1 channels aren't found, or 'best_effort' to fall
    # back to Kafka consumer group offsets if channels aren't found.
    snowflake.streaming.classic.offset.migration=best_effort
    
    # Must match your v3 setting for snowflake.streaming.channel.name.include.connector.name.
    # Set to 'true' if your v3 connector included the connector name in channel names.
    snowflake.streaming.classic.offset.migration.include.connector.name=false
    
  3. Starten Sie den v4-Konnektor. Er stellt die bestätigten Offsets von den bestehenden SSv1-Kanälen wieder her und setzt die Datenaufnahme fort, wo v3 aufgehört hat.

Führen Sie den Wechsel innerhalb von offsets.retention.minutes durch (Standard 7 Tage).

Downgrade von v4 auf v3

Ein Downgrade von v4 zurück auf v3 ist möglich, indem der Migrationsprozess rückgängig gemacht wird. Nach einem Downgrade werden jedoch doppelte Datensätze erwartet, da v3 und v4 Offsets unterschiedlich verfolgen.

So führen Sie ein Downgrade durch:

  1. Stoppen Sie den v4-Konnektor.

  2. Stellen Sie Ihre v3-Konfiguration mit demselben Konnektornamen bereit.

  3. Starten Sie den v3-Konnektor.

  4. Nach dem Downgrade können Sie Ihre Daten mit der Spalte RECORD_METADATA deduplizieren. Die folgende Abfrage entfernt doppelte Datensätze mithilfe einer Fensterfunktion für Thema, Partition und Offset:

    DELETE FROM my_table
    WHERE RECORD_METADATA IS NOT NULL
      AND (RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset)
          IN (
            SELECT RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
            FROM (
              SELECT RECORD_METADATA,
                     ROW_NUMBER() OVER (
                       PARTITION BY RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
                       ORDER BY RECORD_METADATA:offset
                     ) AS rn
              FROM my_table
              WHERE RECORD_METADATA IS NOT NULL
            )
            WHERE rn > 1
          );
    

Wichtig

Die Deduplizierung erfordert, dass RECORD_METADATA Themen-, Partitions- und Offsetfelder enthält. Stellen Sie sicher, dass die snowflake.metadata.topic- und snowflake.metadata.offset.and.partition-Einstellungen vor der Migration auf v4 aktiviert sind.

Wenn Sie während des Downgrades auf Probleme stoßen, wenden Sie sich an den ` Snowflake-Support`_.

Grundlegende Änderungen

Neue Konnektorklasse

Änderung

v3

v4

Konnektorklasse

com.snowflake.kafka.connector.SnowflakeSinkConnector

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

Datenaufnahmemethoden

Snowpipe (Batch) oder Snowpipe Streaming (optional)

Nur Snowpipe Streaming

Java-Version

Java 8+

Java 11+

Standardverhalten geändert

Konfiguration

v3-Standard

v4-Standard

snowflake.enable.schematization

false (Datensätze, die in den Spalten RECORD_CONTENT und RECORD_METADATA VARIANT gespeichert sind)

true (Datensatzfelder, die einzelnen Tabellenspalten zugeordnet sind)

snowflake.validation

Clientseitiges Äquivalent

server_side (Validierung durch Snowflake Backend)

snowflake.compatibility.enable.autogenerated.table.name.sanitization

true-Äquivalent (ungültige Zeichen werden ersetzt, Namen in Großbuchstaben geschrieben)

false (Themennamen werden unverändert für Tabellennamen verwendet, wobei Groß-/Kleinschreibung und Sonderzeichen beibehalten werden)

snowflake.compatibility.enable.column.identifier.normalization

true-Äquivalent (Spaltennamen in Großbuchstaben)

false (Spaltenbezeichner behalten Groß-/Kleinschreibung bei)

Entfernte Konfigurationen

Die folgenden Konfigurationseigenschaften von v3 werden in v4 nicht akzeptiert:

  • snowflake.ingestion.method (v4 verwendet ausschließlich Snowpipe Streaming)

  • buffer.flush.time, buffer.size.bytes, buffer.count.records (verwaltet vom Snowpipe Streaming SDK)

  • snowflake.streaming.max.client.lag (verwaltet vom SDK)

  • snowflake.streaming.enable.single.buffer

  • snowflake.streaming.max.memory.limit.bytes

  • snowflake.streaming.closeChannelsInParallel.enabled (immer parallel in v4)

  • snowflake.streaming.iceberg.enabled (in v4 automatisch erkannt)

  • snowflake.snowpipe.* (Snowpipe ohne Streaming wird nicht unterstützt)

  • enable.streaming.client.optimization

  • enable.streaming.channel.offset.migration (Migration des internen Kanalnamenformats von v3, nicht erforderlich in v4)

  • snowflake.streaming.channel.name.include.connector.name

  • enable.streaming.channel.offset.verification

  • snowflake.authenticator (nur Schlüsselpaar-Authentifizierung unterstützt)

  • snowflake.oauth.* (OAuth wird in v4 nicht unterstützt)

  • provider

Kundenspezifische Konverter wurden entfernt

Die folgenden von Snowflake bereitgestellten benutzerdefinierten Konverter sind in v4 nicht verfügbar:

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

Verwenden Sie stattdessen Standard-Community-Konverter:

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

Authentifizierung

v4 unterstützt nur die Authentifizierung mit Schlüsselpaaren. Wenn Sie OAuth mit v3 verwenden, müssen Sie vor der Migration zur Authentifizierung mit einem Schlüsselpaar wechseln.

Schritte zur Migration

  1. Grundlegende Änderungen überprüfen: Überprüfen Sie die oben genannten Änderungen und stellen Sie fest, wie sich diese auf Ihre aktuelle Bereitstellung auswirken.

  2. Metadateneinstellungen überprüfen: Bestätigen Sie vor der Migration, dass snowflake.metadata.topic und snowflake.metadata.offset.and.partition in Ihrem v3-Konnektor aktiviert sind (standardmäßig aktiviert). Dadurch wird sichergestellt, dass eine Deduplizierung bei Bedarf möglich ist.

  3. Neue Konnektorkonfiguration erstellen: Erstellen Sie eine neue Konfigurationsdatei unter Verwendung der SnowflakeStreamingSinkConnector-Klasse. Sie können Ihre v3-Konfiguration nicht direkt kopieren, da v4 andere Standardeinstellungen für Schematisierung, Validierung und die Behandlung von Bezeichnern hat. Siehe Snowflake Connector for Kafka: Installation und Konfiguration für die vollständige Konfigurationsreferenz.

  4. Kompatibilität und Offset-Migrationseinstellungen konfigurieren: Der v4-Konnektor validiert diese Einstellungen beim Start. Sie müssen explizit Folgendes festlegen:

    • snowflake.enable.schematization: Setzen Sie dies auf true (neues v4-Verhalten) oder false (v3-Verhalten).

    • snowflake.validation: Setzen Sie dies auf client_side für v3-Kompatibilität oder server_side für v4-Standardwerte.

    • snowflake.compatibility.enable.autogenerated.table.name.sanitization: Setzen Sie dies auf true für v3-Kompatibilität.

    • snowflake.compatibility.enable.column.identifier.normalization: Setzen Sie dies auf true für v3-Kompatibilität.

    • snowflake.streaming.classic.offset.migration: Setzen Sie dies auf skip bei Migration vom Snowpipe-Modus oder auf best_effort/strict bei Migration vom Snowpipe Streaming-Modus.

    Weitere Informationen dazu finden Sie unter Validierung der Kompatibilität.

  5. Kundenspezifische Konverter ersetzen: Wenn Sie von Snowflake bereitgestellte Konverter verwenden, ersetzen Sie diese durch die oben aufgeführten Community-Äquivalente.

  6. Folgen Sie dem Migrationspfad für Ihren Datenaufnahmemodus: Siehe Migration vom Snowpipe-Modus oder Migration vom Snowpipe Streaming-Modus oben.

  7. Test mit Beispieldaten: Stellen Sie die neue Konnektorkonfiguration in einer Testumgebung bereit, und überprüfen Sie, ob die Daten korrekt übergeben werden, bevor Sie die Produktions-Workloads migrieren.

  8. V4-Standardwerte inkrementell übernehmen: Sobald Ihre Migration validiert ist, sollten Sie in Erwägung ziehen, schrittweise die v4-Standardeinstellungen zu übernehmen (serverseitige Validierung, Groß-/Kleinschreibung bei Bezeichnern), um die Leistung zu verbessern und die Übereinstimmung mit den Snowflake-Konventionen sicherzustellen.