Snowflake High Performance connector for Kafka: Install and configure

This topic describes the steps to install and configure the Snowflake High Performance connector for Kafka.

Installieren des Kafka-Konnektors

Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.

Snowflake bietet zwei Versionen des Konnektors:

Die Anweisungen unter diesem Thema geben an, welche Schritte nur für beide Versionen des Konnektors gelten.

Installationsvoraussetzungen

  • The Kafka connector supports the following package versions:

    Paket

    Snowflake Kafka Connector Version

    Paketunterstützung (von Snowflake getestet)

    Apache Kafka

    2.0.0 (oder höher)

    Apache Kafka 2.8.2, 3.7.2

    Confluent

    2.0.0 (oder höher)

    Confluent 6.2.15, 7.8.2

  • Der Kafka-Konnektor ist für die Verwendung mit der Kafka Connect-API 3.9.0 ausgelegt. Spätere Versionen der Kafka Connect-API wurden nicht getestet. Versionen vor 3.9.0 sind mit dem Konnektor kompatibel. Weitere Informationen dazu finden Sie unter Kafka-Kompatibilität.

  • Wenn Sie sowohl den Kafka-Konnektor als auch die JAR-Dateien des JDBC-Treibers in Ihrer Umgebung haben, stellen Sie sicher, dass Ihre JDBC-Version mit der snowflake-jdbc-Version übereinstimmt, die in der Datei pom.xml Ihrer vorgesehenen Kafka-Konnektor-Version angegeben ist. Sie können zu Ihrer bevorzugten Release-Version des Kafka-Konnektors wechseln, z. B. v4.0.0-rc4. Durchsuchen Sie dann die Datei pom.xml, um die Version von snowflake-jdbc zu ermitteln.

  • Wenn Sie das Avro-Format zur Aufnahme von Daten verwenden, gilt Folgendes:

  • Apache Kafka muss mit der gewünschten Datenaufbewahrungsdauer und/oder dem gewünschten Speicherlimit konfiguriert werden.

  • Install and configure the Kafka Connect cluster.

    Jeder Kafka Connect-Clusterknoten muss über ausreichend RAM für den Snowflake-Konnektor für Apache Kafka verfügen. Das empfohlene Minimum beträgt 5 MB pro Kafka-Partition. (Dies ist zusätzlich zu dem RAM erforderlich, der für alle anderen Aufgaben von Kafka Connect erforderlich ist.)

  • Wir empfehlen Ihnen, für Kafka-Broker und die Kafka Connect-Laufzeitumgebung die gleiche Version zu verwenden.

  • Es wird dringend empfohlen, Ihre Kafka Connect-Instanz in derselben Cloudanbieter-Region wie Ihr Snowflake-Konto auszuführen. Dies ist nicht zwingend erforderlich, erhöht jedoch in der Regel den Durchsatz.

Eine Liste der von Snowflake-Clients unterstützten Betriebssysteme finden Sie unter Betriebssystemunterstützung.

Installing the connector

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Confluent. Die folgende Tabelle beschreibt die unterstützten Versionen und Informationen zu Pre-Releases und Release-Kandidaten.

Release-Serie

Status

Anmerkungen

4.x.x

Öffentliche Vorschau

Frühzeitiger Zugriff. Unterstützt Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview Derzeit muss die Migration von den Versionen 3.x auf die Version 2.x manuell erfolgen. Sie kann nicht als Ersatz für frühere Versionen verwendet werden. Die Features unterscheiden sich von den Versionen 3.x, 2.x, 1.x

3.x.x

Offiziell unterstützt

Keine Unterstützung für Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview.

2.x.x

Offiziell unterstützt

Upgrade empfohlen. Keine Unterstützung für Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview.

1.x.x

Nicht unterstützt

Verwenden Sie diese Release-Serie nicht.

Installing the connector for Confluent

Download the Kafka connector files

Laden Sie die JAR-Datei des Kafka-Konnektors von einem der folgenden Speicherorte herunter:

Confluent Hub:

https://www.confluent.io/hub/

Das Paket enthält alle Abhängigkeiten, die zur Verwendung eines verschlüsselten oder unverschlüsselten privaten Schlüssels für die Schlüsselpaar-Authentifizierung erforderlich sind. Weitere Informationen dazu finden Sie unter Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter diesem Thema).

Zentrales Maven-Repository:

https://mvnrepository.com/artifact/com.snowflake

Wenn Sie diese Version verwenden, müssen Sie die Bouncy Castle-Kryptographie-Bibliotheken (z. B JAR-Dateien) herunterladen:

Laden Sie diese Dateien in denselben lokalen Ordner wie die JAR-Datei des Kafka-Konnektors herunter.

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Install the Kafka connector

Installieren Sie den Kafka-Konnektor gemäß den Anweisungen, die für die Installation anderer Konnektoren bereitgestellt wurden:

Installieren des Konnektors für Open Source Apache Kafka

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Open Source Apache Kafka.

Install Apache Kafka

  1. Laden Sie das Kafka-Paket von der offiziellen Kafka-Website herunter.

  2. Wechseln Sie in einem Terminalfenster in das Verzeichnis, in das Sie die Paketdatei heruntergeladen haben.

  3. Führen Sie den folgenden Befehl aus, um die Datei kafka_<Scala-Version>-<Kafka-Version>.tgz zu dekomprimieren:

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    
    Copy

JDK installieren

Installieren und konfigurieren Sie das Java Development Kit (JDK) Version 11 oder höher. Snowflake wurde mit der Standard Edition (SE) des JDK getestet. Die Enterprise Edition (EE) ist voraussichtlich kompatibel, wurde jedoch nicht getestet.

Wenn Sie bereits zuvor das JDK installiert haben, können Sie diesen Abschnitt überspringen.

  1. Laden Sie das JDK von der Oracle JDK-Website herunter.

  2. Installieren oder dekomprimieren Sie das JDK.

  3. Stellen Sie gemäß den Anweisungen für Ihr Betriebssystem die Umgebungsvariable JAVA_HOME so ein, dass sie auf das Verzeichnis mit dem JDK verweist.

Download the Kafka connector JAR files

  1. Laden Sie die JAR-Datei des Kafka-Konnektors vom Maven Central Repository herunter:

    https://mvnrepository.com/artifact/com.snowflake

  2. Laden Sie die JAR-Dateien für die Bouncy Castle-Kryptographie-Bibliothek herunter:

  3. Wenn Ihre Kafka-Daten im Apache Avro-Format gestreamt werden, laden Sie die Avro-JAR-Datei (1.11.4) herunter:

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Install the Kafka connector

Kopieren Sie die JAR-Dateien, die Sie in Installieren des Konnektors für Open Source Apache Kafka heruntergeladen haben, in den Ordner <kafka_dir>/libs.

Configuring the Kafka connector

Im Standalone-Modus wird der Konnektor konfiguriert, indem eine Datei erstellt wird, die Parameter wie die Snowflake-Anmeldeinformationen, Themennamen, Namen von Snowflake-Tabellen usw. angibt. Bei Bereitstellung im verteilten Modus wird der Konnektor durch den Aufruf des REST API-Endpunkts des Kafka Connect-Clusters konfiguriert.

Wichtig

The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. Configuration settings include sensitive information, specifically, the Snowflake username and private key. Make sure to secure the communication channel between Kafka Connect nodes. For more information, see the documentation for your Apache Kafka software.

Each configuration specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be stored in a single database and schema.

Dieser Abschnitt enthält Anweisungen für den verteilten und eigenständigen Modus.

Beschreibungen der Konfigurationsfelder finden Sie unter Konnektor-Konfigurationseigenschaften.

Wichtig

Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.

In addition, consider storing the configuration file in a secure external location or a key management service. For more information, see Externalizing Secrets (in this topic).

Verteilter Modus

Erstellen Sie die Kafka-Konfigurationsdatei, z. B. <path>/<config_file>.json. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration. Die Datei muss im JSON-Format vorliegen.

Sample configuration file

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "tasks.max": "1",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
      "snowflake.warehouse.name": "WH",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "none"
      }
}
Copy

Eigenständiger Modus

Erstellen Sie eine Konfigurationsdatei, z. B. <kafka_dir>/config/SF_connect.properties. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration.

Sample configuration file

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
Copy

Cache-Hinweise für Testen und Prototyping

Der Konnektor speichert Überprüfungen auf das Vorhandensein von Tabellen und Pipes im Cache, um die Performance beim Rebalancing von Partitionen zu verbessern. Beim Testen und beim Prototyping kann dieses Caching-Verhalten jedoch dazu führen, dass der Konnektor manuell erstellte Tabellen oder Pipes nicht sofort erkennt.

Problem: Wenn Sie eine Tabelle oder Pipe manuell erstellen, während der Konnektor ausgeführt wird, kann der Konnektor standardmäßig bis zu 5 Minuten lang zwischengespeicherte Ergebnisse der Existenzprüfung verwenden (die darauf hinweisen können, dass das Objekt nicht existiert). Dies kann zu unerwarteten Fehlern oder Verhaltensweisen beim Testen führen.

Empfehlung zum Testen: Um Cache-bezogene Probleme während des Testens und Prototyping zu vermeiden, konfigurieren Sie für beide Parameter für den Cacheablauf den Mindestwert von 1 Millisekunden, oder deaktivieren Sie das Caching:

snowflake.cache.table.exists.expire.ms=1
snowflake.cache.pipe.exists.expire.ms=1
Copy

Diese Konfiguration stellt sicher, dass der Konnektor bei jedem Neuausgleichen einer Partition neue Existenzprüfungen durchführt, sodass Sie die Auswirkungen von manuell erstellten Tabellen und Pipes sofort sehen können.

Wichtig

Diese minimalen Cache-Einstellungen werden nur für Tests und Prototyping empfohlen. Verwenden Sie in Produktionsumgebungen die Standardwerte für den Cache-Ablauf (5 Minuten oder mehr), um Metadaten-Abfragen an Snowflake zu minimieren und die Performance beim Neuausgleichen zu verbessern, insbesondere wenn Sie viele Partitionen verarbeiten.

Konnektor-Konfigurationseigenschaften

Erforderliche Eigenschaften

name

Anwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

Kommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter topic2table.map (unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.

Bemerkung

Entweder topics oder topics.regex ist erforderlich, nicht beide.

topics.regex

Dies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder topics oder topics.regex enthalten, nicht beides.

snowflake.url.name

Die URL für den Zugriff auf Ihr Snowflake-Konto. Diese URL muss Ihren Kontobezeichner enthalten. Beachten Sie, dass das Protokoll (https://) und die Portnummer optional sind.

snowflake.user.name

Anmeldename des Benutzers für das Snowflake-Konto.

snowflake.role.name

Der Name der Rolle, mit der der Konnektor Daten in die Tabelle einfügt.

snowflake.private.key

Der private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter snowflake.private.key.passphrase, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwert snowflake.private.key verschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation verschlüsselt wurden.

Bemerkung

Siehe auch snowflake.private.key.passphrase in Optionale Eigenschaften.

snowflake.database.name

Der Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.

snowflake.schema.name

Der Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.

header.converter

Nur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist "org.apache.kafka.connect.storage.StringConverter".

key.converter

Schlüsselkonverter des Kafka-Datensatzes (z. B. "org.apache.kafka.connect.storage.StringConverter"). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

value.converter

Der Konnektor unterstützt die Standard-Konverter der Kafka-Community. Wählen Sie den passenden Konverter auf der Grundlage Ihres Datenformats aus:

  • Für JSON-Datensätze: "org.apache.kafka.connect.json.JsonConverter"

  • Für Avro-Datensätze mit Schema-Registry: "io.confluent.connect.avro.AvroConverter"

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

Optionale Eigenschaften

snowflake.private.key.passphrase

Wenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.

tasks.max

Anzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Um eine optimale Leistung zu erzielen, empfiehlt Snowflake, die Anzahl der Tasks auf die Gesamtzahl der Kafka-Partitionen festzulegen, jedoch nicht die Anzahl der CPU-Kerne zu überschreiten. Eine hohe Anzahl von Tasks kann zu einem erhöhten Arbeitsspeicherverbrauch und häufigen Anpassungen führen.

snowflake.topic2table.map

Mit diesem optionalen Parameter können Benutzer angeben, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner. Die Konfiguration von Themen erlaubt die Verwendung regulärer Ausdrücke zur Definition von Themen, genau wie die Verwendung von topics.regex. Die regulären Ausdrücke dürfen nicht mehrdeutig sein — jedes übereinstimmende Thema darf nur mit einer einzigen Zieltabelle übereinstimmen.

Beispiel:

topics="topic1,topic2,topic5,topic6"
snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
Copy

könnte geschrieben werden als:

topics.regex="topic[0-9]"
snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
Copy
value.converter.schema.registry.url

Wenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.

value.converter.break.on.schema.registry.error

Wenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist false. Setzen Sie den Wert auf true, um dieses Verhalten zu aktivieren.

jvm.proxy.host

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.

jvm.proxy.port

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.

snowflake.streaming.max.client.lag

Specifies how often the connector flushes the data to Snowflake, in seconds.

Werte:
  • Minimum: 1 Sekunde

  • Maximum: 600 Sekunden

Standard:

1 Sekunde

jvm.proxy.username

Benutzername, der sich beim Proxyserver authentifiziert.

jvm.proxy.password

Kennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.

snowflake.jdbc.map

Beispiel: "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

Weitere JDBC-Eigenschaften (siehe Übersicht der Verbindungsparameter für den JDBC-Treiber) werden nicht validiert. Diese zusätzlichen Eigenschaften werden nicht validiert und dürfen weder überschrieben noch anstelle der erforderlichen Eigenschaften verwendet werden, wie z. B: jvm.proxy.xxx, snowflake.user.name, snowflake.private.key, snowflake.schema.name usw.

Geben Sie eine der folgenden Kombinationen an:
  • tracing-Eigenschaft zusammen mit der Variable JDBC_TRACE env

  • database-Eigenschaft zusammen mit snowflake.database.name

Führt zu einem mehrdeutigen Verhalten und das Verhalten wird durch den JDBC-Treiber bestimmt.

value.converter.basic.auth.credentials.source

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter value.converter.basic.auth.user.info an. Andernfalls lassen Sie diesen Parameter aus.

value.converter.basic.auth.user.info

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.

snowflake.metadata.createtime

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert CreateTime in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.topic

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert topic in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.offset.and.partition

Wenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte Offset und Partition in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.all

Wenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.

transforms

Geben Sie an, dass vom Kafka-Konnektor festgestellte Tombstone-Datensätze übersprungen und nicht in die Zieltabelle geladen werden sollen. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist.

Setzen Sie den Eigenschaftswert auf "tombstoneHandlerExample".

Bemerkung

Verwenden Sie diese Eigenschaft nur für Kafka-Community-Konvertern, d. h. Eigenschaftswert value.converter (z. B. org.apache.kafka.connect.json.JsonConverter oder org.apache.kafka.connect.json.AvroConverter). Um die Behandlung von Tombstone-Datensätzen mit den Snowflake-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaft behavior.on.null.values.

transforms.tombstoneHandlerExample.type

Erforderlich beim Einstellen der transforms-Eigenschaft.

Setzen Sie den Eigenschaftswert auf "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

Geben Sie an, wie der Kafka-Konnektor Tombstone-Datensätze behandeln soll. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist. Für Snowpipe wird diese Eigenschaft von der Kafka-Konnektor-Version 1.5.5 und höher unterstützt. Für Snowpipe Streaming wird diese Eigenschaft von der Kafka-Konnektor-Version 2.1.0 und höher unterstützt.

Diese Eigenschaft unterstützt die folgenden Werte:

DEFAULT

Wenn der Kafka-Konnektor auf einen Tombstone-Datensatz stößt, fügt er in die Inhaltsspalte eine leere JSON-Zeichenfolge ein.

IGNORE

Der Kafka-Konnektor überspringt Tombstone-Datensätze und fügt keine Zeilen für diese Datensätze ein.

Der Standardwert ist DEFAULT.

Bemerkung

Die Erfassung von Tombstone-Datensätzen variiert je nach Erfassungsmethode:

  • Für Snowpipe verwendet der Kafka-Konnektor nur Snowflake-Konverter. Um die Behandlung von Tombstone-Datensätzen mit den Kafka-Community-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaften transform und transforms.tombstoneHandlerExample.type.

  • Für Snowpipe Streaming verwendet der Kafka-Konnektor ausschließlich Community-Konverter.

An Kafka-Broker gesendete Datensätze dürfen nicht NULL sein, da diese Datensätze vom Kafka-Konnektor verworfen werden, was zu fehlenden Offsets führt. Die fehlenden Offsets machen den Kafka-Konnektor in bestimmten Anwendungsfällen funktionsunfähig. Es wird empfohlen, dass Sie anstelle von NULL-Datensätzen Tombstone-Datensätze verwenden.

Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation

Der Kafka-Konnektor basiert auf der Schlüsselpaar-Authentifizierung anstelle der Authentifizierung mit Benutzername und Kennwort. Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das Public/Private-Schlüsselpaar mit OpenSSL. Der öffentliche Schlüssel wird dem in der Konfigurationsdatei definierten Snowflake-Benutzer zugewiesen.

Nachdem Sie die Aufgaben zur Schlüsselpaar-Authentifizierung auf dieser Seite und die Aufgaben für die Schlüsselpaar-Rotation abgeschlossen haben, prüfen Sie die Empfehlung für Externalisieren von Geheimnissen (unter diesem Thema).

So konfigurieren Sie das Public/Private-Schlüsselpaar:

  1. Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:

    Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.

    Bemerkung

    Der Kafka-Konnektor unterstützt Verschlüsselungsalgorithmen, die validiert wurden, um die Anforderungen des Federal Information Processing Standard (140-2) (d. h. FIPS 140-2) zu erfüllen. Weitere Informationen dazu finden Sie unter FIPS 140-2.

    Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:

    $ openssl genrsa -out rsa_key.pem 2048
    
    Copy

    Verwenden Sie zum Generieren einer verschlüsselten Version den folgenden Befehl:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    
    Copy

    Wobei <Algorithmus> ein mit FIPS 140-2 kompatibler Verschlüsselungsalgorithmus ist.

    So geben Sie beispielsweise AES 256 als Verschlüsselungsalgorithmus an:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    
    Copy

    Wenn Sie eine verschlüsselte Version des privaten Schlüssels generieren, notieren Sie sich die Passphrase. Geben Sie die Passphrase später in der Kafka-Konfigurationsdatei für die Eigenschaft snowflake.private.key.passphrase an.

    Beispiel für einen privaten PEM-Schlüssel

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
    Copy
  2. Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:

    Angenommen, der private Schlüssel befindet sich in der Datei rsa_key.p8, dann verwenden Sie den folgenden Befehl:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    
    Copy

    Beispiel für einen öffentlichen PEM-Schlüssel

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
    Copy
  3. Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in ein lokales Verzeichnis. Notieren Sie den Pfad zu den Dateien. Der private Schlüssel wird im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt. Die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in der Verantwortung der Benutzenden, die Datei zu sichern, wenn sie nicht verwendet wird.

  4. Melden Sie sich bei Snowflake an. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu.

    For example:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    
    Copy

    Bemerkung

    • Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.

    • Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.

    Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    
    Copy

    Bemerkung

    Die Eigenschaft RSA_PUBLIC_KEY_2_FP wird unter Konfigurieren der Schlüsselpaar-Rotation beschrieben.

  5. Kopieren Sie den gesamten privaten Schlüssel, und fügen Sie ihn in das Feld snowflake.private.key der Konfigurationsdatei ein. Speichern Sie die Datei.

Externalisieren von Geheimnissen

Snowflake empfiehlt dringend, Geheimnisse wie den privaten Schlüssel zu externalisieren und in verschlüsselter Form bzw. in einem Schlüsselverwaltungsdienst wie AWS KMS, Azure Key Vault oder HashiCorp Vault zu speichern. Dies kann mithilfe einer ConfigProvider-Implementierung in Ihrem Kafka Connect-Cluster erreicht werden.

Weitere Informationen dazu finden Sie in der Beschreibung des Confluent-Dienstes.

Starting the connector

Starten Sie Kafka anhand der Anweisungen in der Confluent- oder Apache Kafka-Dokumentation des Drittanbieters. Sie können den Kafka-Konnektor entweder im verteilten oder im eigenständigen Modus starten. Anweisungen für den jeweiligen Modus finden Sie unten:

Verteilter Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Copy

Eigenständiger Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Copy

Bemerkung

Bei einer Standardinstallation von Apache Kafka oder Confluent Kafka sollte die Datei connect-standalone.properties bereits enthalten sein.

Nächste Schritte

Konnektor testen.