Installieren und Konfigurieren des Kafka-Konnektors

Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.

Snowflake bietet zwei Versionen des Konnektors:

Der Kafka-Konnektor unterliegt den Bedingungen für Drittanbieter.

Die Anweisungen unter diesem Thema geben an, welche Schritte nur für beide Versionen des Konnektors gelten.

Unter diesem Thema:

Konfigurieren der Zugriffssteuerung für Snowflake-Objekte

Erforderliche Berechtigungen

Das Erstellen und Verwalten von Snowflake-Objekten, die vom Kafka-Konnektor verwendet werden, erfordert eine Rolle mit den folgenden minimalen Berechtigungen:

Objekt

Berechtigung

Anmerkungen

Datenbank

USAGE

Schema

USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE

Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE object widerrufen werden.

Tabelle

OWNERSHIP

Nur erforderlich, wenn der Kafka-Konnektor zum Einlesen von Daten in eine vorhandene Tabelle verwendet wird. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kafka-Thema erstellt, wird die in der Kafka-Konfigurationsdatei angegebene Standardrolle des Benutzers zum Tabelleneigentümer (d. h. er verfügt über die Berechtigung OWNERSHIP für die Tabelle).

Stagingbereich

READ . WRITE

Nur erforderlich, wenn der Kafka-Konnektor zum Bereitstellen von Datendateien von Kafka in einem vorhandenen internen Stagingbereich verwendet wird (nicht empfohlen). . Wenn der Konnektor einen neuen Stagingbereich zum temporären Speichern von Datendateien erstellt, die vom Kafka-Thema verwendet werden, wird die in der Kafka-Konfigurationsdatei angegebene Standardrolle des Benutzers zum Eigentümer des Stagingbereichs (d. h. er hat die Berechtigung OWNERSHIP für den Stagingbereich).

Snowflake empfiehlt, für jede Kafka-Instanz einen eigenen Benutzer (mit CREATE USER) und eine eigene Rolle (mit CREATE ROLE) zu erstellen, damit die Zugriffsrechte bei Bedarf einzeln widerrufen werden können. Die Rolle sollte dem Benutzer als Standardrolle zugewiesen werden.

Erstellen einer Rolle zur Verwendung des Kafka-Konnektors

Das folgende Skript erstellt eine benutzerdefinierte Rolle zur Verwendung durch den Kafka-Konnektor (z. B. KAFKA_CONNECTOR_ROLE_1). Jede Rolle, die Berechtigungen erteilen kann (z. B. SECURITYADMIN oder eine Rolle mit der Berechtigung MANAGE GRANTS), kann jedem Benutzer diese benutzerdefinierte Rolle zuweisen, damit der Kafka-Konnektor die erforderlichen Snowflake-Objekte erstellen und Daten in Tabellen einfügen kann. Das Skript referenziert eine bestimmte vorhandene Datenbank, ein bestimmtes Schema (kafka_db.kafka_schema) und einen bestimmten Benutzer (kafka_connector_user_1):

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
Copy

Weitere Informationen zum Erstellen von benutzerdefinierten Rollen und Rollenhierarchien finden Sie unter Konfigurieren der Zugriffssteuerung.

Installationsvoraussetzungen

  • Der Kafka-Konnektor unterstützt die folgenden Paketversionen:

    Paket

    Version des Snowflake-Kafka-Konnektors

    Paketunterstützung (von Snowflake getestet)

    Apache Kafka

    2.0.0 (oder höher)

    Apache Kafka 2.5.1, 2.8.1, 3.2.1

    Confluent

    2.0.0 (oder höher)

    Confluent 6.2.6, 7.2.1

  • Der Kafka-Konnektor ist für die Verwendung mit der Kafka Connect-API 3.2.3 ausgelegt. Alle neueren Versionen der Kafka Connect-API wurden nicht getestet. Alle Versionen, die älter als 3.2.3 sind, sind mit dem Konnektor kompatibel. Weitere Informationen dazu finden Sie unter Kafka-Kompatibilität.

  • Wenn Sie sowohl den Kafka-Konnektor als auch die JAR-Dateien des JDBC-Treibers in Ihrer Umgebung haben, stellen Sie sicher, dass Ihre JDBC-Version mit der snowflake-jdbc-Version übereinstimmt, die in der Datei pom.xml Ihrer vorgesehenen Kafka-Konnektor-Version angegeben ist. Sie können zu Ihrer bevorzugten Release-Version des Kafka-Konnektors wechseln, z. B. v2.0.1. Durchsuchen Sie dann die Datei pom.xml, um die Version von snowflake-jdbc zu ermitteln.

  • Wenn Sie das Avro-Format zum Erfassen von Daten verwenden, gilt Folgendes:

  • Apache Kafka muss mit der gewünschten Datenaufbewahrungsdauer und/oder dem gewünschten Speicherlimit konfiguriert werden.

  • Installieren und konfigurieren Sie den Kafka Connect-Cluster.

    Jeder Kafka Connect-Clusterknoten muss über ausreichend RAM für den Snowflake-Konnektor für Apache Kafka verfügen. Das empfohlene Minimum beträgt 5 MB pro Kafka-Partition. (Dies ist zusätzlich zu dem RAM erforderlich, der für alle anderen Aufgaben von Kafka Connect erforderlich ist.)

  • Wir empfehlen Ihnen, für Kafka-Broker und die Kafka Connect-Laufzeitumgebung die gleiche Version zu verwenden.

  • Es wird dringend empfohlen, Ihre Kafka Connect-Instanz in derselben Cloudanbieter-Region wie Ihr Snowflake-Konto auszuführen. Dies ist nicht zwingend erforderlich, erhöht jedoch in der Regel den Durchsatz.

Eine Liste der von Snowflake-Clients unterstützten Betriebssysteme finden Sie unter Betriebssystemunterstützung.

Installieren des Konnektors

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Confluent. Die Versionen des Kafka-Konnektors können Sie der folgenden Tabelle entnehmen:

Release-Serie

Status

Anmerkungen

2.x.x

Offiziell unterstützt

Neueste Version und dringend empfohlen.

1.9.x

Offiziell unterstützt

Upgrade empfohlen.

1.8.x

Nicht unterstützt

Verwenden Sie diese Release-Serie nicht.

1.7.x

Nicht unterstützt

Verwenden Sie diese Release-Serie nicht.

Bemerkung

Beachten Sie, dass dieses Feature in einem Szenario mit hohem Durchsatz (z. B. 50 MB/s pro Konnektor) zu einer höheren Latenz oder höheren Kosten führen kann. Wir empfehlen, dass Sie dieses Feature für Szenarios mit hohem Durchsatz deaktivieren, indem Sie enable.streaming.client.optimization auf „false“ setzen. Weitere Informationen dazu finden Sie unter Verwenden des Snowflake-Konnektors für Kafka mit Snowpipe Streaming.

Installieren des Konnektors für Confluent

Kafka-Konnektordateien herunterladen

Laden Sie die JAR-Datei des Kafka-Konnektors von einem der folgenden Speicherorte herunter:

Confluent Hub

https://www.confluent.io/hub/

Das Paket enthält alle Abhängigkeiten, die zur Verwendung eines verschlüsselten oder unverschlüsselten privaten Schlüssels für die Schlüsselpaar-Authentifizierung erforderlich sind. Weitere Informationen dazu finden Sie unter Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter diesem Thema).

Zentrales Maven-Repository

https://mvnrepository.com/artifact/com.snowflake

Für die JAR-Datei sind keine zusätzlichen Abhängigkeiten erforderlich, um einen unverschlüsselten privaten Schlüssel für die Schlüsselpaar-Authentifizierung zu verwenden. Um einen verschlüsselten privaten Schlüssel zu verwenden, laden Sie die Kryptografiebibliothek Bouncy Castle (JAR-Datei) herunter. Snowflake verwendet Bouncy Castle zum Entschlüsseln von verschlüsselten privaten RSA-Schlüsseln, die zum Anmelden verwendet werden:

Laden Sie diese Dateien in denselben lokalen Ordner wie die JAR-Datei des Kafka-Konnektors herunter.

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Kafka-Konnektor installieren

Installieren Sie den Kafka-Konnektor gemäß den Anweisungen, die für die Installation anderer Konnektoren bereitgestellt wurden:

Installieren des Konnektors für Open Source Apache Kafka

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Open Source Apache Kafka.

Apache Kafka installieren

  1. Laden Sie das Kafka-Paket von der offiziellen Website herunter: https://kafka.apache.org/downloads.

  2. Wechseln Sie in einem Terminalfenster in das Verzeichnis, in das Sie die Paketdatei heruntergeladen haben.

  3. Führen Sie den folgenden Befehl aus, um die Datei kafka_<Scala-Version>-<Kafka-Version>.tgz zu dekomprimieren:

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    
    Copy

JDK installieren

Installieren und konfigurieren Sie das Java Development Kit (JDK). Snowflake wurde mit der Standard Edition (SE) des JDK getestet. Die Enterprise Edition (EE) ist voraussichtlich kompatibel, wurde jedoch nicht getestet.

Wenn Sie diesen Schritt bereits ausgeführt haben, können Sie den Abschnitt überspringen.

  1. Laden Sie das JDK von https://www.oracle.com/technetwork/java/javase/downloads/index.html herunter.

  2. Installieren oder dekomprimieren Sie das JDK.

  3. Stellen Sie gemäß den Anweisungen für Ihr Betriebssystem die Umgebungsvariable JAVA_HOME so ein, dass sie auf das Verzeichnis mit dem JDK verweist.

JAR-Dateien des Kafka-Konnektors herunterladen

  1. Laden Sie die JAR-Datei des Kafka-Konnektors vom Maven Central Repository herunter:

    https://mvnrepository.com/artifact/com.snowflake

  2. Für die JAR-Datei sind keine zusätzlichen Abhängigkeiten erforderlich, um einen unverschlüsselten privaten Schlüssel für die Schlüsselpaar-Authentifizierung zu verwenden. Um einen verschlüsselten privaten Schlüssel zu verwenden, laden Sie die Kryptografiebibliothek Bouncy Castle (JAR-Datei) herunter. Snowflake verwendet Bouncy Castle zum Entschlüsseln von verschlüsselten privaten RSA-Schlüsseln, die zum Anmelden verwendet werden:

  3. Wenn Ihre Kafka-Daten im Apache Avro-Format gestreamt werden, laden Sie die Avro-JAR-Datei herunter:

    https://mvnrepository.com/artifact/org.apache.avro/avro

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Kafka-Konnektor installieren

Kopieren Sie die JAR-Dateien, die Sie unter JAR-Dateien des Kafka-Konnektors herunterladen heruntergeladen haben, in den Ordner <Kafka-Verzeichnis>/libs.

Konfigurieren des Kafka-Konnektors

Der Konnektor wird konfiguriert, indem eine Datei erstellt wird, die Parameter wie die Snowflake-Anmeldeinformationen, Themennamen, Namen von Snowflake-Tabellen usw. angibt.

Wichtig

Das Kafka Connect-Framework überträgt die Konfigurationseinstellungen für den Kafka-Konnektor vom Masterknoten zu den Workerknoten. Die Konfigurationseinstellungen enthalten vertrauliche Informationen (insbesondere den Snowflake-Benutzernamen und den privaten Schlüssel). Stellen Sie sicher, dass der Kommunikationskanal zwischen den Kafka Connect-Knoten sicher ist. Anweisungen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.

Jede Konfigurationsdatei legt die Themen und entsprechenden Tabellen für genau eine Datenbank und genau ein Schema in dieser Datenbank fest. Beachten Sie, dass ein Konnektor Nachrichten aus einer beliebigen Anzahl von Themen erfassen kann, sich die entsprechenden Tabellen jedoch alle in einer einzigen Datenbank und in einem einzigen Schema befinden müssen.

Dieser Abschnitt enthält Anweisungen für den verteilten und eigenständigen Modus.

Beschreibungen der Konfigurationsfelder finden Sie unter Kafka-Konfigurationseigenschaften.

Wichtig

Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.

Ziehen Sie außerdem in Betracht, die Konfigurationsdatei an einem sicheren externen Speicherort oder in einem Schlüsselverwaltungsdienst zu speichern. Weitere Informationen dazu finden Sie unter Externalisieren von Geheimnissen (unter diesem Thema).

Verteilter Modus

Erstellen Sie die Kafka-Konfigurationsdatei, z. B. <Pfad>/<Konfigurationsdatei>.json. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration. Die Datei sollte im JSON-Format vorliegen.

Beispielhafte Konfigurationsdatei

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}
Copy

Eigenständiger Modus

Erstellen Sie eine Konfigurationsdatei, z. B. <<Kafka-Verzeichnis>>/config/SF_connect.properties. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration.

Beispielhafte Konfigurationsdatei

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Copy

Kafka-Konfigurationseigenschaften

Die folgenden Eigenschaften können in der Kafka-Konfigurationsdatei entweder für den verteilten oder den eigenständigen Modus festgelegt werden:

Erforderliche Eigenschaften

name

Anwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

topics

Kommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter topic2table.map (unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.

Bemerkung

Entweder topics oder topics.regex ist erforderlich, nicht beide.

topics.regex

Dies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder topics oder topics.regex enthalten, nicht beides.

snowflake.url.name

Die URL für den Zugriff auf Ihr Snowflake-Konto. Diese URL muss Ihren Kontobezeichner enthalten. Beachten Sie, dass das Protokoll (https://) und die Portnummer optional sind.

snowflake.user.name

Anmeldename des Benutzers für das Snowflake-Konto.

snowflake.private.key

Der private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter snowflake.private.key.passphrase, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwert snowflake.private.key verschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter diesem Thema) verschlüsselt wurden.

Bemerkung

Siehe auch snowflake.private.key.passphrase unter Optionale Eigenschaften (unter diesem Thema).

snowflake.database.name

Der Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.

snowflake.schema.name

Der Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.

header.converter

Nur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist "org.apache.kafka.connect.storage.StringConverter".

key.converter

Schlüsselkonverter des Kafka-Datensatzes (z. B. "org.apache.kafka.connect.storage.StringConverter"). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

value.converter

Wenn die Datensätze in JSON formatiert sind, sollte dies "com.snowflake.kafka.connector.records.SnowflakeJsonConverter" lauten.

Wenn die Datensätze in Avro formatiert sind und den Schema-Registrierungsdienst von Kafka verwenden, sollte dies "com.snowflake.kafka.connector.records.SnowflakeAvroConverter" lauten.

Wenn die Datensätze in Avro formatiert sind und das Schema enthalten (und daher nicht den Schema-Registrierungsdienst von Kafka benötigen), sollte dies "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry" lauten.

Wenn die Datensätze als Nur-Text formatiert sind, sollte dies "org.apache.kafka.connect.storage.StringConverter" lauten.

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

Optionale Eigenschaften

snowflake.private.key.passphrase

Wenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.

tasks.max

Anzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Diese Zahl kann niedriger oder höher eingestellt werden. Snowflake empfiehlt jedoch keine höhere Einstellung.

snowflake.topic2table.map

Mit diesem optionalen Parameter können Benutzer angeben, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.

buffer.count.records

Anzahl der Datensätze, die pro Kafka-Partition im Arbeitsspeicher zwischengespeichert werden, bevor sie in Snowflake erfasst werden. Der Standardwert ist 10000 Datensätze.

buffer.flush.time

Anzahl der Sekunden zwischen Pufferleerungen, wobei die Leerung vom Kafka-Speichercache zum internen Stagingbereich erfolgt. Der Standardwert ist 120 Sekunden.

buffer.size.bytes

Kumulative Größe der Datensätze in Byte, die pro Kafka-Partition im Arbeitsspeicher zwischengespeichert werden, bevor sie in Snowflake als Datendateien erfasst werden. Der Standardwert hierfür ist 5000000 (5 MB).

Die Datensätze werden beim Schreiben in die Datendateien komprimiert. Infolgedessen kann die Größe der Datensätze im Puffer größer sein als die Größe der aus den Datensätzen erstellten Datendateien.

value.converter.schema.registry.url

Wenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.

value.converter.break.on.schema.registry.error

Wenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist false. Setzen Sie den Wert auf true, um dieses Verhalten zu aktivieren.

Unterstützt von Kafka-Konnektor, Version 1.4.2 (und höher).

jvm.proxy.host

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.

jvm.proxy.port

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.

jvm.proxy.username

Benutzername, der sich beim Proxyserver authentifiziert.

Unterstützt von Kafka-Konnektor, Version 1.4.4 (und höher).

jvm.proxy.password

Kennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.

Unterstützt von Kafka-Konnektor, Version 1.4.4 (und höher).

value.converter.basic.auth.credentials.source

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter value.converter.basic.auth.user.info an. Andernfalls lassen Sie diesen Parameter aus.

value.converter.basic.auth.user.info

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.

snowflake.metadata.createtime

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert CreateTime in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

snowflake.metadata.topic

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert topic in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

snowflake.metadata.offset.and.partition

Wenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte Offset und Partition in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

snowflake.metadata.all

Wenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

transforms

Geben Sie an, dass vom Kafka-Konnektor festgestellte Tombstone-Datensätze übersprungen und nicht in die Zieltabelle geladen werden sollen. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist.

Setzen Sie den Eigenschaftswert auf "tombstoneHandlerExample".

Bemerkung

Verwenden Sie diese Eigenschaft nur für Kafka-Community-Konvertern, d. h. Eigenschaftswert value.converter (z. B. org.apache.kafka.connect.json.JsonConverter oder org.apache.kafka.connect.json.AvroConverter). Um die Behandlung von Tombstone-Datensätzen mit den Snowflake-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaft behavior.on.null.values.

transforms.tombstoneHandlerExample.type

Erforderlich beim Einstellen der transforms-Eigenschaft.

Setzen Sie den Eigenschaftswert auf "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

Geben Sie an, wie der Kafka-Konnektor Tombstone-Datensätze behandeln soll. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist. Für Snowpipe wird diese Eigenschaft von der Kafka-Konnektor-Version 1.5.5 und höher unterstützt. Für Snowpipe Streaming wird diese Eigenschaft von der Kafka-Konnektor-Version 2.1.0 und höher unterstützt.

Diese Eigenschaft unterstützt die folgenden Werte:

DEFAULT

Wenn der Kafka-Konnektor auf einen Tombstone-Datensatz stößt, fügt er in die Inhaltsspalte eine leere JSON-Zeichenfolge ein.

IGNORE

Der Kafka-Konnektor überspringt Tombstone-Datensätze und fügt keine Zeilen für diese Datensätze ein.

Der Standardwert ist DEFAULT.

Bemerkung

Die Erfassung von Tombstone-Datensätzen variiert je nach Erfassungsmethode:

  • Für Snowpipe verwendet der Kafka-Konnektor nur Snowflake-Konverter. Um die Behandlung von Tombstone-Datensätzen mit den Kafka-Community-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaften transform und transforms.tombstoneHandlerExample.type.

  • Für Snowpipe Streaming verwendet der Kafka-Konnektor ausschließlich Community-Konverter.

An Kafka-Broker gesendete Datensätze dürfen nicht NULL sein, da diese Datensätze vom Kafka-Konnektor verworfen werden, was zu fehlenden Offsets führt. Die fehlenden Offsets machen den Kafka-Konnektor in bestimmten Anwendungsfällen funktionsunfähig. Es wird empfohlen, dass Sie anstelle von NULL-Datensätzen Tombstone-Datensätze verwenden.

Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation

Der Kafka-Konnektor basiert auf einer Schlüsselpaar-Authentifizierung und nicht auf eine Basisauthentifizierung (d. h. Benutzername und Kennwort). Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das Public/Private-Schlüsselpaar mit OpenSSL. Der öffentliche Schlüssel wird dem in der Konfigurationsdatei definierten Snowflake-Benutzer zugewiesen.

Nachdem Sie die Anleitung zur Schlüsselpaarauthentifizierung auf dieser Seite und die Anweisungen zur Schlüsselpaar-Rotation ausgeführt haben, prüfen Sie die Empfehlung zur Externalisierung von Geheimnissen (unter diesem Thema).

So konfigurieren Sie das Public/Private-Schlüsselpaar:

  1. Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:

    Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.

    Bemerkung

    Der Kafka-Konnektor unterstützt Verschlüsselungsalgorithmen, die validiert wurden, um die Anforderungen des Federal Information Processing Standard (140-2) (d. h. FIPS 140-2) zu erfüllen. Weitere Informationen dazu finden Sie unter FIPS 140-2.

    Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:

    $ openssl genrsa -out rsa_key.pem 2048
    
    Copy

    Verwenden Sie zum Generieren einer verschlüsselten Version den folgenden Befehl:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    
    Copy

    Wobei <Algorithmus> ein mit FIPS 140-2 kompatibler Verschlüsselungsalgorithmus ist.

    So geben Sie beispielsweise AES 256 als Verschlüsselungsalgorithmus an:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    
    Copy

    Wenn Sie eine verschlüsselte Version des privaten Schlüssels generieren, notieren Sie sich die Passphrase. Geben Sie die Passphrase später in der Kafka-Konfigurationsdatei für die Eigenschaft snowflake.private.key.passphrase an.

    Beispiel für einen privaten PEM-Schlüssel

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
    Copy
  2. Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:

    Angenommen, der private Schlüssel befindet sich in der Datei rsa_key.p8, dann verwenden Sie den folgenden Befehl:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    
    Copy

    Beispiel für einen öffentlichen PEM-Schlüssel

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
    Copy
  3. Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in ein lokales Verzeichnis. Zeichnen Sie den Pfad zu den Dateien auf. Beachten Sie, dass der private Schlüssel im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt wird; die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in Ihrer Verantwortung, die Datei bei Nichtverwendung entsprechend zu sichern.

  4. Melden Sie sich bei Snowflake an. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu. Beispiel:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    
    Copy

    Bemerkung

    • Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.

    • Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.

    Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    
    Copy

    Bemerkung

    Die Eigenschaft RSA_PUBLIC_KEY_2_FP wird unter Konfigurieren der Schlüsselpaar-Rotation beschrieben.

  5. Kopieren Sie den gesamten privaten Schlüssel, und fügen Sie ihn in das Feld snowflake.private.key der Konfigurationsdatei ein. Speichern Sie die Datei.

Externalisieren von Geheimnissen

Snowflake empfiehlt dringend, Geheimnisse wie den privaten Schlüssel zu externalisieren und in verschlüsselter Form bzw. in einem Schlüsselverwaltungsdienst wie AWS KMS, Azure Key Vault oder HashiCorp Vault zu speichern. Dies kann mithilfe einer ConfigProvider-Implementierung in Ihrem Kafka Connect-Cluster erreicht werden.

Weitere Informationen dazu finden Sie in der Beschreibung des Confluent-Dienstes.

Starten von Kafka

Starten Sie Kafka anhand der Anweisungen in der Confluent- oder Apache Kafka-Dokumentation des Drittanbieters.

Starten des Kafka-Konnektors

Sie können den Kafka-Konnektor entweder im verteilten oder im eigenständigen Modus starten. Anweisungen für den jeweiligen Modus finden Sie unten:

Verteilter Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Copy

Eigenständiger Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Copy

(Bei einer Standardinstallation von Apache Kafka oder Confluent Kafka sollte die Datei connect-standalone.properties bereits enthalten sein.)

Testen und Verwenden des Kafka-Konnektors

Wir empfehlen, den Kafka-Konnektor mit einer kleinen Datenmenge zu testen, bevor Sie den Konnektor in einem Produktionssystem verwenden. Der Vorgang zum Testen ist der gleiche wie bei der normalen Verwendung des Konnektors:

  1. Stellen Sie sicher, dass Kafka und Kafka Connect ausgeführt werden.

  2. Stellen Sie sicher, dass Sie das entsprechende Kafka-Thema erstellt haben.

  3. Erstellen Sie einen Nachrichten-Herausgeber (oder nutzen Sie einen vorhandenen). Stellen Sie sicher, dass die zum Thema veröffentlichten Meldungen das korrekte Format haben (JSON, Avro und Nur-Text).

  4. Erstellen Sie eine Konfigurationsdatei, die das zu abonnierende Thema und die Snowflake-Tabelle angibt, in die geschrieben werden soll. Eine Anleitung dazu finden Sie unter Konfigurieren des Kafka-Konnektors (unter diesem Thema).

  5. (Optional) Erstellen Sie eine Tabelle, in die Daten geschrieben werden sollen. Dieser Schritt ist optional. Wenn Sie die Tabelle nicht erstellen, wird die Tabelle vom Kafka-Konnektor für Sie erstellt. Wenn Sie nicht vorhaben, den Konnektor zum Hinzufügen von Daten zu einer vorhandenen, nicht leeren Tabelle zu verwenden, empfiehlt Snowflake, dass Sie die Tabelle vom Konnektor erstellen lassen, um die Möglichkeit inkongruenter Schemas zu minimieren.

  6. Erteilen Sie der Rolle, die zum Erfassen von Daten verwendet wird, die für die Snowflake-Objekte (Datenbank, Schema, Zieltabelle usw.) erforderlichen minimalen Berechtigungen.

  7. Veröffentlichen Sie einen Beispieldatensatz im konfigurierten Kafka-Thema.

  8. Warten Sie einige Minuten, bis sich die Daten im System verteilt haben, und überprüfen Sie dann die Snowflake-Tabelle, um sicherzustellen, dass die Datensätze eingefügt wurden.

Tipp

Überlegen Sie, ob Sie Ihre Netzwerkverbindung zu Snowflake vorab mithilfe von SnowCD überprüfen, bevor Sie Daten in Ihre Test- und Produktionsumgebung in Snowflake laden.