Snowflake High Performance connector for Kafka: Kafka konfigurieren¶
Unter diesem Thema werden die Schritte zur Installation und Konfiguration von Kafka für Snowflake High Performance connector for Kafka beschrieben.
Installieren des Kafka-Konnektors¶
Der Kafka-Konnektor wird als JAR-Datei (ausführbare Java-Datei) bereitgestellt.
Snowflake bietet zwei Versionen des Konnektors:
Eine Version für die Confluent-Implementierung von Kafka Connect.
Eine Version für open source software (OSS) Apache Kafka package https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/.
Beide Versionen des Konnektors sind in der privaten Snowflake-Vorschau verfügbar und müssen über Snowflake bezogen werden. Wenden Sie sich an Ihr Snowflake-Kundenteam, um die JAR-Datei für den Konnektor zu erhalten.
Wenn Sie nicht sicher sind, welche Version Sie verwenden sollen, lesen Sie Auswählen einer Konnektorversion. Konfigurieren des Kafka-Konnektors ==============================================================================
Die Konfiguration des Konnektors ist anbieterspezifisch. Einige Implementierungen, wie Amazon MSK Connect, haben eine UI für die Konfiguration des Konnektors und akzeptieren die Konfiguration sowohl im JSON- als auch Eigenschaften-Dateiformat.
Dieser Abschnitt ist eine allgemeine Referenz für die Namen und Werte der Konnektorparameter. Beachten Sie, dass verschiedene Cloudanbieter möglicherweise leicht unterschiedliche Konfigurationsanforderungen haben.
Wichtig
Das Kafka Connect-Framework überträgt die Konfigurationseinstellungen für den Kafka-Konnektor vom Masterknoten zu den Workerknoten. Die Konfigurationseinstellungen enthalten vertrauliche Informationen (insbesondere den Snowflake-Benutzernamen und den privaten Schlüssel). Stellen Sie sicher, dass der Kommunikationskanal zwischen den Kafka Connect-Knoten sicher ist. Anweisungen dazu finden Sie in der Dokumentation zu Ihrer Apache Kafka-Software.
Jede Konfigurationsdatei legt die Themen und entsprechenden Tabellen für genau eine Datenbank und genau ein Schema in dieser Datenbank fest. Beachten Sie, dass ein Konnektor Nachrichten aus einer beliebigen Anzahl von Themen erfassen kann, sich die entsprechenden Tabellen jedoch alle in einer einzigen Datenbank und in einem einzigen Schema befinden müssen.
Beschreibungen der Konfigurationsfelder finden Sie unter Konnektor-Konfigurationseigenschaften.
Wichtig
Da die Konfigurationsdatei normalerweise sicherheitsrelevante Informationen enthält, z. B. den privaten Schlüssel, müssen Sie die Lese/Schreib-Berechtigungen für die Datei entsprechend festlegen, um den Zugriff zu beschränken.
Ziehen Sie außerdem in Betracht, die Konfigurationsdatei an einem sicheren externen Speicherort oder in einem Schlüsselverwaltungsdienst zu speichern.
Beispiel für eine JSON-Konfigurationsdatei
{
"name":"XYZCompanySensorData",
"config":{
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"tasks.max": "1",
"snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
"snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.warehouse.name": "WH",
"snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
"snowflake.schema.name": "MY_SCHEMA",
"snowflake.database.name": "MY_DATABASE",
"snowflake.role.name": "MY_ROLE",
"snowflake.user.name": "MY_USER",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"topics": "topic1,topic2",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all"
}
}
Beispiel für Datei mit Konfigurationseigenschaften
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
Konnektor-Konfigurationseigenschaften¶
Erforderliche Eigenschaften¶
nameAnwendungsname. Dieser muss für alle vom Kunden verwendeten Kafka-Konnektoren eindeutig sein. Der Name muss aus einem gültigen, nicht in Anführungszeichen gesetzten Snowflake-Bezeichner bestehen. Informationen zu gültigen Bezeichnern finden Sie unter Anforderungen an Bezeichner.
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopicsKommagetrennte Themenliste. Standardmäßig geht Snowflake davon aus, dass der Tabellenname mit dem Themennamen identisch ist. Wenn der Tabellenname nicht mit dem Themennamen identisch ist, verwenden Sie den optionalen Parameter
topic2table.map(unten), um die Zuordnung von Themenname zu Tabellenname anzugeben. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner.Bemerkung
Entweder
topicsodertopics.regexist erforderlich, nicht beide.topics.regexDies ist ein regulärer Ausdruck („Regex“), der die Themen angibt, die die Nachrichten enthalten, die in Snowflake-Tabellen geladen werden sollen. Der Konnektor lädt Daten aus einem beliebigen Themennamen, der mit dem regulären Ausdruck übereinstimmt. Der reguläre Ausdruck muss den Regeln für reguläre Java-Ausdrücke entsprechen (d. h. muss mit java.util.regex.Pattern kompatibel sein). Die Konfigurationsdatei sollte entweder
topicsodertopics.regexenthalten, nicht beides.snowflake.url.nameDie URL für den Zugriff auf Ihr Snowflake-Konto. Diese URL muss Ihren Kontobezeichner enthalten. Beachten Sie, dass das Protokoll (
https://) und die Portnummer optional sind.snowflake.user.nameAnmeldename des Benutzers für das Snowflake-Konto.
snowflake.role.nameDer Name der Rolle, mit der der Konnektor Daten in die Tabelle einfügt.
snowflake.private.keyDer private Schlüssel zur Authentifizierung des Snowflake-Benutzers. Fügen Sie nur den Schlüssel hinzu, nicht Kopf- oder Fußzeile. Wenn der Schlüssel auf mehrere Zeilen aufgeteilt ist, entfernen Sie die Zeilenumbrüche. Sie können entweder einen unverschlüsselten Schlüssel bereitstellen oder einen verschlüsselten Schlüssel zusammen mit dem Parameter
snowflake.private.key.passphrase, damit Snowflake den Schlüssel entschlüsseln kann. Verwenden Sie diesen Parameter nur und nur dann, wenn der Parameterwertsnowflake.private.keyverschlüsselt ist. Hiermit werden private Schlüssel entschlüsselt, die gemäß den Anweisungen in Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation verschlüsselt wurden.Bemerkung
Siehe auch
snowflake.private.key.passphrasein Optionale Eigenschaften.snowflake.database.nameDer Name der Datenbank, die die Tabelle enthält, in die Zeilen eingefügt werden sollen.
snowflake.schema.nameDer Name des Schemas, das die Tabelle enthält, in die Zeilen eingefügt werden sollen.
header.converterNur erforderlich, wenn die Datensätze in Avro formatiert sind und einen Header enthalten. Der Wert ist
"org.apache.kafka.connect.storage.StringConverter".key.converterSchlüsselkonverter des Kafka-Datensatzes (z. B.
"org.apache.kafka.connect.storage.StringConverter"). Dieser wird vom Kafka-Konnektor nicht verwendet, aber von der Kafka Connect-Plattform benötigt.Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.
value.converterDer Konnektor unterstützt die Standard-Konverter der Kafka-Community. Wählen Sie den passenden Konverter auf der Grundlage Ihres Datenformats aus:
Für JSON-Datensätze:
"org.apache.kafka.connect.json.JsonConverter"Für Avro-Datensätze mit Schema-Registry:
"io.confluent.connect.avro.AvroConverter"
Aktuelle Einschränkungen finden Sie unter Einschränkungen für Kafka-Konnektor.
Optionale Eigenschaften¶
snowflake.private.key.passphraseWenn der Wert dieses Parameters nicht leer ist, verwendet Kafka diesen Ausdruck, um zu versuchen, den privaten Schlüssel zu entschlüsseln.
tasks.maxAnzahl der Aufgaben. Entspricht in der Regel der Zahl der CPU-Kerne, die auf die Workerknoten im Kafka Connect-Cluster verteilt sind. Um eine optimale Leistung zu erzielen, empfiehlt Snowflake, die Anzahl der Tasks auf die Gesamtzahl der Kafka-Partitionen festzulegen, jedoch nicht die Anzahl der CPU-Kerne zu überschreiten. Eine hohe Anzahl von Tasks kann zu einem erhöhten Arbeitsspeicherverbrauch und häufigen Anpassungen führen.
snowflake.topic2table.mapMit diesem optionalen Parameter können Benutzer angeben, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Informationen zu gültigen Tabellennamen finden Sie unter Anforderungen an Bezeichner. Die Konfiguration von Themen erlaubt die Verwendung regulärer Ausdrücke zur Definition von Themen, genau wie die Verwendung von
topics.regex. Die regulären Ausdrücke dürfen nicht mehrdeutig sein — jedes übereinstimmende Thema darf nur mit einer einzigen Zieltabelle übereinstimmen.Beispiel:
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
könnte geschrieben werden als:
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
value.converter.schema.registry.urlWenn Sie das Avro-Format und einen Schema-Registrierungsdienst verwenden, sollte dies die URL des Schema-Registrierungsdienstes sein. Andernfalls sollte das Feld leer sein.
value.converter.break.on.schema.registry.errorWenn Sie Avro-Daten aus dem Schema-Registrierungsdienst laden, bestimmt diese Eigenschaft, ob der Kafka-Konnektor keine Datensätze mehr verbrauchen soll, wenn beim Abrufen der Schema-ID ein Fehler auftritt. Der Standardwert ist
false. Setzen Sie den Wert auftrue, um dieses Verhalten zu aktivieren.jvm.proxy.hostDamit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Host dieses Proxyservers anzugeben.
jvm.proxy.portDamit der Snowflake-Kafka-Konnektor über einen Proxyserver auf Snowflake zugreifen kann, legen Sie diesen Parameter fest, um den Port dieses Proxyservers anzugeben.
snowflake.streaming.max.client.lagLegt fest, wie oft Snowflake Ingest Java die Daten an Snowflake entleert, in Sekunden.
- Werte:
Minimum:
1SekundeMaximum:
600Sekunden
- Standard:
1Sekunde
jvm.proxy.usernameBenutzername, der sich beim Proxyserver authentifiziert.
jvm.proxy.passwordKennwort für den Benutzernamen, der sich beim Proxyserver authentifiziert.
snowflake.jdbc.mapBeispiel:
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"Weitere JDBC-Eigenschaften (siehe Übersicht der Verbindungsparameter für den JDBC-Treiber) werden nicht validiert. Diese zusätzlichen Eigenschaften werden nicht validiert und dürfen weder überschrieben noch anstelle der erforderlichen Eigenschaften verwendet werden, wie z. B:
jvm.proxy.xxx,snowflake.user.name,snowflake.private.key,snowflake.schema.nameusw.- Geben Sie eine der folgenden Kombinationen an:
tracing-Eigenschaft zusammen mit der VariableJDBC_TRACEenvdatabase-Eigenschaft zusammen mitsnowflake.database.name
Führt zu einem mehrdeutigen Verhalten und das Verhalten wird durch den JDBC-Treiber bestimmt.
value.converter.basic.auth.credentials.sourceWenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „USER_INFO“, und geben Sie einen Wert für den unten beschriebenen Parameter
value.converter.basic.auth.user.infoan. Andernfalls lassen Sie diesen Parameter aus.value.converter.basic.auth.user.infoWenn Sie das Avro-Datenformat verwenden und einen sicheren Zugriff auf die Kafka-Schemaregistrierung benötigen, setzen Sie diesen Parameter auf die Zeichenfolge „<Benutzer-ID>:<Kennwort>“, und geben Sie einen Wert für den oben beschriebenen Parameter value.converter.basic.auth.credentials.source an. Andernfalls lassen Sie diesen Parameter aus.
snowflake.metadata.createtimeWenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert
CreateTimein den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.snowflake.metadata.topicWenn der Wert auf FALSE festgelegt ist, wird der Eigenschaftswert
topicin den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.snowflake.metadata.offset.and.partitionWenn der Wert auf FALSE festgelegt ist, werden die Eigenschaftswerte
OffsetundPartitionin den Metadaten in der Spalte RECORD_METADATA weggelassen. Der Standardwert ist TRUE.snowflake.metadata.allWenn der Wert auf FALSE gesetzt ist, sind die Metadaten in der Spalte RECORD_METADATA vollständig leer. Der Standardwert ist TRUE.
transformsGeben Sie an, dass vom Kafka-Konnektor festgestellte Tombstone-Datensätze übersprungen und nicht in die Zieltabelle geladen werden sollen. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist.
Setzen Sie den Eigenschaftswert auf
"tombstoneHandlerExample".Bemerkung
Verwenden Sie diese Eigenschaft nur für Kafka-Community-Konvertern, d. h. Eigenschaftswert
value.converter(z. B.org.apache.kafka.connect.json.JsonConverteroderorg.apache.kafka.connect.json.AvroConverter). Um die Behandlung von Tombstone-Datensätzen mit den Snowflake-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaftbehavior.on.null.values.transforms.tombstoneHandlerExample.typeErforderlich beim Einstellen der
transforms-Eigenschaft.Setzen Sie den Eigenschaftswert auf
"io.confluent.connect.transforms.TombstoneHandler"behavior.on.null.valuesGeben Sie an, wie der Kafka-Konnektor Tombstone-Datensätze behandeln soll. Ein Tombstone-Datensatz ist definiert als ein Datensatz, bei dem das gesamte Wertefeld null ist. Für Snowpipe wird diese Eigenschaft von der Kafka-Konnektor-Version 1.5.5 und höher unterstützt. Für Snowpipe Streaming wird diese Eigenschaft von der Kafka-Konnektor-Version 2.1.0 und höher unterstützt.
Diese Eigenschaft unterstützt die folgenden Werte:
DEFAULTWenn der Kafka-Konnektor auf einen Tombstone-Datensatz stößt, fügt er in die Inhaltsspalte eine leere JSON-Zeichenfolge ein.
IGNOREDer Kafka-Konnektor überspringt Tombstone-Datensätze und fügt keine Zeilen für diese Datensätze ein.
Der Standardwert ist
DEFAULT.Bemerkung
Die Erfassung von Tombstone-Datensätzen variiert je nach Erfassungsmethode:
Für Snowpipe verwendet der Kafka-Konnektor nur Snowflake-Konverter. Um die Behandlung von Tombstone-Datensätzen mit den Kafka-Community-Konvertern zu verwalten, verwenden Sie stattdessen die Eigenschaften
transformundtransforms.tombstoneHandlerExample.type.Für Snowpipe Streaming verwendet der Kafka-Konnektor ausschließlich Community-Konverter.
An Kafka-Broker gesendete Datensätze dürfen nicht NULL sein, da diese Datensätze vom Kafka-Konnektor verworfen werden, was zu fehlenden Offsets führt. Die fehlenden Offsets machen den Kafka-Konnektor in bestimmten Anwendungsfällen funktionsunfähig. Es wird empfohlen, dass Sie anstelle von NULL-Datensätzen Tombstone-Datensätze verwenden.