Installieren und Konfigurieren des Kafka-Konnektors¶
Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.
Snowflake bietet zwei Versionen des Konnektors:
Eine Version für die Confluent-Paketversion von Kafka
Eine Version für das Apache Kafka-OSS-Paket (Open Source-Software).
Bemerkung
Der Kafka-Konnektor unterliegt den Nutzungsbedingungen für Konnektoren.
Die Anweisungen unter diesem Thema geben an, welche Schritte nur für beide Versionen des Konnektors gelten.
Unter diesem Thema:
Konfigurieren der Zugriffssteuerung für Snowflake-Objekte¶
Erforderliche Berechtigungen¶
Das Erstellen und Verwalten von Snowflake-Objekten, die vom Kafka-Konnektor verwendet werden, erfordert eine Rolle mit den folgenden minimalen Berechtigungen:
Objekt |
Berechtigung |
Anmerkungen |
---|---|---|
Datenbank |
USAGE |
|
Schema |
USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE |
Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE |
Tabelle |
OWNERSHIP |
Nur erforderlich, wenn der Kafka-Konnektor zum Einlesen von Daten in eine vorhandene Tabelle verwendet wird. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kafka-Thema erstellt, wird die in der Kafka-Konfigurationsdatei angegebene Standardrolle des Benutzers zum Tabelleneigentümer (d. h. er verfügt über die Berechtigung OWNERSHIP für die Tabelle). |
Stagingbereich |
READ . WRITE |
Nur erforderlich, wenn der Kafka-Konnektor zum Bereitstellen von Datendateien von Kafka in einem vorhandenen internen Stagingbereich verwendet wird (nicht empfohlen). . Wenn der Konnektor einen neuen Stagingbereich zum temporären Speichern von Datendateien erstellt, die vom Kafka-Thema verwendet werden, wird die in der Kafka-Konfigurationsdatei angegebene Standardrolle des Benutzers zum Eigentümer des Stagingbereichs (d. h. er hat die Berechtigung OWNERSHIP für den Stagingbereich). |
Snowflake empfiehlt, für jede Kafka-Instanz einen eigenen Benutzer (mit CREATE USER) und eine eigene Rolle (mit CREATE ROLE) zu erstellen, damit die Zugriffsrechte bei Bedarf einzeln widerrufen werden können. Die Rolle sollte dem Benutzer als Standardrolle zugewiesen werden.
Erstellen einer Rolle zur Verwendung des Kafka-Konnektors¶
Das folgende Skript erstellt eine benutzerdefinierte Rolle zur Verwendung durch den Kafka-Konnektor (z. B. KAFKA_CONNECTOR_ROLE_1). Jede Rolle, die Berechtigungen erteilen kann (z. B. SECURITYADMIN oder eine Rolle mit der Berechtigung MANAGE GRANTS), kann jedem Benutzer diese benutzerdefinierte Rolle zuweisen, damit der Kafka-Konnektor die erforderlichen Snowflake-Objekte erstellen und Daten in Tabellen einfügen kann. Das Skript referenziert eine bestimmte vorhandene Datenbank, ein bestimmtes Schema (kafka_db.kafka_schema
) und einen bestimmten Benutzer (kafka_connector_user_1
):
-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;
-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;
-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;
-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
Weitere Informationen zum Erstellen von benutzerdefinierten Rollen und Rollenhierarchien finden Sie unter Konfigurieren der Zugriffssteuerung.
Installationsvoraussetzungen¶
Der Kafka-Konnektor unterstützt die folgenden Paketversionen:
Paket
Version des Snowflake-Kafka-Konnektors
Paketunterstützung (von Snowflake getestet)
Apache Kafka
2.0.0 (oder höher)
Apache Kafka 2.5.1, 2.8.1, 3.2.1
Confluent
2.0.0 (oder höher)
Confluent 6.2.6, 7.2.1
Der Kafka-Konnektor ist für die Verwendung mit der Kafka Connect-API 3.2.3 ausgelegt. Alle neueren Versionen der Kafka Connect-API wurden nicht getestet. Alle Versionen, die älter als 3.2.3 sind, sind mit dem Konnektor kompatibel. Weitere Informationen dazu finden Sie unter Kafka-Kompatibilität.
Wenn Sie sowohl den Kafka-Konnektor als auch die JAR-Dateien des JDBC-Treibers in Ihrer Umgebung haben, stellen Sie sicher, dass Ihre JDBC-Version mit der
snowflake-jdbc
-Version übereinstimmt, die in der Dateipom.xml
Ihrer vorgesehenen Kafka-Konnektor-Version angegeben ist. Sie können zu Ihrer bevorzugten Release-Version des Kafka-Konnektors wechseln, z. B. v2.0.1. Durchsuchen Sie dann die Dateipom.xml
, um die Version vonsnowflake-jdbc
zu ermitteln.Wenn Sie das Avro-Format zum Erfassen von Daten verwenden, gilt Folgendes:
Verwenden Sie den Avro-Parser, Version 1.8.2 (oder höher), verfügbar unter https://mvnrepository.com/artifact/org.apache.avro.
Wenn Sie für Avro die Schemaregistrierungsfunktion verwenden, verwenden Sie Version 5.0.0 (oder höher) des Kafka Connect Avro Converter, der unter https://mvnrepository.com/artifact/io.confluent verfügbar ist.
Beachten Sie, dass die Schema-Registrierungsfunktion im Apache Kafka-OSS-Paket nicht verfügbar ist.
Apache Kafka muss mit der gewünschten Datenaufbewahrungsdauer und/oder dem gewünschten Speicherlimit konfiguriert werden.
Installieren und konfigurieren Sie den Kafka Connect-Cluster.
Jeder Kafka Connect-Clusterknoten muss über ausreichend RAM für den Snowflake-Konnektor für Apache Kafka verfügen. Das empfohlene Minimum beträgt 5 MB pro Kafka-Partition. (Dies ist zusätzlich zu dem RAM erforderlich, der für alle anderen Aufgaben von Kafka Connect erforderlich ist.)
Wir empfehlen Ihnen, für Kafka-Broker und die Kafka Connect-Laufzeitumgebung die gleiche Version zu verwenden.
Es wird dringend empfohlen, Ihre Kafka Connect-Instanz in derselben Cloudanbieter-Region wie Ihr Snowflake-Konto auszuführen. Dies ist nicht zwingend erforderlich, erhöht jedoch in der Regel den Durchsatz.
Eine Liste der von Snowflake-Clients unterstützten Betriebssysteme finden Sie unter Betriebssystemunterstützung.
Installieren des Konnektors¶
Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Confluent. Die Versionen des Kafka-Konnektors können Sie der folgenden Tabelle entnehmen:
Release-Serie |
Status |
Anmerkungen |
---|---|---|
2.x.x |
Offiziell unterstützt |
Neueste Version und dringend empfohlen. |
1.9.x |
Offiziell unterstützt |
Upgrade empfohlen. |
1.8.x |
Nicht unterstützt |
Verwenden Sie diese Release-Serie nicht. |
1.7.x |
Nicht unterstützt |
Verwenden Sie diese Release-Serie nicht. |
Installieren des Konnektors für Confluent¶
Kafka-Konnektordateien herunterladen¶
Laden Sie die JAR-Datei des Kafka-Konnektors von einem der folgenden Speicherorte herunter:
- Confluent Hub:
-
Das Paket enthält alle Abhängigkeiten, die zur Verwendung eines verschlüsselten oder unverschlüsselten privaten Schlüssels für die Schlüsselpaar-Authentifizierung erforderlich sind. Weitere Informationen dazu finden Sie unter Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter diesem Thema).
- Zentrales Maven-Repository:
https://mvnrepository.com/artifact/com.snowflake
Für die JAR-Datei sind keine zusätzlichen Abhängigkeiten erforderlich, um einen unverschlüsselten privaten Schlüssel für die Schlüsselpaar-Authentifizierung zu verwenden. Um einen verschlüsselten privaten Schlüssel zu verwenden, laden Sie die Kryptografiebibliothek Bouncy Castle (JAR-Datei) herunter. Snowflake verwendet Bouncy Castle zum Entschlüsseln von verschlüsselten privaten RSA-Schlüsseln, die zum Anmelden verwendet werden:
Laden Sie diese Dateien in denselben lokalen Ordner wie die JAR-Datei des Kafka-Konnektors herunter.
Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.
Kafka-Konnektor installieren¶
Installieren Sie den Kafka-Konnektor gemäß den Anweisungen, die für die Installation anderer Konnektoren bereitgestellt wurden:
Installieren des Konnektors für Open Source Apache Kafka¶
Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Open Source Apache Kafka.
Apache Kafka installieren¶
Laden Sie das Kafka-Paket von der offiziellen Website herunter: https://kafka.apache.org/downloads.
Wechseln Sie in einem Terminalfenster in das Verzeichnis, in das Sie die Paketdatei heruntergeladen haben.
Führen Sie den folgenden Befehl aus, um die Datei
kafka_<Scala-Version>-<Kafka-Version>.tgz
zu dekomprimieren:tar xzvf kafka_<scala_version>-<kafka_version>.tgz
JDK installieren¶
Installieren und konfigurieren Sie das Java Development Kit (JDK). Snowflake wurde mit der Standard Edition (SE) des JDK getestet. Die Enterprise Edition (EE) ist voraussichtlich kompatibel, wurde jedoch nicht getestet.
Wenn Sie diesen Schritt bereits ausgeführt haben, können Sie den Abschnitt überspringen.
Laden Sie das JDK von https://www.oracle.com/technetwork/java/javase/downloads/index.html herunter.
Installieren oder dekomprimieren Sie das JDK.
Stellen Sie gemäß den Anweisungen für Ihr Betriebssystem die Umgebungsvariable JAVA_HOME so ein, dass sie auf das Verzeichnis mit dem JDK verweist.
JAR-Dateien des Kafka-Konnektors herunterladen¶
Laden Sie die JAR-Datei des Kafka-Konnektors vom Maven Central Repository herunter:
Für die JAR-Datei sind keine zusätzlichen Abhängigkeiten erforderlich, um einen unverschlüsselten privaten Schlüssel für die Schlüsselpaar-Authentifizierung zu verwenden. Um einen verschlüsselten privaten Schlüssel zu verwenden, laden Sie die Kryptografiebibliothek Bouncy Castle (JAR-Datei) herunter. Snowflake verwendet Bouncy Castle zum Entschlüsseln von verschlüsselten privaten RSA-Schlüsseln, die zum Anmelden verwendet werden:
Wenn Ihre Kafka-Daten im Apache Avro-Format gestreamt werden, laden Sie die Avro-JAR-Datei herunter:
Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.
Kafka-Konnektor installieren¶
Kopieren Sie die JAR-Dateien, die Sie unter JAR-Dateien des Kafka-Konnektors herunterladen heruntergeladen haben, in den Ordner <Kafka-Verzeichnis>/libs
.
Konfigurieren des Kafka-Konnektors¶
Der Konnektor wird konfiguriert, indem eine Datei erstellt wird, die Parameter wie die Snowflake-Anmeldeinformationen, Themennamen, Namen von Snowflake-Tabellen usw. angibt.
Wichtig
Das Kafka Connect-Framework überträgt die Konfigurationseinstellungen für den Kafka-Konnektor vom Masterknoten zu den Workerknoten. Die Konfigurationseinstellungen enthalten vertrauliche Informationen (insbesondere den Snowflake-Benutzernamen und den privaten Schlüssel). Stellen Sie sicher, dass der Kommunikationskanal zwischen den Kafka Connect-Knoten sicher ist. Anweisungen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.
Jede Konfigurationsdatei legt die Themen und entsprechenden Tabellen für genau eine Datenbank und genau ein Schema in dieser Datenbank fest. Beachten Sie, dass ein Konnektor Nachrichten aus einer beliebigen Anzahl von Themen erfassen kann, sich die entsprechenden Tabellen jedoch alle in einer einzigen Datenbank und in einem einzigen Schema befinden müssen.
Dieser Abschnitt enthält Anweisungen für den verteilten und eigenständigen Modus.
Beschreibungen der Konfigurationsfelder finden Sie unter Kafka-Konfigurationseigenschaften.
Wichtig
Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.
Ziehen Sie außerdem in Betracht, die Konfigurationsdatei an einem sicheren externen Speicherort oder in einem Schlüsselverwaltungsdienst zu speichern. Weitere Informationen dazu finden Sie unter Externalisieren von Geheimnissen (unter diesem Thema).
Verteilter Modus¶
Erstellen Sie die Kafka-Konfigurationsdatei, z. B. <Pfad>/<Konfigurationsdatei>.json
. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration. Die Datei sollte im JSON-Format vorliegen.
Beispielhafte Konfigurationsdatei
{
"name":"XYZCompanySensorData",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"topic1,topic2",
"snowflake.topic2table.map": "topic1:table1,topic2:table2",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.user.name":"jane.smith",
"snowflake.private.key":"xyz123",
"snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
"snowflake.database.name":"mydb",
"snowflake.schema.name":"myschema",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
}
}
Eigenständiger Modus¶
Erstellen Sie eine Konfigurationsdatei, z. B. <<Kafka-Verzeichnis>>/config/SF_connect.properties
. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration.
Beispielhafte Konfigurationsdatei
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Kafka-Konfigurationseigenschaften¶
Die folgenden Eigenschaften können in der Kafka-Konfigurationsdatei entweder für den verteilten oder den eigenständigen Modus festgelegt werden:
Erforderliche Eigenschaften¶
name
Anwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.
connector.class
com.snowflake.kafka.connector.SnowflakeSinkConnector
topics
Kommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter
topic2table.map
(unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.Bemerkung
Entweder
topics
odertopics.regex
ist erforderlich, nicht beide.topics.regex
Dies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder
topics
odertopics.regex
enthalten, nicht beides.snowflake.url.name
Die URL für den Zugriff auf Ihr Snowflake-Konto. Diese URL muss Ihren Kontobezeichner enthalten. Beachten Sie, dass das Protokoll (
https://
) und die Portnummer optional sind.snowflake.user.name
Anmeldename des Benutzers für das Snowflake-Konto.
snowflake.private.key
Der private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter
snowflake.private.key.passphrase
, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwertsnowflake.private.key
verschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter diesem Thema) verschlüsselt wurden.Bemerkung
Siehe auch
snowflake.private.key.passphrase
unter Optionale Eigenschaften (unter diesem Thema).snowflake.database.name
Der Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.
snowflake.schema.name
Der Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.
header.converter
Nur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist
"org.apache.kafka.connect.storage.StringConverter"
.key.converter
Schlüsselkonverter des Kafka-Datensatzes (z. B.
"org.apache.kafka.connect.storage.StringConverter"
). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.
value.converter
Wenn die Datensätze in JSON formatiert sind, sollte dies
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
lauten.Wenn die Datensätze in Avro formatiert sind und den Schema-Registrierungsdienst von Kafka verwenden, sollte dies
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter"
lauten.Wenn die Datensätze in Avro formatiert sind und das Schema enthalten (und daher nicht den Schema-Registrierungsdienst von Kafka benötigen), sollte dies
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry"
lauten.Wenn die Datensätze als Nur-Text formatiert sind, sollte dies
"org.apache.kafka.connect.storage.StringConverter"
lauten.Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.
Optionale Eigenschaften¶
snowflake.private.key.passphrase
Wenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.
tasks.max
Anzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Diese Zahl kann niedriger oder höher eingestellt werden. Snowflake empfiehlt jedoch keine höhere Einstellung.
snowflake.topic2table.map
Mit diesem optionalen Parameter können Benutzer angeben, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.
buffer.count.records
Anzahl der Datensätze, die pro Kafka-Partition im Arbeitsspeicher zwischengespeichert werden, bevor sie in Snowflake erfasst werden. Der Standardwert ist
10000
Datensätze.buffer.flush.time
Anzahl der Sekunden zwischen Pufferleerungen, wobei die Leerung vom Kafka-Speichercache zum internen Stagingbereich erfolgt. Der Standardwert ist
120
Sekunden.buffer.size.bytes
Kumulative Größe der Datensätze in Byte, die pro Kafka-Partition im Arbeitsspeicher zwischengespeichert werden, bevor sie in Snowflake als Datendateien erfasst werden. Der Standardwert hierfür ist
5000000
(5 MB).Die Datensätze werden beim Schreiben in die Datendateien komprimiert. Infolgedessen kann die Größe der Datensätze im Puffer größer sein als die Größe der aus den Datensätzen erstellten Datendateien.
value.converter.schema.registry.url
Wenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.
value.converter.break.on.schema.registry.error
Wenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist
false
. Setzen Sie den Wert auftrue
, um dieses Verhalten zu aktivieren.Unterstützt von Kafka-Konnektor, Version 1.4.2 (und höher).
jvm.proxy.host
Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.
jvm.proxy.port
Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.
jvm.proxy.username
Benutzername, der sich beim Proxyserver authentifiziert.
Unterstützt von Kafka-Konnektor, Version 1.4.4 (und höher).
jvm.proxy.password
Kennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.
Unterstützt von Kafka-Konnektor, Version 1.4.4 (und höher).
value.converter.basic.auth.credentials.source
Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter
value.converter.basic.auth.user.info
an. Andernfalls lassen Sie diesen Parameter aus.value.converter.basic.auth.user.info
Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.
snowflake.metadata.createtime
Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert
CreateTime
in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).
snowflake.metadata.topic
Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert
topic
in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).
snowflake.metadata.offset.and.partition
Wenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte
Offset
undPartition
in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).
snowflake.metadata.all
Wenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.
Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).
transforms
Geben Sie an, dass vom Kafka-Konnektor festgestellte Tombstone-Datensätze übersprungen und nicht in die Zieltabelle geladen werden sollen. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist.
Setzen Sie den Eigenschaftswert auf
"tombstoneHandlerExample"
.Bemerkung
Verwenden Sie diese Eigenschaft nur für Kafka-Community-Konvertern, d. h. Eigenschaftswert
value.converter
(z. B.org.apache.kafka.connect.json.JsonConverter
oderorg.apache.kafka.connect.json.AvroConverter
). Um die Behandlung von Tombstone-Datensätzen mit den Snowflake-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaftbehavior.on.null.values
.transforms.tombstoneHandlerExample.type
Erforderlich beim Einstellen der
transforms
-Eigenschaft.Setzen Sie den Eigenschaftswert auf
"io.confluent.connect.transforms.TombstoneHandler"
behavior.on.null.values
Geben Sie an, wie der Kafka-Konnektor Tombstone-Datensätze behandeln soll. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist. Für Snowpipe wird diese Eigenschaft von der Kafka-Konnektor-Version 1.5.5 und höher unterstützt. Für Snowpipe Streaming wird diese Eigenschaft von der Kafka-Konnektor-Version 2.1.0 und höher unterstützt.
Diese Eigenschaft unterstützt die folgenden Werte:
DEFAULT
Wenn der Kafka-Konnektor auf einen Tombstone-Datensatz stößt, fügt er in die Inhaltsspalte eine leere JSON-Zeichenfolge ein.
IGNORE
Der Kafka-Konnektor überspringt Tombstone-Datensätze und fügt keine Zeilen für diese Datensätze ein.
Der Standardwert ist
DEFAULT
.Bemerkung
Die Erfassung von Tombstone-Datensätzen variiert je nach Erfassungsmethode:
Für Snowpipe verwendet der Kafka-Konnektor nur Snowflake-Konverter. Um die Behandlung von Tombstone-Datensätzen mit den Kafka-Community-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaften
transform
undtransforms.tombstoneHandlerExample.type
.Für Snowpipe Streaming verwendet der Kafka-Konnektor ausschließlich Community-Konverter.
An Kafka-Broker gesendete Datensätze dürfen nicht NULL sein, da diese Datensätze vom Kafka-Konnektor verworfen werden, was zu fehlenden Offsets führt. Die fehlenden Offsets machen den Kafka-Konnektor in bestimmten Anwendungsfällen funktionsunfähig. Es wird empfohlen, dass Sie anstelle von NULL-Datensätzen Tombstone-Datensätze verwenden.
Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation¶
Der Kafka-Konnektor basiert auf einer Schlüsselpaar-Authentifizierung und nicht auf eine Basisauthentifizierung (d. h. Benutzername und Kennwort). Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das Public/Private-Schlüsselpaar mit OpenSSL. Der öffentliche Schlüssel wird dem in der Konfigurationsdatei definierten Snowflake-Benutzer zugewiesen.
Nachdem Sie die Anleitung zur Schlüsselpaarauthentifizierung auf dieser Seite und die Anweisungen zur Schlüsselpaar-Rotation ausgeführt haben, prüfen Sie die Empfehlung zur Externalisierung von Geheimnissen (unter diesem Thema).
So konfigurieren Sie das Public/Private-Schlüsselpaar:
Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:
Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.
Bemerkung
Der Kafka-Konnektor unterstützt Verschlüsselungsalgorithmen, die validiert wurden, um die Anforderungen des Federal Information Processing Standard (140-2) (d. h. FIPS 140-2) zu erfüllen. Weitere Informationen dazu finden Sie unter FIPS 140-2.
Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:
$ openssl genrsa -out rsa_key.pem 2048
Verwenden Sie zum Generieren einer verschlüsselten Version den folgenden Befehl:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
Wobei
<Algorithmus>
ein mit FIPS 140-2 kompatibler Verschlüsselungsalgorithmus ist.So geben Sie beispielsweise AES 256 als Verschlüsselungsalgorithmus an:
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
Wenn Sie eine verschlüsselte Version des privaten Schlüssels generieren, notieren Sie sich die Passphrase. Geben Sie die Passphrase später in der Kafka-Konfigurationsdatei für die Eigenschaft
snowflake.private.key.passphrase
an.Beispiel für einen privaten PEM-Schlüssel
-----BEGIN ENCRYPTED PRIVATE KEY----- MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1 wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL ... -----END ENCRYPTED PRIVATE KEY-----
Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:
Angenommen, der private Schlüssel befindet sich in der Datei
rsa_key.p8
, dann verwenden Sie den folgenden Befehl:$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Beispiel für einen öffentlichen PEM-Schlüssel
-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4 zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk ... -----END PUBLIC KEY-----
Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in ein lokales Verzeichnis. Zeichnen Sie den Pfad zu den Dateien auf. Beachten Sie, dass der private Schlüssel im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt wird; die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in Ihrer Verantwortung, die Datei bei Nichtverwendung entsprechend zu sichern.
Melden Sie sich bei Snowflake an. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu. Beispiel:
ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
Bemerkung
Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.
Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.
Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:
DESC USER jsmith; +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+ | property | value | default | description | |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------| | NAME | JSMITH | null | Name | ... ... | RSA_PUBLIC_KEY_FP | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null | Fingerprint of user's RSA public key. | | RSA_PUBLIC_KEY_2_FP | null | null | Fingerprint of user's second RSA public key. | +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
Bemerkung
Die Eigenschaft
RSA_PUBLIC_KEY_2_FP
wird unter Konfigurieren der Schlüsselpaar-Rotation beschrieben.Kopieren Sie den gesamten privaten Schlüssel, und fügen Sie ihn in das Feld
snowflake.private.key
der Konfigurationsdatei ein. Speichern Sie die Datei.
Externalisieren von Geheimnissen¶
Snowflake empfiehlt dringend, Geheimnisse wie den privaten Schlüssel zu externalisieren und in verschlüsselter Form bzw. in einem Schlüsselverwaltungsdienst wie AWS KMS, Azure Key Vault oder HashiCorp Vault zu speichern. Dies kann mithilfe einer ConfigProvider
-Implementierung in Ihrem Kafka Connect-Cluster erreicht werden.
Weitere Informationen dazu finden Sie in der Beschreibung des Confluent-Dienstes.
Starten von Kafka¶
Starten Sie Kafka anhand der Anweisungen in der Confluent- oder Apache Kafka-Dokumentation des Drittanbieters.
Starten des Kafka-Konnektors¶
Sie können den Kafka-Konnektor entweder im verteilten oder im eigenständigen Modus starten. Anweisungen für den jeweiligen Modus finden Sie unten:
Verteilter Modus¶
Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:
curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Eigenständiger Modus¶
Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
(Bei einer Standardinstallation von Apache Kafka oder Confluent Kafka sollte die Datei connect-standalone.properties
bereits enthalten sein.)
Testen und Verwenden des Kafka-Konnektors¶
Wir empfehlen, den Kafka-Konnektor mit einer kleinen Datenmenge zu testen, bevor Sie den Konnektor in einem Produktionssystem verwenden. Der Vorgang zum Testen ist der gleiche wie bei der normalen Verwendung des Konnektors:
Stellen Sie sicher, dass Kafka und Kafka Connect ausgeführt werden.
Stellen Sie sicher, dass Sie das entsprechende Kafka-Thema erstellt haben.
Erstellen Sie einen Nachrichten-Herausgeber (oder nutzen Sie einen vorhandenen). Stellen Sie sicher, dass die zum Thema veröffentlichten Meldungen das korrekte Format haben (JSON, Avro und Nur-Text).
Erstellen Sie eine Konfigurationsdatei, die das zu abonnierende Thema und die Snowflake-Tabelle angibt, in die geschrieben werden soll. Eine Anleitung dazu finden Sie unter Konfigurieren des Kafka-Konnektors (unter diesem Thema).
(Optional) Erstellen Sie eine Tabelle, in die Daten geschrieben werden sollen. Dieser Schritt ist optional. Wenn Sie die Tabelle nicht erstellen, wird die Tabelle vom Kafka-Konnektor für Sie erstellt. Wenn Sie nicht vorhaben, den Konnektor zum Hinzufügen von Daten zu einer vorhandenen, nicht leeren Tabelle zu verwenden, empfiehlt Snowflake, dass Sie die Tabelle vom Konnektor erstellen lassen, um die Möglichkeit inkongruenter Schemas zu minimieren.
Erteilen Sie der Rolle, die zum Erfassen von Daten verwendet wird, die für die Snowflake-Objekte (Datenbank, Schema, Zieltabelle usw.) erforderlichen minimalen Berechtigungen.
Veröffentlichen Sie einen Beispieldatensatz im konfigurierten Kafka-Thema.
Warten Sie einige Minuten, bis sich die Daten im System verteilt haben, und überprüfen Sie dann die Snowflake-Tabelle, um sicherzustellen, dass die Datensätze eingefügt wurden.
Tipp
Überlegen Sie, ob Sie Ihre Netzwerkverbindung zu Snowflake vorab mithilfe von SnowCD überprüfen, bevor Sie Daten in Ihre Test- und Produktionsumgebung in Snowflake laden.