Snowflake Connector for Kafka: Installation und Konfiguration

Unter diesem Thema werden die Schritte zur Installation und Konfiguration von Snowflake Connector for Kafka beschrieben.

Installieren des Kafka-Konnektors

Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.

Snowflake bietet zwei Versionen des Konnektors:

Die Anweisungen unter diesem Thema geben an, welche Schritte nur für beide Versionen des Konnektors gelten.

Installationsvoraussetzungen

  • Der Kafka-Konnektor unterstützt die folgenden Paketversionen:

    Paket

    Version des Snowflake-Kafka-Konnektors

    Paketunterstützung (von Snowflake getestet)

    Apache Kafka

    2.0.0 (oder höher)

    Apache Kafka 2.8.2, 3.7.2, 4.1.1

    Confluent

    2.0.0 (oder höher)

    Confluent 6.2.15, 7.8.2, 8.2.0

  • Der Kafka-Konnektor ist für die Verwendung mit der Kafka Connect-API 3.9.0 ausgelegt. Spätere Versionen der Kafka Connect-API wurden nicht getestet. Versionen vor 3.9.0 sind mit dem Konnektor kompatibel. Weitere Informationen dazu finden Sie unter Kafka-Kompatibilität.

  • Wenn Sie sowohl den Kafka-Konnektor als auch die JAR-Dateien des JDBC-Treibers in Ihrer Umgebung haben, stellen Sie sicher, dass Ihre JDBC-Version mit der snowflake-jdbc-Version übereinstimmt, die in der Datei pom.xml Ihrer vorgesehenen Kafka-Konnektor-Version angegeben ist. Sie können zu Ihrer bevorzugten Release-Version des Kafka-Konnektors wechseln, z. B. v4.0.0. Durchsuchen Sie dann die Datei pom.xml, um die Version von snowflake-jdbc zu ermitteln.

  • Wenn Sie das Avro-Format zur Aufnahme von Daten verwenden, gilt Folgendes:

  • Apache Kafka muss mit der gewünschten Datenaufbewahrungsdauer und/oder dem gewünschten Speicherlimit konfiguriert werden.

  • Installieren und konfigurieren Sie den Kafka Connect-Cluster.

    Jeder Kafka Connect-Clusterknoten muss über ausreichend RAM für den Snowflake-Konnektor für Apache Kafka verfügen. Das empfohlene Minimum beträgt 5 MB pro Kafka-Partition. (Dies ist zusätzlich zu dem RAM erforderlich, der für alle anderen Aufgaben von Kafka Connect erforderlich ist.)

    Wichtig

    Der v4-Konnektor verwendet ein auf Rust basierendes Snowpipe Streaming-SDK, das Off-Heap-Speicher (Systemspeicher) für die Pufferung zuweist. Begrenzen Sie die JVM-Heap-Größe auf etwa 50 % des verfügbaren Speichers, um Platz für das SDK zu lassen. Beispiel: Für einen Worker mit 8 GB RAM legen Sie -Xmx4g fest.

  • Snowflake empfiehlt, für Kafka-Broker und die Kafka Connect-Laufzeitumgebung die gleiche Version zu verwenden.

  • Snowflake empfiehlt dringend, Ihre Kafka Connect-Instanz in derselben Cloudanbieter-Region wie Ihr Snowflake-Konto auszuführen. Dies ist nicht zwingend erforderlich, erhöht jedoch in der Regel den Durchsatz.

Eine Liste der von Snowflake-Clients unterstützten Betriebssysteme finden Sie unter Betriebssystemunterstützung.

Installieren des Konnektors

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Confluent. Die folgende Tabelle beschreibt die unterstützten Konnektorversionen.

Release-Serie

Status

Anmerkungen

4.x.x

Allgemein verfügbar

Neueste Version. Die Migration von 3.x und 2.x muss manuell erfolgen.

3.x.x

Offiziell unterstützt

Upgrade auf v4 empfohlen.

2.x.x

Offiziell unterstützt

Upgrade empfohlen.

1.x.x

Nicht unterstützt

Installieren des Konnektors für Confluent

Kafka-Konnektordateien herunterladen

Laden Sie die JAR-Datei des Kafka-Konnektors von einem der folgenden Speicherorte herunter:

Confluent Hub:

https://www.confluent.io/hub/

Das Paket enthält alle Abhängigkeiten, die zur Verwendung eines verschlüsselten oder unverschlüsselten privaten Schlüssels für die Schlüsselpaar-Authentifizierung erforderlich sind. Weitere Informationen dazu finden Sie unter Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter diesem Thema).

Zentrales Maven-Repository:

https://mvnrepository.com/artifact/com.snowflake

Wenn Sie diese Version verwenden, müssen Sie die Bouncy Castle-Kryptographie-Bibliotheken (JAR-Dateien) herunterladen:

Laden Sie diese Dateien in denselben lokalen Ordner wie die JAR-Datei des Kafka-Konnektors herunter.

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Kafka-Konnektor installieren

Installieren Sie den Kafka-Konnektor gemäß den Anweisungen, die für die Installation anderer Konnektoren bereitgestellt wurden:

Installieren des Konnektors für Open Source Apache Kafka

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Open Source Apache Kafka.

Apache Kafka installieren

  1. Laden Sie das Kafka-Paket von der offiziellen Kafka-Website herunter.

  2. Wechseln Sie in einem Terminalfenster in das Verzeichnis, in das Sie die Paketdatei heruntergeladen haben.

  3. Führen Sie den folgenden Befehl aus, um die Datei kafka_<scala_version>-<kafka_version>.tgz zu dekomprimieren:

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    

JDK installieren

Installieren und konfigurieren Sie das Java Development Kit (JDK) Version 11 oder höher. Snowflake wurde mit der Standard Edition (SE) des JDK getestet. Die Enterprise Edition (EE) ist voraussichtlich kompatibel, wurde jedoch nicht getestet.

Wenn Sie bereits zuvor das JDK installiert haben, können Sie diesen Abschnitt überspringen.

  1. Laden Sie das JDK von der Oracle JDK-Website herunter.

  2. Installieren oder dekomprimieren Sie das JDK.

  3. Stellen Sie gemäß den Anweisungen für Ihr Betriebssystem die Umgebungsvariable JAVA_HOME so ein, dass sie auf das Verzeichnis mit dem JDK verweist.

JAR-Dateien des Kafka-Konnektors herunterladen

  1. Laden Sie die JAR-Datei des Kafka-Konnektors vom Maven Central Repository herunter:

    https://mvnrepository.com/artifact/com.snowflake

  2. Laden Sie die JAR-Dateien für die Bouncy Castle-Kryptographie-Bibliothek herunter:

  3. Wenn Ihre Kafka-Daten im Apache Avro-Format gestreamt werden, laden Sie die Avro-JAR-Datei (1.11.4) herunter:

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Kafka-Konnektor installieren

Kopieren Sie die JAR-Dateien, die Sie in Installieren des Konnektors für Open Source Apache Kafka heruntergeladen haben, in den Ordner <kafka_dir>/libs.

Konfigurieren des Kafka-Konnektors

Im Standalone-Modus wird der Konnektor konfiguriert, indem eine Datei erstellt wird, die Parameter wie die Snowflake-Anmeldeinformationen, Themennamen, Namen von Snowflake-Tabellen usw. angibt. Bei Bereitstellung im verteilten Modus wird der Konnektor durch den Aufruf des REST API-Endpunkts des Kafka Connect-Clusters konfiguriert.

Wichtig

Das Kafka Connect-Framework überträgt die Konfigurationseinstellungen für den Kafka-Konnektor vom primären Knoten zu den Workerknoten. Die Konfigurationseinstellungen enthalten vertrauliche Informationen (insbesondere den Snowflake-Benutzernamen und den privaten Schlüssel). Stellen Sie sicher, dass der Kommunikationskanal zwischen den Kafka Connect-Knoten sicher ist. Weitere Informationen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.

Jede Konfiguration legt die Themen und entsprechenden Tabellen für genau eine Datenbank und genau ein Schema in dieser Datenbank fest. Beachten Sie, dass ein Konnektor Nachrichten aus einer beliebigen Anzahl von Themen erfassen kann, sich die entsprechenden Tabellen jedoch alle in einer einzigen Datenbank und in einem einzigen Schema befinden müssen.

Dieser Abschnitt enthält Anweisungen für den verteilten und eigenständigen Modus.

Beschreibungen der Konfigurationsfelder finden Sie unter Konnektor-Konfigurationseigenschaften.

Wichtig

Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.

Ziehen Sie außerdem in Betracht, die Konfigurationsdatei an einem sicheren externen Speicherort oder in einem Schlüsselverwaltungsdienst zu speichern. Weitere Informationen dazu finden Sie unter Externalisieren von Geheimnissen (unter diesem Thema).

Verteilter Modus

Erstellen Sie die Kafka-Konfigurationsdatei, z. B. <path>/<config_file>.json. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration. Die Datei muss im JSON-Format vorliegen.

Beispielhafte Konfigurationsdatei

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "none",
      "snowflake.streaming.validate.compatibility.with.classic": "false"
      }
}

Eigenständiger Modus

Erstellen Sie eine Konfigurationsdatei, z. B. <kafka_dir>/config/SF_connect.properties. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration.

Beispielhafte Konfigurationsdatei

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
snowflake.streaming.validate.compatibility.with.classic=false

Cache-Hinweise für Testen und Prototyping

Der Konnektor speichert Überprüfungen auf das Vorhandensein von Tabellen und Pipes im Cache, um die Performance beim Rebalancing von Partitionen zu verbessern. Beim Testen und beim Prototyping kann dieses Caching-Verhalten jedoch dazu führen, dass der Konnektor manuell erstellte Tabellen oder Pipes nicht sofort erkennt.

Problem: Wenn Sie eine Tabelle oder Pipe manuell erstellen, während der Konnektor ausgeführt wird, kann der Konnektor standardmäßig bis zu 5 Minuten lang zwischengespeicherte Ergebnisse der Existenzprüfung verwenden (die darauf hinweisen können, dass das Objekt nicht existiert). Dies kann zu unerwarteten Fehlern oder Verhaltensweisen beim Testen führen.

Empfehlung zum Testen: Um Cache-bezogene Probleme beim Testen und Prototyping zu vermeiden, deaktivieren Sie das Caching:

snowflake.cache.table.exists=false
snowflake.cache.pipe.exists=false

Diese Konfiguration stellt sicher, dass der Konnektor bei jedem Neuausgleichen einer Partition neue Existenzprüfungen durchführt, sodass Sie die Auswirkungen von manuell erstellten Tabellen und Pipes sofort sehen können.

Wichtig

Diese minimalen Cache-Einstellungen werden nur für Tests und Prototyping empfohlen. Verwenden Sie in Produktionsumgebungen die Standardwerte für den Cache-Ablauf (5 Minuten oder mehr), um Metadaten-Abfragen an Snowflake zu minimieren und die Performance beim Neuausgleichen zu verbessern, insbesondere wenn Sie viele Partitionen verarbeiten.

Konnektor-Konfigurationseigenschaften

Minimalkonfiguration für neue Installationen

Im Folgenden finden Sie eine Minimalkonfiguration, um den Konnektor mit v4-Standardeinstellungen auszuführen. Dieses Beispiel verwendet das JSON-Format für den verteilten Modus:

{
  "name": "my_kafka_connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "topics": "my_topic",
    "snowflake.url.name": "https://myaccount.snowflakecomputing.com",
    "snowflake.user.name": "my_user",
    "snowflake.private.key": "<base64-encoded-private-key>",
    "snowflake.database.name": "MY_DB",
    "snowflake.schema.name": "MY_SCHEMA",
    "snowflake.role.name": "MY_ROLE",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "snowflake.streaming.validate.compatibility.with.classic": "false"
  }
}

Diese Konfiguration verwendet alle v4-Standardeinstellungen: serverseitige Validierung, schematisierte Spalten und Bezeichner mit Unterscheidung zwischen Groß-/Kleinschreibung. Der Konnektor erstellt bei Bedarf automatisch Tabellen und Pipes.

Bemerkung

Legen Sie snowflake.streaming.validate.compatibility.with.classic für neue Installationen auf false fest. Diese Einstellung wird nur benötigt, wenn Sie von v3 migrieren.

Erforderliche Eigenschaften

name

Anwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

Kommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter topic2table.map (unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.

Bemerkung

Entweder topics oder topics.regex ist erforderlich, nicht beide.

topics.regex

Dies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder topics oder topics.regex enthalten, nicht beides.

snowflake.url.name

Die URL für den Zugriff auf Ihr Snowflake-Konto. Diese URL muss Ihren Kontobezeichner enthalten. Beachten Sie, dass das Protokoll (https://) und die Portnummer optional sind.

snowflake.user.name

Anmeldename des Benutzers für das Snowflake-Konto.

snowflake.role.name

Der Name der Rolle, mit der der Konnektor Daten in die Tabelle einfügt.

snowflake.private.key

Der private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter snowflake.private.key.passphrase, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwert snowflake.private.key verschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation verschlüsselt wurden.

Bemerkung

Siehe auch snowflake.private.key.passphrase in Optionale Eigenschaften.

snowflake.database.name

Der Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.

snowflake.schema.name

Der Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.

header.converter

Nur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist "org.apache.kafka.connect.storage.StringConverter".

key.converter

Schlüsselkonverter des Kafka-Datensatzes (z. B. "org.apache.kafka.connect.storage.StringConverter"). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.

value.converter

Der Konnektor unterstützt die Standard-Konverter der Kafka-Community. Wählen Sie den passenden Konverter auf der Grundlage Ihres Datenformats aus:

  • Für JSON-Datensätze: "org.apache.kafka.connect.json.JsonConverter"

  • Für Avro-Datensätze mit Schema-Registry: "io.confluent.connect.avro.AvroConverter"

Bemerkung

Wenn snowflake.enable.schematization=true (der Standardwert), werden StringConverter und ByteArrayConverter nicht als Wertkonverter unterstützt. Weitere Informationen dazu finden Sie unter Problembehandlung eines Snowflake Connector for Kafka.

Optionale Eigenschaften

Schematisierungs- und Validierungseigenschaften

Diese Eigenschaften steuern, wie der Konnektor Daten verarbeitet und validiert. Für neue Installationen funktionieren die Standardwerte gut. Wenn Sie von v3 migrieren, lesen Sie Migration vom Kafka-Konnektor v3 auf v4, um Hinweise zu erhalten, welche Werte verwendet werden sollten.

snowflake.enable.schematization

Steuert, ob eingehende Datensätze in einzelne Tabellenspalten schematisiert oder in Legacy-VARIANT-Spalten eingeschlossen werden.

Wenn true (Standard), werden Datensatzfelder einzelnen Tabellenspalten über Namen zugeordnet. Wenn false, speichert der Konnektor Datensätze in zwei VARIANT-Spalten (RECORD_CONTENT und RECORD_METADATA), was dem Verhalten von v3 entspricht.

Standard:

true

snowflake.validation

Steuert, wo Datenvalidierung und Schemaentwicklung durchgeführt werden.

server_side (Standard): Die Validierung wird vom Snowflake-Backend ausgeführt, was dem COPY- und Snowpipe-Verhalten entspricht. Ungültige Datensätze werden in Fehlertabellen erfasst. Unterstützt sowohl den Standard-Pipe- als auch den benutzerdefinierten Pipe-Modus.

client_side: Der Konnektor überprüft Datentypen und Schemakompatibilität, bevor Zeilen an Snowflake gesendet werden. Unterstützt eine Warteschlange für nicht zustellbare Meldungen (DLQ) für ungültige Datensätze. Funktioniert nur mit dem Standard-Pipe-Modus.

Weitere Details dazu finden Sie unter Validierung und Fehlerbehandlung.

Standard:

server_side

Migrations- und Kompatibilitätseigenschaften

Diese Eigenschaften sind relevant bei der Migration von v3. Bei Neuinstallationen können Sie diese überspringen und snowflake.streaming.validate.compatibility.with.classic=false einstellen.

snowflake.compatibility.enable.autogenerated.table.name.sanitization

Steuert, wie automatisch generierte Tabellennamen von Themennamen abgeleitet werden.

Wenn false (die Standardeinstellung), werden Themennamen unverändert für Tabellennamen verwendet, wobei Groß- und Kleinschreibung und Sonderzeichen beibehalten werden. Tabellennamen werden als Bezeichner in Anführungszeichen erstellt.

Wenn``true``, werden ungültige Zeichen in Snowflake-Bezeichnern durch Unterstriche ersetzt, Namen werden in Großbuchstaben geschrieben und ein Hash-Code wird zur Eindeutigkeit angehängt. Dies entspricht dem Verhalten von v3.

Standard:

false

snowflake.compatibility.enable.column.identifier.normalization

Steuert, wie Spaltenbezeichner behandelt werden.

Wenn false (die Standardeinstellung), behalten Spaltenbezeichner die Groß-/Kleinschreibung und Sonderzeichen unverändert bei.

Wenn true, werden Spaltenbezeichner auf Großbuchstaben normalisiert, was dem Verhalten von v3 entspricht.

Standard:

false

snowflake.streaming.validate.compatibility.with.classic

Aktiviert eine Startvalidierung, die prüft, ob alle migrationsbezogenen Konfigurationen explizit festgelegt sind. Wenn true, schlägt der Konnektor beim Start mit einem beschreibenden Fehler fehl, wenn eine der folgenden Konfigurationen fehlt oder mit dem Verhalten von v3 nicht kompatibel ist:

  • snowflake.validation muss client_side sein.

  • snowflake.compatibility.enable.column.identifier.normalization muss true sein.

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization muss true sein.

  • snowflake.enable.schematization muss explizit auf true oder false gesetzt sein (der Standard wurde zwischen v3 und v4 geändert).

  • snowflake.streaming.classic.offset.migration muss explizit festgelegt sein.

  • snowflake.streaming.classic.offset.migration.include.connector.name muss explizit festgelegt sein (wenn die Offset-Migration strict oder best_effort ist).

Dies verhindert, dass versehentlich v4 mit einer kopierten v3-Konfiguration ausgeführt wird, ohne die grundlegenden Änderungen zu überprüfen. Legen Sie false fest, um diese Prüfung zu überspringen, nachdem Sie Ihre Konfiguration bestätigt haben.

Bemerkung

Bei Neuinstallationen (keine Migration von v3) setzen Sie den Wert auf false. Der Kompatibilitätsvalidierer wird nur benötigt, wenn ein Upgrade von einer bestehenden v3-Bereitstellung ausgeführt wird.

Weitere Informationen dazu finden Sie unter Migration vom Kafka-Konnektor v3 auf v4.

Standard:

true

Eigenschaften der Offset-Migration

Diese Eigenschaften steuern, wie v4 bestätigte Offsets von v3 Snowpipe Streaming (SSv1)-Kanälen migriert. Diese sind nur relevant, wenn Sie von einem v3-Konnektor migrieren, der snowflake.ingestion.method=SNOWPIPE_STREAMING verwendet hat. Wenn Sie vom v3-Snowpipe-Modus (dateibasierte Datenaufnahme) migrieren, setzen Sie snowflake.streaming.classic.offset.migration auf skip.

snowflake.streaming.classic.offset.migration

Steuert, wie v4 Offsets von v3 Snowpipe Streaming (SSv1)-Kanälen migriert.

strict: v4 sucht den bestätigten Offset des v3 SSv1-Kanals und wird ab diesem Punkt fortgesetzt. Wenn der SSv1-Kanal nicht gefunden wird, schlägt der Konnektor mit einem Fehler fehl.

best_effort: v4 versucht, den bestätigten Offset des v3 SSv1-Kanals zu ermitteln. Wenn der Kanal nicht gefunden wird, greift v4 auf den Offset der Kafka-Verbrauchergruppe zurück.

skip (Standard): Es wird keine SSv1-Offset-Migration durchgeführt. v4 verwendet den Offset der Kafka-Verbrauchergruppe. Verwenden Sie dies, wenn Sie vom v3 Snowpipe-Modus (nicht Snowpipe Streaming) migrieren.

Standard:

skip

snowflake.streaming.classic.offset.migration.include.connector.name

Steuert, ob die SSv1-Kanalnamensuche den Namen des Konnektors enthält. Dieser muss mit der Konfiguration Ihres v3-Konnektors übereinstimmen. In v3 steuerte die Eigenschaft snowflake.streaming.channel.name.include.connector.name, ob der Name des Konnektors im Kanalnamen enthalten ist.

Auf true setzen, wenn Ihr v3-Konnektor snowflake.streaming.channel.name.include.connector.name=true aufwies oder wenn Sie den Kafka-Konnektor der Version 2.1.0 oder 2.1.1 ausführten (diese Versionen enthielten standardmäßig den Namen des Konnektors). Andernfalls auf false setzen.

Nur erforderlich, wenn snowflake.streaming.classic.offset.migration auf strict oder best_effort gesetzt ist.

Standard:

Keine (muss explizit festgelegt werden, wenn die Offset-Migration aktiv ist)

Fehlerbehandlung von Eigenschaften

errors.tolerance

Steuert, wie der Konnektor auf Fehler während der Erfassung reagiert.

none (Standard): Die Konnektor-Aufgabe schlägt beim ersten Fehler fehl. Bei der serverseitigen Validierung ist die Fehlererkennung asynchron, sodass einige Datensätze nach dem fehlerhaften noch erfasst werden können, bevor die Aufgabe fehlschlägt.

all: Der Konnektor nimmt weiterhin Daten auf. Bei der clientseitigen Validierung werden ungültige Datensätze an die DLQ weitergeleitet (falls konfiguriert) oder ohne Meldung gelöscht.

Warnung

Die Einstellung von errors.tolerance=all ohne Konfiguration eines DLQ-Themas führt dazu, dass ungültige Datensätze ohne Meldung gelöscht werden, wenn die clientseitige Validierung verwendet wird. Dies kann zu Datenverlusten führen.

Standard:

none

errors.deadletterqueue.topic.name

Der Name des Kafka-Themas für die Warteschlange für nicht zustellbare Meldungen (DLQ). Nur wirksam, wenn snowflake.validation=client_side und errors.tolerance=all.

Standard:

leer (DLQ deaktiviert)

errors.log.enable

Wenn true, werden Fehler mit Details zur fehlgeschlagenen Operation und den Datensatzeigenschaften protokolliert.

Standard:

false

enable.task.fail.on.authorization.errors

Wenn true, schlägt die Konnektoraufgabe bei Autorisierungsfehlern von Snowflake sofort fehl. Wenn false, versucht der Konnektor es erneut.

Standard:

false

Zwischenspeichern von Eigenschaften

snowflake.cache.table.exists

Aktiviert das Caching für das Vorhandensein von Tabellen, wodurch die Anzahl der Metadaten-Abfragen an Snowflake reduziert wird.

Standard:

true

snowflake.cache.table.exists.expire.ms

Cache-Ablaufzeit in Millisekunden für die Überprüfung auf das Vorhandensein von Tabellen.

Standard:

300000 (5 Minuten)

snowflake.cache.pipe.exists

Aktiviert das Caching für die Überprüfung auf das Vorhandensein von Pipes.

Standard:

true

snowflake.cache.pipe.exists.expire.ms

Cache-Ablaufzeit in Millisekunden für die Überprüfung auf das Vorhandensein von Pipes.

Standard:

300000 (5 Minuten)

Überwachungs- und Diagnoseeigenschaften

jmx

Aktiviert JMX MBeans für Konnektor-Metriken. Weitere Informationen dazu finden Sie unter Monitor the Snowflake Connector for Kafka.

Standard:

true

enable.mdc.logging

Aktiviert MDC (Mapped Diagnostic Kontext), um den Konnektorkontext den Protokollmeldungen voranzustellen, was nützlich ist, wenn mehrere Konnektor-Instanzen ausgeführt werden.

Standard:

false

snowflake.streaming.metadata.connectorPushTime

Wenn true, ist der SnowflakeConnectorPushTime-Zeitstempel in RECORD_METADATA enthalten. Dieses Feld erfasst, wann der Konnektor einen Datensatz zur Erfassung gepuffert hat, und ist nützlich, um die End-to-End-Latenz abzuschätzen.

Standard:

true

Erweiterte Eigenschaften

snowflake.streaming.client.provider.override.map

Überschreibt die Eigenschaften von Snowpipe Streaming-Clients. Format: key1:value1,key2:value2. Verwenden Sie diese Option nur nach Rücksprache mit dem Snowflake-Support.

Standard:

empty

Andere Eigenschaften

snowflake.private.key.passphrase

Wenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.

tasks.max

Anzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Um eine optimale Leistung zu erzielen, empfiehlt Snowflake, die Anzahl der Tasks auf die Gesamtzahl der Kafka-Partitionen festzulegen, jedoch nicht die Anzahl der CPU-Kerne zu überschreiten. Eine hohe Anzahl von Tasks kann zu einem erhöhten Arbeitsspeicherverbrauch und häufigen Anpassungen führen.

snowflake.topic2table.map

Durch Kommas getrennte Liste der Zuordnungen von Themen zu Tabellen im topic:table-Format. Unterstützt Regex-Muster für Themennamen. Die regulären Ausdrücke dürfen nicht mehrdeutig sein – jedes abgeglichene Thema darf nur mit einer einzigen Zieltabelle übereinstimmen.

Sowohl Themen- als auch Tabellennamen können in doppelte Anführungszeichen gesetzt werden, um Sonderzeichen (Doppelpunkte, Kommas, Leerzeichen) zu unterstützen. Tabellennamen, die nicht in Anführungszeichen gesetzt sind, werden in Großbuchstaben geschrieben. Bei Tabellennamen in Anführungszeichen wird die Groß-/Kleinschreibung beibehalten.

Detaillierte Beispiele, einschließlich Regex-Muster, Viele-zu-eins-Zuordnungen und Anführungszeichen, finden Sie unter :ref:` Explizite Zuordnung von Themen zu Tabellen<label-kafkahp_explicit_topic_to_table_mapping>`.

Beispiel:

snowflake.topic2table.map=topic1:low_range,topic2:low_range,"my:topic":"My_Table"
value.converter.schema.registry.url

Wenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.

value.converter.break.on.schema.registry.error

Wenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist false. Setzen Sie den Wert auf true, um dieses Verhalten zu aktivieren.

jvm.proxy.host

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.

jvm.proxy.port

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.

jvm.proxy.username

Benutzername, der sich beim Proxyserver authentifiziert.

jvm.proxy.password

Kennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.

snowflake.jdbc.map

Beispiel: "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

Weitere JDBC-Eigenschaften (siehe Übersicht der Verbindungsparameter für den JDBC-Treiber) werden nicht validiert. Diese zusätzlichen Eigenschaften werden nicht validiert und dürfen weder überschrieben noch anstelle der erforderlichen Eigenschaften verwendet werden, wie z. B: jvm.proxy.xxx, snowflake.user.name, snowflake.private.key, snowflake.schema.name und ähnliche Eigenschaften.

Geben Sie eine der folgenden Kombinationen an:
  • tracing-Eigenschaft zusammen mit der Variable JDBC_TRACE env

  • database-Eigenschaft zusammen mit snowflake.database.name

Führt zu einem mehrdeutigen Verhalten und das Verhalten wird durch den JDBC-Treiber bestimmt.

value.converter.basic.auth.credentials.source

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter value.converter.basic.auth.user.info an. Andernfalls lassen Sie diesen Parameter aus.

value.converter.basic.auth.user.info

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.

snowflake.metadata.createtime

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert CreateTime in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.topic

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert topic in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.offset.and.partition

Wenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte Offset und Partition in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.all

Wenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.

transforms

Geben Sie an, dass vom Kafka-Konnektor festgestellte Tombstone-Datensätze übersprungen und nicht in die Zieltabelle geladen werden sollen. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist.

Setzen Sie den Eigenschaftswert auf "tombstoneHandlerExample".

Bemerkung

Verwenden Sie diese Eigenschaft nur mit den Kafka Community-Konvertern (d. h. value.converter-Eigenschaftswert) (z. B. org.apache.kafka.connect.json.JsonConverter oder org.apache.kafka.connect.json.AvroConverter). Um die Behandlung von Tombstone-Datensätzen mit den Snowflake-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaft behavior.on.null.values.

transforms.tombstoneHandlerExample.type

Erforderlich beim Einstellen der transforms-Eigenschaft.

Setzen Sie den Eigenschaftswert auf "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

Geben Sie an, wie der Kafka-Konnektor Tombstone-Datensätze behandeln soll. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist. Für Snowpipe wird diese Eigenschaft von der Kafka-Konnektor-Version 1.5.5 und höher unterstützt. Für Snowpipe Streaming wird diese Eigenschaft von der Kafka-Konnektor-Version 2.1.0 und höher unterstützt.

Diese Eigenschaft unterstützt die folgenden Werte:

DEFAULT

Wenn der Kafka-Konnektor auf einen Tombstone-Datensatz stößt, fügt er in die Inhaltsspalte eine leere JSON-Zeichenfolge ein.

IGNORE

Der Kafka-Konnektor überspringt Tombstone-Datensätze und fügt keine Zeilen für diese Datensätze ein.

Der Standardwert ist DEFAULT.

Bemerkung

Die Erfassung von Tombstone-Datensätzen variiert je nach Erfassungsmethode:

  • Für Snowpipe verwendet der Kafka-Konnektor nur Snowflake-Konverter. Um die Behandlung von Tombstone-Datensätzen mit den Kafka-Community-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaften transform und transforms.tombstoneHandlerExample.type.

  • Für Snowpipe Streaming verwendet der Kafka-Konnektor ausschließlich Community-Konverter.

An Kafka-Broker gesendete Datensätze dürfen nicht NULL sein, da diese Datensätze vom Kafka-Konnektor verworfen werden, was zu fehlenden Offsets führt. Die fehlenden Offsets machen den Kafka-Konnektor in bestimmten Anwendungsfällen funktionsunfähig. Es wird empfohlen, dass Sie anstelle von NULL-Datensätzen Tombstone-Datensätze verwenden.

Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation

Der Kafka-Konnektor basiert auf der Schlüsselpaar-Authentifizierung anstelle der Authentifizierung mit Benutzername und Kennwort. Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das Public/Private-Schlüsselpaar mit OpenSSL. Der öffentliche Schlüssel wird dem in der Konfigurationsdatei definierten Snowflake-Benutzer zugewiesen.

Nachdem Sie die Aufgaben zur Schlüsselpaar-Authentifizierung auf dieser Seite und die Aufgaben für die Schlüsselpaar-Rotation abgeschlossen haben, prüfen Sie die Empfehlung für Externalisieren von Geheimnissen (unter diesem Thema).

So konfigurieren Sie das Public/Private-Schlüsselpaar:

  1. Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:

    Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.

    Bemerkung

    Der Kafka-Konnektor unterstützt Verschlüsselungsalgorithmen, die validiert wurden, um die Anforderungen des Federal Information Processing Standard (140-2) (d. h. FIPS 140-2) zu erfüllen. Weitere Informationen dazu finden Sie unter FIPS 140-2.

    Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:

    $ openssl genrsa -out rsa_key.pem 2048
    

    Verwenden Sie zum Generieren einer verschlüsselten Version den folgenden Befehl:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    

    Wobei <Algorithmus> ein mit FIPS 140-2 kompatibler Verschlüsselungsalgorithmus ist.

    So geben Sie beispielsweise AES 256 als Verschlüsselungsalgorithmus an:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    

    Wenn Sie eine verschlüsselte Version des privaten Schlüssels generieren, notieren Sie sich die Passphrase. Geben Sie die Passphrase später in der Kafka-Konfigurationsdatei für die Eigenschaft snowflake.private.key.passphrase an.

    Beispiel für einen privaten PEM-Schlüssel

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:

    Angenommen, der private Schlüssel befindet sich in der Datei rsa_key.p8, dann verwenden Sie den folgenden Befehl:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Beispiel für einen öffentlichen PEM-Schlüssel

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in ein lokales Verzeichnis. Notieren Sie den Pfad zu den Dateien. Der private Schlüssel wird im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt. Die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in der Verantwortung der Benutzenden, die Datei zu sichern, wenn sie nicht verwendet wird.

  4. Melden Sie sich bei Snowflake an. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu.

    Beispiel:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Bemerkung

    • Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.

    • Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.

    Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Bemerkung

    Die Eigenschaft RSA_PUBLIC_KEY_2_FP wird unter Konfigurieren der Schlüsselpaar-Rotation beschrieben.

  5. Kopieren Sie den gesamten privaten Schlüssel, und fügen Sie ihn in das Feld snowflake.private.key der Konfigurationsdatei ein. Speichern Sie die Datei.

Externalisieren von Geheimnissen

Snowflake empfiehlt dringend, Geheimnisse wie den privaten Schlüssel zu externalisieren und in verschlüsselter Form bzw. in einem Schlüsselverwaltungsdienst wie AWS KMS, Azure Key Vault oder HashiCorp Vault zu speichern. Dies kann mithilfe einer ConfigProvider-Implementierung in Ihrem Kafka Connect-Cluster erreicht werden.

Weitere Informationen dazu finden Sie in der Beschreibung des Confluent-Dienstes.

Starten des Konnektors

Starten Sie Kafka anhand der Anweisungen in der Confluent- oder Apache Kafka-Dokumentation des Drittanbieters. Sie können den Kafka-Konnektor entweder im verteilten oder im eigenständigen Modus starten. Anweisungen für den jeweiligen Modus finden Sie unten:

Verteilter Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors

Eigenständiger Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties

Bemerkung

Bei einer Standardinstallation von Apache Kafka oder Confluent Kafka sollte die Datei connect-standalone.properties bereits enthalten sein.

Nächste Schritte

Konnektor testen.