Snowflake High Performance connector for Kafka: Kafka konfigurieren

Unter diesem Thema werden die Schritte zur Installation und Konfiguration von Kafka für Snowflake High Performance connector for Kafka beschrieben.

Installieren des Kafka-Konnektors

Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.

Snowflake bietet zwei Versionen des Konnektors:

Beide Versionen des Konnektors sind in der privaten Snowflake-Vorschau verfügbar und müssen über Snowflake bezogen werden. Wenden Sie sich an Ihr Snowflake-Kundenteam, um die JAR-Datei für den Konnektor zu erhalten.

Wenn Sie nicht sicher sind, welche Version Sie verwenden sollen, lesen Sie Auswählen einer Konnektorversion. Konfigurieren des Kafka-Konnektors ==============================================================================

Die Konfiguration des Konnektors ist anbieterspezifisch. Einige Implementierungen, wie Amazon MSK Connect, haben eine UI für die Konfiguration des Konnektors und akzeptieren die Konfiguration sowohl im JSON- als auch Eigenschaften-Dateiformat.

Dieser Abschnitt ist eine allgemeine Referenz für die Namen und Werte der Konnektorparameter. Beachten Sie, dass verschiedene Cloudanbieter möglicherweise leicht unterschiedliche Konfigurationsanforderungen haben.

Wichtig

Das Kafka Connect-Framework überträgt die Konfigurationseinstellungen für den Kafka-Konnektor vom Masterknoten zu den Workerknoten. Die Konfigurationseinstellungen enthalten vertrauliche Informationen (insbesondere den Snowflake-Benutzernamen und den privaten Schlüssel). Stellen Sie sicher, dass der Kommunikationskanal zwischen den Kafka Connect-Knoten sicher ist. Anweisungen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.

Jede Konfigurationsdatei legt die Themen und entsprechenden Tabellen für genau eine Datenbank und genau ein Schema in dieser Datenbank fest. Beachten Sie, dass ein Konnektor Nachrichten aus einer beliebigen Anzahl von Themen erfassen kann, sich die entsprechenden Tabellen jedoch alle in einer einzigen Datenbank und in einem einzigen Schema befinden müssen.

Beschreibungen der Konfigurationsfelder finden Sie unter Konnektor-Konfigurationseigenschaften.

Wichtig

Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.

Ziehen Sie außerdem in Betracht, die Konfigurationsdatei an einem sicheren externen Speicherort oder in einem Schlüsselverwaltungsdienst zu speichern.

Beispiel für eine JSON-Konfigurationsdatei

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "tasks.max": "1",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
      "snowflake.warehouse.name": "WH",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "all"
      }
}
Copy

Beispiel für Datei mit Konfigurationseigenschaften

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
Copy

Konnektor-Konfigurationseigenschaften

Erforderliche Eigenschaften

name

Anwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

Kommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter topic2table.map (unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.

Bemerkung

Entweder topics oder topics.regex ist erforderlich, nicht beide.

topics.regex

Dies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder topics oder topics.regex enthalten, nicht beides.

snowflake.url.name

Die URL für den Zugriff auf Ihr Snowflake-Konto. Diese URL muss Ihren Kontobezeichner enthalten. Beachten Sie, dass das Protokoll (https://) und die Portnummer optional sind.

snowflake.user.name

Anmeldename des Benutzers für das Snowflake-Konto.

snowflake.role.name

Der Name der Rolle, mit der der Konnektor Daten in die Tabelle einfügt.

snowflake.private.key

Der private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter snowflake.private.key.passphrase, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwert snowflake.private.key verschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation verschlüsselt wurden.

Bemerkung

Siehe auch snowflake.private.key.passphrase in Optionale Eigenschaften.

snowflake.database.name

Der Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.

snowflake.schema.name

Der Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.

header.converter

Nur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist "org.apache.kafka.connect.storage.StringConverter".

key.converter

Schlüsselkonverter des Kafka-Datensatzes (z. B. "org.apache.kafka.connect.storage.StringConverter"). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

value.converter

Der Konnektor unterstützt die Standard-Konverter der Kafka-Community. Wählen Sie den passenden Konverter auf der Grundlage Ihres Datenformats aus:

  • Für JSON-Datensätze: "org.apache.kafka.connect.json.JsonConverter"

  • Für Avro-Datensätze mit Schema-Registry: "io.confluent.connect.avro.AvroConverter"

Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.

Optionale Eigenschaften

snowflake.private.key.passphrase

Wenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.

tasks.max

Anzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Um eine optimale Leistung zu erzielen, empfiehlt Snowflake, die Anzahl der Tasks auf die Gesamtzahl der Kafka-Partitionen festzulegen, jedoch nicht die Anzahl der CPU-Kerne zu überschreiten. Eine hohe Anzahl von Tasks kann zu einem erhöhten Arbeitsspeicherverbrauch und häufigen Anpassungen führen.

snowflake.topic2table.map

Mit diesem optionalen Parameter können Benutzer angeben, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner. Die Konfiguration von Themen erlaubt die Verwendung regulärer Ausdrücke zur Definition von Themen, genau wie die Verwendung von topics.regex. Die regulären Ausdrücke dürfen nicht mehrdeutig sein — jedes übereinstimmende Thema darf nur mit einer einzigen Zieltabelle übereinstimmen.

Beispiel:

topics="topic1,topic2,topic5,topic6"
snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
Copy

könnte geschrieben werden als:

topics.regex="topic[0-9]"
snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
Copy
value.converter.schema.registry.url

Wenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.

value.converter.break.on.schema.registry.error

Wenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist false. Setzen Sie den Wert auf true, um dieses Verhalten zu aktivieren.

jvm.proxy.host

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.

jvm.proxy.port

Damit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.

snowflake.streaming.max.client.lag

Legt fest, wie oft Snowflake Ingest Java die Daten an Snowflake entleert, in Sekunden.

Werte:
  • Minimum: 1 Sekunde

  • Maximum: 600 Sekunden

Standard:

1 Sekunde

jvm.proxy.username

Benutzername, der sich beim Proxyserver authentifiziert.

jvm.proxy.password

Kennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.

snowflake.jdbc.map

Beispiel: "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

Weitere JDBC-Eigenschaften (siehe Übersicht der Verbindungsparameter für den JDBC-Treiber) werden nicht validiert. Diese zusätzlichen Eigenschaften werden nicht validiert und dürfen weder überschrieben noch anstelle der erforderlichen Eigenschaften verwendet werden, wie z. B: jvm.proxy.xxx, snowflake.user.name, snowflake.private.key, snowflake.schema.name usw.

Geben Sie eine der folgenden Kombinationen an:
  • tracing-Eigenschaft zusammen mit der Variable JDBC_TRACE env

  • database-Eigenschaft zusammen mit snowflake.database.name

Führt zu einem mehrdeutigen Verhalten und das Verhalten wird durch den JDBC-Treiber bestimmt.

value.converter.basic.auth.credentials.source

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter value.converter.basic.auth.user.info an. Andernfalls lassen Sie diesen Parameter aus.

value.converter.basic.auth.user.info

Wenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.

snowflake.metadata.createtime

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert CreateTime in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.topic

Wenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert topic in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.offset.and.partition

Wenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte Offset und Partition in den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.

snowflake.metadata.all

Wenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.

transforms

Geben Sie an, dass vom Kafka-Konnektor festgestellte Tombstone-Datensätze übersprungen und nicht in die Zieltabelle geladen werden sollen. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist.

Setzen Sie den Eigenschaftswert auf "tombstoneHandlerExample".

Bemerkung

Verwenden Sie diese Eigenschaft nur für Kafka-Community-Konvertern, d. h. Eigenschaftswert value.converter (z. B. org.apache.kafka.connect.json.JsonConverter oder org.apache.kafka.connect.json.AvroConverter). Um die Behandlung von Tombstone-Datensätzen mit den Snowflake-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaft behavior.on.null.values.

transforms.tombstoneHandlerExample.type

Erforderlich beim Einstellen der transforms-Eigenschaft.

Setzen Sie den Eigenschaftswert auf "io.confluent.connect.transforms.TombstoneHandler"

behavior.on.null.values

Geben Sie an, wie der Kafka-Konnektor Tombstone-Datensätze behandeln soll. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist. Für Snowpipe wird diese Eigenschaft von der Kafka-Konnektor-Version 1.5.5 und höher unterstützt. Für Snowpipe Streaming wird diese Eigenschaft von der Kafka-Konnektor-Version 2.1.0 und höher unterstützt.

Diese Eigenschaft unterstützt die folgenden Werte:

DEFAULT

Wenn der Kafka-Konnektor auf einen Tombstone-Datensatz stößt, fügt er in die Inhaltsspalte eine leere JSON-Zeichenfolge ein.

IGNORE

Der Kafka-Konnektor überspringt Tombstone-Datensätze und fügt keine Zeilen für diese Datensätze ein.

Der Standardwert ist DEFAULT.

Bemerkung

Die Erfassung von Tombstone-Datensätzen variiert je nach Erfassungsmethode:

  • Für Snowpipe verwendet der Kafka-Konnektor nur Snowflake-Konverter. Um die Behandlung von Tombstone-Datensätzen mit den Kafka-Community-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaften transform und transforms.tombstoneHandlerExample.type.

  • Für Snowpipe Streaming verwendet der Kafka-Konnektor ausschließlich Community-Konverter.

An Kafka-Broker gesendete Datensätze dürfen nicht NULL sein, da diese Datensätze vom Kafka-Konnektor verworfen werden, was zu fehlenden Offsets führt. Die fehlenden Offsets machen den Kafka-Konnektor in bestimmten Anwendungsfällen funktionsunfähig. Es wird empfohlen, dass Sie anstelle von NULL-Datensätzen Tombstone-Datensätze verwenden.

Nächste Schritte

Konnektor testen.