Snowflake High Performance connector for Kafka: Install and configure¶
This topic describes the steps to install and configure the Snowflake High Performance connector for Kafka.
Installieren des Kafka-Konnektors¶
Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.
Snowflake bietet zwei Versionen des Konnektors:
Eine Version für die Confluent Kafka-Installation.
A version for the open source software (OSS) Apache Kafka https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/ ecosystem.
Die Anweisungen unter diesem Thema geben an, welche Schritte nur für beide Versionen des Konnektors gelten.
Installationsvoraussetzungen¶
The Kafka connector supports the following package versions:
Paket
Snowflake Kafka Connector Version
Paketunterstützung (von Snowflake getestet)
Apache Kafka
2.0.0 (oder höher)
Apache Kafka 2.8.2, 3.7.2
Confluent
2.0.0 (oder höher)
Confluent 6.2.15, 7.8.2
Der Kafka-Konnektor ist für die Verwendung mit der Kafka Connect-API 3.9.0 ausgelegt. Spätere Versionen der Kafka Connect-API wurden nicht getestet. Versionen vor 3.9.0 sind mit dem Konnektor kompatibel. Weitere Informationen dazu finden Sie unter Kafka-Kompatibilität.
Wenn Sie sowohl den Kafka-Konnektor als auch die JAR-Dateien des JDBC-Treibers in Ihrer Umgebung haben, stellen Sie sicher, dass Ihre JDBC-Version mit der
snowflake-jdbc-Version übereinstimmt, die in der Dateipom.xmlIhrer vorgesehenen Kafka-Konnektor-Version angegeben ist. Sie können zu Ihrer bevorzugten Release-Version des Kafka-Konnektors wechseln, z. B. v4.0.0-rc4. Durchsuchen Sie dann die Dateipom.xml, um die Version vonsnowflake-jdbczu ermitteln.Wenn Sie das Avro-Format zur Aufnahme von Daten verwenden, gilt Folgendes:
Verwenden Sie den Avro-Parser, Version 1.8.2 (oder höher), verfügbar unter https://mvnrepository.com/artifact/org.apache.avro.
Wenn Sie für Avro die Schemaregistrierungsfunktion verwenden, verwenden Sie Version 5.0.0 (oder höher) des Kafka Connect Avro Converter, der unter https://mvnrepository.com/artifact/io.confluent verfügbar ist.
Beachten Sie, dass die Schema-Registrierungsfunktion im Apache Kafka-OSS-Paket nicht verfügbar ist.
Apache Kafka muss mit der gewünschten Datenaufbewahrungsdauer und/oder dem gewünschten Speicherlimit konfiguriert werden.
Install and configure the Kafka Connect cluster.
Jeder Kafka Connect-Clusterknoten muss über ausreichend RAM für den Snowflake-Konnektor für Apache Kafka verfügen. Das empfohlene Minimum beträgt 5 MB pro Kafka-Partition. (Dies ist zusätzlich zu dem RAM erforderlich, der für alle anderen Aufgaben von Kafka Connect erforderlich ist.)
Wir empfehlen Ihnen, für Kafka-Broker und die Kafka Connect-Laufzeitumgebung die gleiche Version zu verwenden.
Es wird dringend empfohlen, Ihre Kafka Connect-Instanz in derselben Cloudanbieter-Region wie Ihr Snowflake-Konto auszuführen. Dies ist nicht zwingend erforderlich, erhöht jedoch in der Regel den Durchsatz.
Eine Liste der von Snowflake-Clients unterstützten Betriebssysteme finden Sie unter Betriebssystemunterstützung.
Installing the connector¶
Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Confluent. Die folgende Tabelle beschreibt die unterstützten Versionen und Informationen zu Pre-Releases und Release-Kandidaten.
Release-Serie |
Status |
Anmerkungen |
|---|---|---|
4.x.x |
Öffentliche Vorschau |
Frühzeitiger Zugriff. Unterstützt Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview Derzeit muss die Migration von den Versionen 3.x auf die Version 2.x manuell erfolgen. Sie kann nicht als Ersatz für frühere Versionen verwendet werden. Die Features unterscheiden sich von den Versionen 3.x, 2.x, 1.x |
3.x.x |
Offiziell unterstützt |
Keine Unterstützung für Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview. |
2.x.x |
Offiziell unterstützt |
Upgrade empfohlen. Keine Unterstützung für Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview. |
1.x.x |
Nicht unterstützt |
Verwenden Sie diese Release-Serie nicht. |
Installing the connector for Confluent¶
Download the Kafka connector files¶
Laden Sie die JAR-Datei des Kafka-Konnektors von einem der folgenden Speicherorte herunter:
- Confluent Hub:
-
Das Paket enthält alle Abhängigkeiten, die zur Verwendung eines verschlüsselten oder unverschlüsselten privaten Schlüssels für die Schlüsselpaar-Authentifizierung erforderlich sind. Weitere Informationen dazu finden Sie unter Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter diesem Thema).
- Zentrales Maven-Repository:
https://mvnrepository.com/artifact/com.snowflake
Wenn Sie diese Version verwenden, müssen Sie die Bouncy Castle-Kryptographie-Bibliotheken (z. B JAR-Dateien) herunterladen:
Laden Sie diese Dateien in denselben lokalen Ordner wie die JAR-Datei des Kafka-Konnektors herunter.
Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.
Install the Kafka connector¶
Installieren Sie den Kafka-Konnektor gemäß den Anweisungen, die für die Installation anderer Konnektoren bereitgestellt wurden:
Installieren des Konnektors für Open Source Apache Kafka¶
Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Open Source Apache Kafka.
Install Apache Kafka¶
Laden Sie das Kafka-Paket von der offiziellen Kafka-Website herunter.
Wechseln Sie in einem Terminalfenster in das Verzeichnis, in das Sie die Paketdatei heruntergeladen haben.
Führen Sie den folgenden Befehl aus, um die Datei
kafka_<Scala-Version>-<Kafka-Version>.tgzzu dekomprimieren:tar xzvf kafka_<scala_version>-<kafka_version>.tgz
JDK installieren¶
Installieren und konfigurieren Sie das Java Development Kit (JDK) Version 11 oder höher. Snowflake wurde mit der Standard Edition (SE) des JDK getestet. Die Enterprise Edition (EE) ist voraussichtlich kompatibel, wurde jedoch nicht getestet.
Wenn Sie bereits zuvor das JDK installiert haben, können Sie diesen Abschnitt überspringen.
Laden Sie das JDK von der Oracle JDK-Website herunter.
Installieren oder dekomprimieren Sie das JDK.
Stellen Sie gemäß den Anweisungen für Ihr Betriebssystem die Umgebungsvariable JAVA_HOME so ein, dass sie auf das Verzeichnis mit dem JDK verweist.
Download the Kafka connector JAR files¶
Laden Sie die JAR-Datei des Kafka-Konnektors vom Maven Central Repository herunter:
Laden Sie die JAR-Dateien für die Bouncy Castle-Kryptographie-Bibliothek herunter:
Wenn Ihre Kafka-Daten im Apache Avro-Format gestreamt werden, laden Sie die Avro-JAR-Datei (1.11.4) herunter:
Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.
Install the Kafka connector¶
Kopieren Sie die JAR-Dateien, die Sie in Installieren des Konnektors für Open Source Apache Kafka heruntergeladen haben, in den Ordner <kafka_dir>/libs.
Configuring the Kafka connector¶
Im Standalone-Modus wird der Konnektor konfiguriert, indem eine Datei erstellt wird, die Parameter wie die Snowflake-Anmeldeinformationen, Themennamen, Namen von Snowflake-Tabellen usw. angibt. Bei Bereitstellung im verteilten Modus wird der Konnektor durch den Aufruf des REST API-Endpunkts des Kafka Connect-Clusters konfiguriert.
Wichtig
The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. Configuration settings include sensitive information, specifically, the Snowflake username and private key. Make sure to secure the communication channel between Kafka Connect nodes. For more information, see the documentation for your Apache Kafka software.
Each configuration specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be stored in a single database and schema.
Dieser Abschnitt enthält Anweisungen für den verteilten und eigenständigen Modus.
Beschreibungen der Konfigurationsfelder finden Sie unter Konnektor-Konfigurationseigenschaften.
Wichtig
Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.
In addition, consider storing the configuration file in a secure external location or a key management service. For more information, see Externalizing Secrets (in this topic).
Verteilter Modus¶
Erstellen Sie die Kafka-Konfigurationsdatei, z. B. <path>/<config_file>.json. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration. Die Datei muss im JSON-Format vorliegen.
Sample configuration file
{
"name":"XYZCompanySensorData",
"config":{
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"tasks.max": "1",
"snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
"snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.warehouse.name": "WH",
"snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
"snowflake.schema.name": "MY_SCHEMA",
"snowflake.database.name": "MY_DATABASE",
"snowflake.role.name": "MY_ROLE",
"snowflake.user.name": "MY_USER",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"topics": "topic1,topic2",
"value.converter.schemas.enable": "false",
"errors.tolerance": "none"
}
}
Eigenständiger Modus¶
Erstellen Sie eine Konfigurationsdatei, z. B. <kafka_dir>/config/SF_connect.properties. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration.
Sample configuration file
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
Cache-Hinweise für Testen und Prototyping¶
Der Konnektor speichert Überprüfungen auf das Vorhandensein von Tabellen und Pipes im Cache, um die Performance beim Rebalancing von Partitionen zu verbessern. Beim Testen und beim Prototyping kann dieses Caching-Verhalten jedoch dazu führen, dass der Konnektor manuell erstellte Tabellen oder Pipes nicht sofort erkennt.
Problem: Wenn Sie eine Tabelle oder Pipe manuell erstellen, während der Konnektor ausgeführt wird, kann der Konnektor standardmäßig bis zu 5 Minuten lang zwischengespeicherte Ergebnisse der Existenzprüfung verwenden (die darauf hinweisen können, dass das Objekt nicht existiert). Dies kann zu unerwarteten Fehlern oder Verhaltensweisen beim Testen führen.
Empfehlung zum Testen: Um Cache-bezogene Probleme während des Testens und Prototyping zu vermeiden, konfigurieren Sie für beide Parameter für den Cacheablauf den Mindestwert von 1 Millisekunden, oder deaktivieren Sie das Caching:
snowflake.cache.table.exists.expire.ms=1
snowflake.cache.pipe.exists.expire.ms=1
Diese Konfiguration stellt sicher, dass der Konnektor bei jedem Neuausgleichen einer Partition neue Existenzprüfungen durchführt, sodass Sie die Auswirkungen von manuell erstellten Tabellen und Pipes sofort sehen können.
Wichtig
Diese minimalen Cache-Einstellungen werden nur für Tests und Prototyping empfohlen. Verwenden Sie in Produktionsumgebungen die Standardwerte für den Cache-Ablauf (5 Minuten oder mehr), um Metadaten-Abfragen an Snowflake zu minimieren und die Performance beim Neuausgleichen zu verbessern, insbesondere wenn Sie viele Partitionen verarbeiten.
Konnektor-Konfigurationseigenschaften¶
Erforderliche Eigenschaften¶
nameAnwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopicsKommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter
topic2table.map(unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.Bemerkung
Entweder
topicsodertopics.regexist erforderlich, nicht beide.topics.regexDies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder
topicsodertopics.regexenthalten, nicht beides.snowflake.url.nameDie URL für den Zugriff auf Ihr Snowflake-Konto. Diese URL muss Ihren Kontobezeichner enthalten. Beachten Sie, dass das Protokoll (
https://) und die Portnummer optional sind.snowflake.user.nameAnmeldename des Benutzers für das Snowflake-Konto.
snowflake.role.nameDer Name der Rolle, mit der der Konnektor Daten in die Tabelle einfügt.
snowflake.private.keyDer private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter
snowflake.private.key.passphrase, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwertsnowflake.private.keyverschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation verschlüsselt wurden.Bemerkung
Siehe auch
snowflake.private.key.passphrasein Optionale Eigenschaften.snowflake.database.nameDer Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.
snowflake.schema.nameDer Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.
header.converterNur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist
"org.apache.kafka.connect.storage.StringConverter".key.converterSchlüsselkonverter des Kafka-Datensatzes (z. B.
"org.apache.kafka.connect.storage.StringConverter"). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.
value.converterDer Konnektor unterstützt die Standard-Konverter der Kafka-Community. Wählen Sie den passenden Konverter auf der Grundlage Ihres Datenformats aus:
Für JSON-Datensätze:
"org.apache.kafka.connect.json.JsonConverter"Für Avro-Datensätze mit Schema-Registry:
"io.confluent.connect.avro.AvroConverter"
Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.
Optionale Eigenschaften¶
snowflake.private.key.passphraseWenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.
tasks.maxAnzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Um eine optimale Leistung zu erzielen, empfiehlt Snowflake, die Anzahl der Tasks auf die Gesamtzahl der Kafka-Partitionen festzulegen, jedoch nicht die Anzahl der CPU-Kerne zu überschreiten. Eine hohe Anzahl von Tasks kann zu einem erhöhten Arbeitsspeicherverbrauch und häufigen Anpassungen führen.
snowflake.topic2table.mapMit diesem optionalen Parameter können Benutzer angeben, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner. Die Konfiguration von Themen erlaubt die Verwendung regulärer Ausdrücke zur Definition von Themen, genau wie die Verwendung von
topics.regex. Die regulären Ausdrücke dürfen nicht mehrdeutig sein — jedes übereinstimmende Thema darf nur mit einer einzigen Zieltabelle übereinstimmen.Beispiel:
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
könnte geschrieben werden als:
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
value.converter.schema.registry.urlWenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.
value.converter.break.on.schema.registry.errorWenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist
false. Setzen Sie den Wert auftrue, um dieses Verhalten zu aktivieren.jvm.proxy.hostDamit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.
jvm.proxy.portDamit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.
snowflake.streaming.max.client.lagSpecifies how often the connector flushes the data to Snowflake, in seconds.
- Werte:
Minimum:
1SekundeMaximum:
600Sekunden
- Standard:
1Sekunde
jvm.proxy.usernameBenutzername, der sich beim Proxyserver authentifiziert.
jvm.proxy.passwordKennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.
snowflake.jdbc.mapBeispiel:
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"Weitere JDBC-Eigenschaften (siehe Übersicht der Verbindungsparameter für den JDBC-Treiber) werden nicht validiert. Diese zusätzlichen Eigenschaften werden nicht validiert und dürfen weder überschrieben noch anstelle der erforderlichen Eigenschaften verwendet werden, wie z. B:
jvm.proxy.xxx,snowflake.user.name,snowflake.private.key,snowflake.schema.nameusw.- Geben Sie eine der folgenden Kombinationen an:
tracing-Eigenschaft zusammen mit der VariableJDBC_TRACEenvdatabase-Eigenschaft zusammen mitsnowflake.database.name
Führt zu einem mehrdeutigen Verhalten und das Verhalten wird durch den JDBC-Treiber bestimmt.
value.converter.basic.auth.credentials.sourceWenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter
value.converter.basic.auth.user.infoan. Andernfalls lassen Sie diesen Parameter aus.value.converter.basic.auth.user.infoWenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.
snowflake.metadata.createtimeWenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert
CreateTimein den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.snowflake.metadata.topicWenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert
topicin den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.snowflake.metadata.offset.and.partitionWenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte
OffsetundPartitionin den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.snowflake.metadata.allWenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.
transformsGeben Sie an, dass vom Kafka-Konnektor festgestellte Tombstone-Datensätze übersprungen und nicht in die Zieltabelle geladen werden sollen. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist.
Setzen Sie den Eigenschaftswert auf
"tombstoneHandlerExample".Bemerkung
Verwenden Sie diese Eigenschaft nur für Kafka-Community-Konvertern, d. h. Eigenschaftswert
value.converter(z. B.org.apache.kafka.connect.json.JsonConverteroderorg.apache.kafka.connect.json.AvroConverter). Um die Behandlung von Tombstone-Datensätzen mit den Snowflake-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaftbehavior.on.null.values.transforms.tombstoneHandlerExample.typeErforderlich beim Einstellen der
transforms-Eigenschaft.Setzen Sie den Eigenschaftswert auf
"io.confluent.connect.transforms.TombstoneHandler"behavior.on.null.valuesGeben Sie an, wie der Kafka-Konnektor Tombstone-Datensätze behandeln soll. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist. Für Snowpipe wird diese Eigenschaft von der Kafka-Konnektor-Version 1.5.5 und höher unterstützt. Für Snowpipe Streaming wird diese Eigenschaft von der Kafka-Konnektor-Version 2.1.0 und höher unterstützt.
Diese Eigenschaft unterstützt die folgenden Werte:
DEFAULTWenn der Kafka-Konnektor auf einen Tombstone-Datensatz stößt, fügt er in die Inhaltsspalte eine leere JSON-Zeichenfolge ein.
IGNOREDer Kafka-Konnektor überspringt Tombstone-Datensätze und fügt keine Zeilen für diese Datensätze ein.
Der Standardwert ist
DEFAULT.Bemerkung
Die Erfassung von Tombstone-Datensätzen variiert je nach Erfassungsmethode:
Für Snowpipe verwendet der Kafka-Konnektor nur Snowflake-Konverter. Um die Behandlung von Tombstone-Datensätzen mit den Kafka-Community-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaften
transformundtransforms.tombstoneHandlerExample.type.Für Snowpipe Streaming verwendet der Kafka-Konnektor ausschließlich Community-Konverter.
An Kafka-Broker gesendete Datensätze dürfen nicht NULL sein, da diese Datensätze vom Kafka-Konnektor verworfen werden, was zu fehlenden Offsets führt. Die fehlenden Offsets machen den Kafka-Konnektor in bestimmten Anwendungsfällen funktionsunfähig. Es wird empfohlen, dass Sie anstelle von NULL-Datensätzen Tombstone-Datensätze verwenden.
Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation¶
Der Kafka-Konnektor basiert auf der Schlüsselpaar-Authentifizierung anstelle der Authentifizierung mit Benutzername und Kennwort. Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das Public/Private-Schlüsselpaar mit OpenSSL. Der öffentliche Schlüssel wird dem in der Konfigurationsdatei definierten Snowflake-Benutzer zugewiesen.
Nachdem Sie die Aufgaben zur Schlüsselpaar-Authentifizierung auf dieser Seite und die Aufgaben für die Schlüsselpaar-Rotation abgeschlossen haben, prüfen Sie die Empfehlung für Externalisieren von Geheimnissen (unter diesem Thema).
So konfigurieren Sie das Public/Private-Schlüsselpaar:
Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:
Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.
Bemerkung
Der Kafka-Konnektor unterstützt Verschlüsselungsalgorithmen, die validiert wurden, um die Anforderungen des Federal Information Processing Standard (140-2) (d. h. FIPS 140-2) zu erfüllen. Weitere Informationen dazu finden Sie unter FIPS 140-2.
Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:
$ openssl genrsa -out rsa_key.pem 2048
Verwenden Sie zum Generieren einer verschlüsselten Version den folgenden Befehl:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
Wobei
<Algorithmus>ein mit FIPS 140-2 kompatibler Verschlüsselungsalgorithmus ist.So geben Sie beispielsweise AES 256 als Verschlüsselungsalgorithmus an:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
Wenn Sie eine verschlüsselte Version des privaten Schlüssels generieren, notieren Sie sich die Passphrase. Geben Sie die Passphrase später in der Kafka-Konfigurationsdatei für die Eigenschaft
snowflake.private.key.passphrasean.Beispiel für einen privaten PEM-Schlüssel
-----BEGIN ENCRYPTED PRIVATE KEY----- MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1 wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL ... -----END ENCRYPTED PRIVATE KEY-----
Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:
Angenommen, der private Schlüssel befindet sich in der Datei
rsa_key.p8, dann verwenden Sie den folgenden Befehl:$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Beispiel für einen öffentlichen PEM-Schlüssel
-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4 zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk ... -----END PUBLIC KEY-----
Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in ein lokales Verzeichnis. Notieren Sie den Pfad zu den Dateien. Der private Schlüssel wird im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt. Die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in der Verantwortung der Benutzenden, die Datei zu sichern, wenn sie nicht verwendet wird.
Melden Sie sich bei Snowflake an. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu.
For example:
ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
Bemerkung
Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.
Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.
Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:
DESC USER jsmith; +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+ | property | value | default | description | |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------| | NAME | JSMITH | null | Name | ... ... | RSA_PUBLIC_KEY_FP | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null | Fingerprint of user's RSA public key. | | RSA_PUBLIC_KEY_2_FP | null | null | Fingerprint of user's second RSA public key. | +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
Bemerkung
Die Eigenschaft
RSA_PUBLIC_KEY_2_FPwird unter Konfigurieren der Schlüsselpaar-Rotation beschrieben.Kopieren Sie den gesamten privaten Schlüssel, und fügen Sie ihn in das Feld
snowflake.private.keyder Konfigurationsdatei ein. Speichern Sie die Datei.
Externalisieren von Geheimnissen¶
Snowflake empfiehlt dringend, Geheimnisse wie den privaten Schlüssel zu externalisieren und in verschlüsselter Form bzw. in einem Schlüsselverwaltungsdienst wie AWS KMS, Azure Key Vault oder HashiCorp Vault zu speichern. Dies kann mithilfe einer ConfigProvider-Implementierung in Ihrem Kafka Connect-Cluster erreicht werden.
Weitere Informationen dazu finden Sie in der Beschreibung des Confluent-Dienstes.
Starting the connector¶
Starten Sie Kafka anhand der Anweisungen in der Confluent- oder Apache Kafka-Dokumentation des Drittanbieters. Sie können den Kafka-Konnektor entweder im verteilten oder im eigenständigen Modus starten. Anweisungen für den jeweiligen Modus finden Sie unten:
Verteilter Modus¶
Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:
curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Eigenständiger Modus¶
Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Bemerkung
Bei einer Standardinstallation von Apache Kafka oder Confluent Kafka sollte die Datei connect-standalone.properties bereits enthalten sein.