Installieren und Konfigurieren des Kafka-Konnektors

Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.

Snowflake bietet zwei Versionen des Konnektors:

  • Eine Version für die Confluent-Paketversion von Kafka

  • Eine Version für das Apache Kafka-OSS-Paket (Open Source-Software).

Die Anweisungen unter diesem Thema geben an, welche Schritte nur für beide Versionen des Konnektors gelten.

Unter diesem Thema:

Konfigurieren der Zugriffssteuerung für Snowflake-Objekte

Erforderliche Berechtigungen

Das Erstellen und Verwalten von Snowflake-Objekten, die vom Kafka-Konnektor verwendet werden, erfordert eine Rolle mit den folgenden minimalen Berechtigungen:

Objekt

Berechtigung

Anmerkungen

Datenbank

USAGE

Schema

USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE

Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE Objekt widerrufen werden.

Tabelle

INSERT . SELECT

Nur erforderlich, wenn der Kafka-Konnektor zum Einlesen von Daten in eine vorhandene Tabelle verwendet wird. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kafka-Thema erstellt, wird die in der Kafka-Konfigurationsdatei angegebene Standardrolle des Benutzers zum Tabelleneigentümer (d. h. er verfügt über die Berechtigung OWNERSHIP für die Tabelle).

Stagingbereich

READ . WRITE

Nur erforderlich, wenn der Kafka-Konnektor zum Bereitstellen von Datendateien von Kafka in einem vorhandenen internen Stagingbereich verwendet wird (nicht empfohlen). . Wenn der Konnektor einen neuen Stagingbereich zum temporären Speichern von Datendateien erstellt, die vom Kafka-Thema verwendet werden, wird die in der Kafka-Konfigurationsdatei angegebene Standardrolle des Benutzers zum Eigentümer des Stagingbereichs (d. h. er hat die Berechtigung OWNERSHIP für den Stagingbereich).

Snowflake empfiehlt, für jede Kafka-Instanz einen eigenen Benutzer (mit CREATE USER) und eine eigene Rolle (mit CREATE ROLE) zu erstellen, damit die Zugriffsrechte bei Bedarf einzeln widerrufen werden können. Die Rolle sollte dem Benutzer als Standardrolle zugewiesen werden.

Erstellen einer Rolle zur Verwendung des Kafka-Konnektors

Das folgende Skript erstellt eine benutzerdefinierte Rolle zur Verwendung durch den Kafka-Konnektor (z. B. KAFKA_CONNECTOR_ROLE_1). Jede Rolle, die Berechtigungen erteilen kann (z. B. SECURITYADMIN oder eine Rolle mit der Berechtigung MANAGE GRANTS), kann jedem Benutzer diese benutzerdefinierte Rolle zuweisen, damit der Kafka-Konnektor die erforderlichen Snowflake-Objekte erstellen und Daten in Tabellen einfügen kann. Das Skript referenziert eine bestimmte vorhandene Datenbank, ein bestimmtes Schema (kafka_db.kafka_schema) und einen bestimmten Benutzer (kafka_connector_user_1):

-- Use a role that can create and manage roles and privileges:
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database:
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema:
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table:
GRANT SELECT, INSERT ON TABLE existing_table1 TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended):
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user:
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Set the custom role as the default role for the user:
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;

Weitere Informationen zum Erstellen von benutzerdefinierten Rollen und Rollenhierarchien finden Sie unter Konfigurieren der Zugriffssteuerung.

Installationsvoraussetzungen

  • Der Kafka-Konnektor unterstützt die folgenden Paketversionen:

    Paket

    Version des Snowflake-Kafka-Konnektors

    Paketunterstützung

    Apache Kafka

    0.5.3 – 1.2.2

    Apache Kafka 2.0.x – 2.4.x

    1.2.3 (oder höher)

    Apache Kafka 2.0.x – 2.5.x

    Confluent

    0.5.3 – 1.2.2

    Confluent 5.0.x – 5.4.x

    1.2.3 (oder höher)

    Confluent 5.0.x – 5.5.x

  • Der Kafka-Konnektor ist für die Verwendung mit der Kafka Connect-API 2.0.0 ausgelegt. Wir empfehlen dringend, eine Kafka Connect-API-Version zwischen 2.0.0 und 2.3.0 zu verwenden. Ältere Versionen sind mit dem Konnektor nicht kompatibel, und neuere Versionen wurden nicht getestet.

  • Wenn Sie das Avro-Format zum Erfassen von Daten verwenden, gilt Folgendes:

  • Apache Kafka muss mit der gewünschten Datenaufbewahrungsdauer und/oder dem gewünschten Speicherlimit konfiguriert werden.

  • Installieren und konfigurieren Sie den Kafka Connect-Cluster.

    Jeder Kafka Connect-Clusterknoten muss über ausreichend RAM für den Snowflake-Konnektor für Apache Kafka verfügen. Das empfohlene Minimum beträgt 5 MB pro Kafka-Partition. (Dies ist zusätzlich zu dem RAM erforderlich, der für alle anderen Aufgaben von Kafka Connect erforderlich ist.)

  • Es wird dringend empfohlen, Ihre Kafka Connect-Instanz in derselben Cloudanbieter-Region wie Ihr Snowflake-Konto auszuführen. Dies ist nicht zwingend erforderlich, erhöht jedoch in der Regel den Durchsatz.

Eine Liste der von Snowflake-Clients unterstützten Betriebssysteme finden Sie unter Betriebssystemunterstützung.

Installieren des Konnektors

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Confluent.

Installieren des Konnektors für Confluent

Kafka-Konnektordateien herunterladen

Laden Sie die JAR-Datei des Kafka-Konnektors von einem der folgenden Speicherorte herunter:

Confluent Hub

https://www.confluent.io/hub/

Das Paket enthält alle Abhängigkeiten, die zur Verwendung eines verschlüsselten oder unverschlüsselten privaten Schlüssels für die Schlüsselpaarauthentifizierung erforderlich sind. Anweisungen hierzu finden Sie unter Verwenden der Schlüsselpaar-Authentifizierung (unter diesem Thema).

Maven Central Repository

https://mvnrepository.com/artifact/com.snowflake

Für die JAR-Datei sind keine zusätzlichen Abhängigkeiten erforderlich, um einen unverschlüsselten privaten Schlüssel für die Schlüsselpaarauthentifizierung zu verwenden. Um einen verschlüsselten privaten Schlüssel zu verwenden, laden Sie die Kryptografiebibliothek Bouncy Castle (JAR-Datei) herunter. Snowflake verwendet Bouncy Castle zum Entschlüsseln von verschlüsselten privaten RSA-Schlüsseln, die zum Anmelden verwendet werden:

Laden Sie diese Dateien in denselben lokalen Ordner wie die JAR-Datei des Kafka-Konnektors herunter.

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Kafka-Konnektor installieren

Installieren Sie den Kafka-Konnektor gemäß den Anweisungen, die für die Installation anderer Konnektoren bereitgestellt wurden:

Installieren des Konnektors für Open Source Apache Kafka

Dieser Abschnitt enthält Anweisungen zum Installieren und Konfigurieren des Kafka-Konnektors für Open Source Apache Kafka.

Apache Kafka installieren

  1. Laden Sie das Kafka-Paket von der offiziellen Website herunter: https://kafka.apache.org/downloads

  2. Wechseln Sie in einem Terminalfenster in das Verzeichnis, in das Sie die Paketdatei heruntergeladen haben.

  3. Führen Sie den folgenden Befehl aus, um die Datei kafka_<Scala-Version>-<Kafka-Version>.tgz zu dekomprimieren:

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    

JDK installieren

Installieren und konfigurieren Sie das Java Development Kit (JDK). Snowflake wurde mit der Standard Edition (SE) des JDK getestet. Die Enterprise Edition (EE) ist voraussichtlich kompatibel, wurde jedoch nicht getestet.

Wenn Sie diesen Schritt bereits ausgeführt haben, können Sie den Abschnitt überspringen.

  1. Laden Sie das JDK von https://www.oracle.com/technetwork/java/javase/downloads/index.html herunter.

  2. Installieren oder dekomprimieren Sie das JDK.

  3. Stellen Sie gemäß den Anweisungen für Ihr Betriebssystem die Umgebungsvariable JAVA_HOME so ein, dass sie auf das Verzeichnis mit dem JDK verweist.

JAR-Dateien des Kafka-Konnektors herunterladen

  1. Laden Sie die JAR-Datei des Kafka-Konnektors vom Maven Central Repository herunter:

    https://mvnrepository.com/artifact/com.snowflake

  2. Für die JAR-Datei sind keine zusätzlichen Abhängigkeiten erforderlich, um einen unverschlüsselten privaten Schlüssel für die Schlüsselpaarauthentifizierung zu verwenden. Um einen verschlüsselten privaten Schlüssel zu verwenden, laden Sie die Kryptografiebibliothek Bouncy Castle (JAR-Datei) herunter. Snowflake verwendet Bouncy Castle zum Entschlüsseln von verschlüsselten privaten RSA-Schlüsseln, die zum Anmelden verwendet werden:

  3. Wenn Ihre Kafka-Daten im Apache Avro-Format gestreamt werden, laden Sie die Avro-JAR-Datei herunter:

    https://mvnrepository.com/artifact/org.apache.avro/avro

Der Quellcode des Konnektors ist unter https://github.com/snowflakedb/snowflake-kafka-connector verfügbar.

Kafka-Konnektor installieren

Kopieren Sie die JAR-Dateien, die Sie unter JAR-Dateien des Kafka-Konnektors herunterladen heruntergeladen haben, in den Ordner <Kafka-Verzeichnis>/libs.

Konfigurieren des Kafka-Konnektors

Der Konnektor wird konfiguriert, indem eine Datei erstellt wird, die Parameter wie die Snowflake-Anmeldeinformationen, Themennamen, Namen von Snowflake-Tabellen usw. angibt.

Wichtig

Das Kafka Connect-Framework überträgt die Konfigurationseinstellungen für den Kafka-Konnektor vom Masterknoten zu den Workerknoten. Die Konfigurationseinstellungen enthalten vertrauliche Informationen (insbesondere den Snowflake-Benutzernamen und den privaten Schlüssel). Stellen Sie sicher, dass der Kommunikationskanal zwischen den Kafka Connect-Knoten sicher ist. Anweisungen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.

Jede Konfigurationsdatei legt die Themen und entsprechenden Tabellen für genau eine Datenbank und genau ein Schema in dieser Datenbank fest. Beachten Sie, dass ein Konnektor Nachrichten aus einer beliebigen Anzahl von Themen erfassen kann, sich die entsprechenden Tabellen jedoch alle in einer einzigen Datenbank und in einem einzigen Schema befinden müssen.

Dieser Abschnitt enthält Anweisungen für den verteilten und eigenständigen Modus.

Beschreibungen der Konfigurationsfelder finden Sie unter Kafka-Konfigurationseigenschaften.

Wichtig

Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.

Ziehen Sie außerdem in Betracht, die Konfigurationsdatei an einem sicheren externen Speicherort oder in einem Schlüsselverwaltungsdienst zu speichern. Weitere Informationen dazu finden Sie unter Externalisieren von Geheimnissen (unter diesem Thema).

Verteilter Modus

Erstellen Sie eine Kafka-Konfigurationsdatei, z. B. <<Kafka-Verzeichnis>>/config/connect-distributed.properties. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration.

Beispielhafte Konfigurationsdatei

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myaccount.us-west-2.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}

Eigenständiger Modus

Erstellen Sie eine Konfigurationsdatei, z. B. <<Kafka-Verzeichnis>>/config/connect-standalone.properties. Füllen Sie die Datei mit allen Informationen zur Konnektor-Konfiguration.

Beispielhafte Konfigurationsdatei

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myaccount.us-west-2.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword

Kafka-Konfigurationseigenschaften

Die folgenden Eigenschaften können in der Kafka-Konfigurationsdatei festgelegt werden:

Erforderliche Eigenschaften

name

Anwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name „name“ muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

topics

Kommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter topic2table.map (unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.

Bemerkung

Entweder topics oder topics.regex ist erforderlich, nicht beide.

topics.regex

Dies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder topics oder topics.regex enthalten, nicht beides.

snowflake.url.name

Die URL für den Zugriff auf Ihr Snowflake-Konto in der Form https://<Kontoname>.<Regions-ID>.snowflakecomputing.com:443. Beachten Sie, dass https:// und die Portnummer optional sind. Die Regions-ID wird nicht verwendet, wenn sich Ihr Konto in der Region AWS US West befindet und Sie nicht AWS PrivateLink verwenden. Weitere Informationen zu Snowflake-Kontonamen und -Regionsnamen finden Sie unter Unterstützte Regionen.

snowflake.user.name

Anmeldename des Benutzers für das Snowflake-Konto.

snowflake.private.key

Der private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter snowflake.private.key.passphrase, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwert snowflake.private.key verschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Verwenden der Schlüsselpaar-Authentifizierung (unter diesem Thema) verschlüsselt wurden.

Bemerkung

Siehe auch snowflake.private.key.passphrase unter Optionale Eigenschaften (unter diesem Thema).

snowflake.database.name

Der Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.

snowflake.schema.name

Der Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.

header.converter

Nur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist "org.apache.kafka.connect.storage.StringConverter".

key.converter

Schlüsselkonverter des Kafka-Datensatzes (z. B. "org.apache.kafka.connect.storage.StringConverter"). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

value.converter

Wenn die Datensätze in JSON formatiert sind, sollte dies "com.snowflake.kafka.connector.records.SnowflakeJsonConverter" lauten.

Wenn die Datensätze in Avro formatiert sind und den Schema-Registrierungsdienst von Kafka verwenden, sollte dies "com.snowflake.kafka.connector.records.SnowflakeAvroConverter" lauten.

Wenn die Datensätze in Avro formatiert sind und das Schema enthalten (und daher nicht den Schema-Registrierungsdienst von Kafka benötigen), sollte dies "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry" lauten.

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

Optionale Eigenschaften

snowflake.private.key.passphrase

Wenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.

tasks.max

Anzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Diese Zahl kann niedriger oder höher eingestellt werden. Snowflake empfiehlt jedoch keine höhere Einstellung.

snowflake.topic2table.map

Mit diesem optionalen Parameter können Benutzer angeben, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.

buffer.count.records

Anzahl der Datensätze, die pro Kafka-Partition im Arbeitsspeicher zwischengespeichert werden, bevor sie in Snowflake erfasst werden. Der Standardwert ist 10000 Datensätze.

buffer.flush.time

Anzahl der Sekunden zwischen Pufferleerungen, wobei die Leerung vom Kafka-Speichercache zum internen Stagingbereich erfolgt. Der Standardwert ist 120 Sekunden.

buffer.size.bytes

Kumulative Größe der Datensätze in Byte, die pro Kafka-Partition im Arbeitsspeicher zwischengespeichert werden, bevor sie in Snowflake als Datendateien erfasst werden. Der Standardwert hierfür ist 5000000 (5 MB).

Die Datensätze werden beim Schreiben in die Datendateien komprimiert. Infolgedessen kann die Größe der Datensätze im Puffer größer sein als die Größe der aus den Datensätzen erstellten Datendateien.

value.converter.schema.registry.url

Wenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.

value.converter.break.on.schema.registry.error

Wenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist false. Setzen Sie den Wert auf true, um dieses Verhalten zu aktivieren.

Unterstützt von Kafka-Konnektor, Version 1.4.2 (und höher).

jvm.proxy.host

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.

jvm.proxy.port

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.

jvm.proxy.username

Benutzername, der sich beim Proxyserver authentifiziert.

Unterstützt von Kafka-Konnektor, Version 1.4.4 (und höher).

jvm.proxy.password

Kennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.

Unterstützt von Kafka-Konnektor, Version 1.4.4 (und höher).

value.converter.basic.auth.credentials.source

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter value.converter.basic.auth.user.info an. Andernfalls lassen Sie diesen Parameter aus.

value.converter.basic.auth.user.info

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.

snowflake.metadata.createtime

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert CreateTime in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

snowflake.metadata.topic

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert topic in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

snowflake.metadata.offset.and.partition

Wenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte Offset und Partition in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

snowflake.metadata.all

Wenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.

Unterstützt vom Kafka-Konnektor 1.2.0 (und höher).

Verwenden der Schlüsselpaar-Authentifizierung

Der Kafka-Konnektor basiert auf der Schlüsselpaar-Authentifizierung und nicht auf der typischen Authentifizierung mit Benutzername und Kennwort. Dieses Authentifizierungsverfahren erfordert ein 2048-Bit-RSA-Schlüsselpaar (Minimum). Generieren Sie das Public/Private-Schlüsselpaar mit OpenSSL. Der öffentliche Schlüssel wird dem in der Konfigurationsdatei definierten Snowflake-Benutzer zugewiesen.

So konfigurieren Sie das Public/Private-Schlüsselpaar:

  1. Generieren Sie über die Befehlszeile in einem Terminalfenster einen privaten Schlüssel:

    Sie können entweder eine verschlüsselte Version des privaten Schlüssels oder eine unverschlüsselte Version des privaten Schlüssels generieren.

    Bemerkung

    Der Kafka-Konnektor unterstützt Verschlüsselungsalgorithmen, die validiert wurden, um die Anforderungen des Federal Information Processing Standard (140-2) (FIPS 140-2) zu erfüllen. Weitere Informationen zu FIPS 140-2 finden Sie unter https://csrc.nist.gov/publications/detail/fips/140/2/final.

    Um eine unverschlüsselte Version zu generieren, verwenden Sie den folgenden Befehl:

    $ openssl genrsa -out rsa_key.pem 2048
    

    Verwenden Sie zum Generieren einer verschlüsselten Version den folgenden Befehl:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    

    Wobei <Algorithmus> ein mit FIPS 140-2 kompatibler Verschlüsselungsalgorithmus ist.

    So geben Sie beispielsweise AES 256 als Verschlüsselungsalgorithmus an:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    

    Wenn Sie eine verschlüsselte Version des privaten Schlüssels generieren, notieren Sie sich die Passphrase. Geben Sie die Passphrase später in der Kafka-Konfigurationsdatei für die Eigenschaft snowflake.private.key.passphrase an.

    Beispiel für einen privaten PEM-Schlüssel

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Generieren Sie über die Befehlszeile den öffentlichen Schlüssel, indem Sie auf den privaten Schlüssel verweisen:

    Angenommen, der private Schlüssel befindet sich in der Datei rsa_key.p8, dann verwenden Sie den folgenden Befehl:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Beispiel für einen öffentlichen PEM-Schlüssel

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Kopieren Sie die Dateien der öffentlichen und privaten Schlüssel zur Speicherung in ein lokales Verzeichnis. Zeichnen Sie den Pfad zu den Dateien auf. Beachten Sie, dass der private Schlüssel im Format PKCS#8 (Public Key Cryptography Standards) gespeichert und mit der im vorherigen Schritt angegebenen Passphrase verschlüsselt wird; die Datei sollte jedoch weiterhin mit dem von Ihrem Betriebssystem bereitgestellten Dateiberechtigungsmechanismus vor unbefugtem Zugriff geschützt sein. Es liegt in Ihrer Verantwortung, die Datei bei Nichtverwendung entsprechend zu sichern.

  4. Melden Sie sich bei Snowflake an. Weisen Sie dem Snowflake-Benutzer den öffentlichen Schlüssel mit ALTER USER zu. Beispiel:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Bemerkung

    • Nur Sicherheitsadministratoren (d. h. Benutzer mit der Rolle SECURITYADMIN oder höher) können andere Benutzer ändern.

    • Schließen Sie die Kopf- und Fußzeile des öffentlichen Schlüssels in der SQL-Anweisung aus.

    Überprüfen Sie den Fingerabdruck des öffentlichen Schlüssels des Benutzers mithilfe von DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Bemerkung

    Eine Beschreibung der Eigenschaft RSA_PUBLIC_KEY_2_FP finden Sie unter Schlüsselrotation (unter diesem Thema).

  5. Kopieren Sie den gesamten privaten Schlüssel, und fügen Sie ihn in das Feld snowflake.private.key der Konfigurationsdatei ein. Speichern Sie die Datei.

Externalisieren von Geheimnissen

Snowflake empfiehlt dringend, Geheimnisse wie den privaten Schlüssel zu externalisieren und in verschlüsselter Form bzw. in einem Schlüsselverwaltungsdienst wie AWS KMS, Azure Key Vault oder HashiCorp Vault zu speichern. Dies kann mithilfe einer ConfigProvider-Implementierung in Ihrem Kafka Connect-Cluster erreicht werden.

Weitere Informationen dazu finden Sie in der Beschreibung des Confluent-Dienstes: https://docs.confluent.io/current/connect/security.html#externalizing-secrets.

Schlüsselrotation

Snowflake unterstützt mehrere aktive Schlüssel, um eine ununterbrochene Rotation zu ermöglichen. Rotieren und ersetzen Sie Ihre öffentlichen und privaten Schlüssel basierend auf dem Ablaufzeitplan, den Sie intern einhalten.

Derzeit können Sie die Parameter RSA_PUBLIC_KEY und RSA_PUBLIC_KEY_2 für ALTER USER verwenden, um einem einzelnen Benutzer bis zu zwei öffentliche Schlüssel zuzuordnen.

So rotieren Sie Ihre Schlüssel:

  1. Führen Sie die unter Verwenden der Schlüsselpaar-Authentifizierung angegebenen Schritte aus, um Folgendes zu ermöglichen:

    • Generieren eines neuen privaten und öffentlichen Schlüsselsatzes

    • Zuweisen des öffentlichen Schlüssels an einen Benutzer Setzen Sie den Wert des öffentlichen Schlüssels entweder auf RSA_PUBLIC_KEY oder RSA_PUBLIC_KEY_2 (je nachdem, welcher Schlüsselwert gerade nicht verwendet wird). Beispiel:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Aktualisieren Sie den Code, um eine Verbindung zu Snowflake herzustellen. Geben Sie den neuen privaten Schlüssel an.

    Snowflake überprüft den korrekten aktiven öffentlichen Schlüssel für die Authentifizierung basierend auf dem privaten Schlüssel, der mit Ihren Verbindungsinformationen übermittelt wurde.

  3. Entfernen Sie den alten öffentlichen Schlüssel aus dem Benutzerprofil. Beispiel:

    alter user jsmith unset rsa_public_key;
    

Starten von Kafka

Starten Sie Kafka anhand der Anweisungen in der Confluent- oder Apache Kafka-Dokumentation des Drittanbieters.

Starten des Kafka-Konnektors

Sie können den Kafka-Konnektor entweder im verteilten oder im eigenständigen Modus starten. Anweisungen für den jeweiligen Modus finden Sie unten:

Verteilter Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

curl -X POST -H "Content-Type: application/json" --data @<config_file>.json http://localhost:8083/connectors

Eigenständiger Modus

Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus:

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/connect-standalone.properties

Testen und Verwenden des Kafka-Konnektors

Wir empfehlen, den Kafka-Konnektor mit einer kleinen Datenmenge zu testen, bevor Sie den Konnektor in einem Produktionssystem verwenden. Der Vorgang zum Testen ist der gleiche wie bei der normalen Verwendung des Konnektors:

  1. Stellen Sie sicher, dass Kafka und Kafka Connect ausgeführt werden.

  2. Stellen Sie sicher, dass Sie das entsprechende Kafka-Thema erstellt haben.

  3. Erstellen Sie einen Nachrichten-Publisher (oder nutzen Sie einen vorhandenen). Stellen Sie sicher, dass die zum Thema veröffentlichten Meldungen das korrekte Format haben (JSON oder Avro).

  4. Erstellen Sie eine Konfigurationsdatei, die das zu abonnierende Thema und die Snowflake-Tabelle angibt, in die geschrieben werden soll. Siehe auch den nächsten Schritt.

  5. (Optional) Erstellen Sie eine Tabelle, in die Daten geschrieben werden sollen. Dieser Schritt ist optional. Wenn Sie die Tabelle nicht erstellen, wird die Tabelle vom Kafka-Konnektor für Sie erstellt. Wenn Sie nicht vorhaben, den Konnektor zum Hinzufügen von Daten zu einer vorhandenen, nicht leeren Tabelle zu verwenden, empfiehlt Snowflake, dass Sie die Tabelle vom Konnektor erstellen lassen, um die Möglichkeit inkongruenter Schemas zu minimieren.

  6. Erteilen Sie der Rolle, die zum Erfassen von Daten verwendet wird, die für die Snowflake-Objekte (Datenbank, Schema, Zieltabelle usw.) erforderlichen minimalen Berechtigungen.

  7. Veröffentlichen Sie einen Beispieldatensatz im konfigurierten Kafka-Thema.

  8. Warten Sie einige Minuten, bis sich die Daten im System verteilt haben, und überprüfen Sie dann die Snowflake-Tabelle, um sicherzustellen, dass die Datensätze eingefügt wurden.

Tipp

Überlegen Sie, ob Sie Ihre Netzwerkverbindung zu Snowflake vorab mithilfe von SnowCD überprüfen, bevor Sie Daten in Ihre Test- und Produktionsumgebung in Snowflake laden.