Einrichten von Openflow Connector für Kafka¶
Bemerkung
Der Konnektor unterliegt den Bedingungen für Konnektoren.
Voraussetzungen¶
Stellen Sie sicher, dass Sie Openflow Connector für Kafka gelesen haben.
Vergewissern Sie sich, dass Sie Openflow eingerichtet haben.
Konnektortypen¶
Der Openflow Connector für Kafka ist in drei verschiedenen Konfigurationen erhältlich, die jeweils für bestimmte Anwendungsfälle optimiert sind. Sie können diese Konnektordefinitionen aus der Konnektorgalerie herunterladen:
- Apache Kafka für das JSON-Datenformat
Vereinfachter Konnektor für die Aufnahme von JSON-Meldungen mit Schemaentwicklung und Zuordnung von Themen zu Tabellen
- Apache Kafka für das AVRO-Datenformat
Vereinfachter Konnektor für die Aufnahme von AVRO-Meldungen mit Schemaentwicklung und Zuordnung von Themen zu Tabellen
- Apache Kafka mit DLQ und Metadaten
Voller Funktionsumfang des Konnektors mit Unterstützung für Warteschlangen für nicht zustellbare Meldungen (DLQ), Metadatenbehandlung und Feature-Parität mit dem Snowflake-Konnektor für Kafka
Informationen zur detaillierte Konfiguration bestimmter Konnektortypen finden Sie unter:
Apache Kafka für JSON/AVRO-Datenformat – Konnektoren für die JSON-/AVRO-Datenformate
Apache Kafka mit DLQ und Metadaten – DLQ und Metadaten-Konnektor
Welchen Konnektor sollten Sie wählen?¶
Wählen Sie die Konnektorvariante aus, die Ihrem Datenformat, Ihren betrieblichen Anforderungen und Ihren Feature-Bedürfnissen am besten entspricht:
Wählen Sie Apache Kafka für das JSON- oder AVRO-Datenformat, wenn:
Ihre Kafka-Nachrichten sich im JSON- oder AVRO-Format befinden
Sie grundlegende Funktionen zur Schemaentwicklung benötigen
Ein einfaches Setup mit minimaler Konfiguration wollen
Keine erweiterte Fehlerbehandlung oder Warteschlangen für nicht zustellbare Meldungen benötigen
Eine neue Integration einrichten und schnell loslegen möchten
Formatspezifische Hinweise:
JSON-Format: Flexibler für unterschiedliche Datenstrukturen, einfacher zu debuggen und zu prüfen
AVRO-Format: Fest typisierte Daten mit eingebetteter Schema-Registry-Integration, besser für strukturierte Datenpipelines
Wählen Sie Apache Kafka mit DLQ und Metadaten, wenn:
Sie vom:doc:
Snowflake-Konnektor für Kafka </user-guide/kafka-connector-overview>
migrieren und Feature-Parität mit kompatiblen Funktionen benötigenSie eine robuste Fehlerbehandlung und Unterstützung für Warteschlangen für nicht zustellbare Meldungen für fehlgeschlagene Meldungen benötigen
Sie detaillierte Metadaten über die Aufnahme von Nachrichten (Zeitstempel, Offsets, Header) benötigen
Hinweise zur Migration¶
Wenn Sie derzeit den Snowflake-Konnektor für Kafka verwenden, wählen Sie die Option Apache Kafka mit DLQ und Metadaten für eine nahtlose Migration mit Feature-Kompatibilität.
Unterschiede bei der Behandlung von Feldnamen: Der Openflow-Konnektor für Kafka behandelt Sonderzeichen in Feldnamen anders als der Snowflake-Konnektor für Kafka. Nach der Migration kann der Openflow-Konnektor für Kafka aufgrund dieser Unterschiede in den Namenskonventionen neue Snowflake-Spalten mit unterschiedlichen Namen erstellen. Ausführliche Informationen darüber, wie Feldnamen umgewandelt werden, finden Sie unter Zuordnung von Feldnamen und Behandlung von Sonderzeichen.
Hinweise zur Performance¶
Konnektoren für das JSON- und AVRO-Format bieten aufgrund ihres optimierten Designs eine bessere Leistung für einfache Anwendungsfälle
Der Konnektor für DLQ und Metadaten bietet eine umfassendere Überwachung und Fehlerbehandlung zu Kosten einer etwas höheren Ressourcennutzung
Snowflake-Konto einrichten¶
Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:
Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.
Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.
Da der Konnektor die Möglichkeit hat, die Zieltabelle automatisch zu erstellen, wenn sie noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:
Objekt
Berechtigung
Anmerkungen
Datenbank
USAGE
Schema
USAGE . CREATE TABLE .
Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE
object
widerrufen werden.Tabelle
OWNERSHIP
Nur erforderlich, wenn Sie den Kafka-Konnektor zum Aufnehmen von Daten in eine vorhandene Tabelle verwenden. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kafka-Topic erstellt, wird die in der Konfiguration angegebene Standardrolle des Benutzers zum Eigentümer der Tabelle.
Snowflake empfiehlt, für jede Kafka-Instanz einen eigenen Benutzer und eine eigene Rolle zu erstellen, um die Zugriffssteuerung zu verbessern.
Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):
USE ROLE securityadmin; CREATE ROLE kafka_connector_role_1; GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1; GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1; GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
Beachten Sie, dass Berechtigungen direkt der Konnektorrolle zugewiesen werden müssen und nicht vererbt werden können.
Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.
Die Rolle sollte dem Benutzer als Standardrolle zugewiesen werden:
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1; ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
Verwenden Sie die Schlüsselpaar-Authentifizierung für die Konfiguration des Snowflake-SERVICE-Benutzers aus Schritt 1.
Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.
Bemerkung
Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.
Nachem der Geheimnismanager konfiguriert ist, legen Sie fest, wie Sie sich bei ihm authentifizieren möchten. Auf AWS wird empfohlen, die mit Openflow verknüpfte EC2-Instanzrolle zu verwenden, da auf diese Weise keine weiteren Geheimnisse gespeichert werden müssen.
Konfigurieren Sie in Openflow einen mit diesem Geheimnismanager verknüpften Parameter Provider über das Hamburger-Menü oben rechts. Navigieren Sie zu Controller Settings » Parameter Provider, und rufen Sie Ihre Parameterwerte ab.
Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.
Wenn andere Snowflake-Benutzer Zugriff auf die vom Konnektor aufgenommenen Rohdokumente und -tabellen benötigen (z. B. für die benutzerdefinierte Verarbeitung in Snowflake), weisen Sie diesen Benutzern die in Schritt 1 erstellte Rolle zu.
Einrichten des Konnektors¶
Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:
Konnektor installieren¶
Navigieren Sie zur Openflow-Übersichtsseite. Wählen Sie im Abschnitt Featured connectors die Option View more connectors aus.
Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.
Wählen Sie im Dialog Select runtime Ihre Laufzeit aus der Dropdown-Liste Available runtimes aus.
Wählen Sie Add aus.
Bemerkung
Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.
Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.
Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.
Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.
Konnektor konfigurieren¶
Befüllen Sie die Prozessgruppenparameter
Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameter aus.
Füllen Sie die erforderlichen Parameterwerte aus, wie unter Allgemeine Parameter beschrieben.
Allgemeine Parameter¶
Alle Kafka-Konnektorvarianten verfügen über gemeinsame Parameterkontexte für die grundlegende Konnektivität und Authentifizierung.
Snowflake-Zielsystemparameter¶
Parameter |
Beschreibung |
Erforderlich |
---|---|---|
Destination Database |
Die Datenbank, in der die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein |
Ja |
Destination Schema |
Das Schema, in dem die Daten als persistent gespeichert werden. Es muss bereits in Snowflake vorhanden sein. Bei diesem Parameter wird zwischen Groß- und Kleinschreibung unterschieden. Geben Sie diesen Parameter in Großbuchstaben an, es sei denn, das Schema wurde mit einem Namen in doppelten Anführungszeichen erstellt. Stellen Sie dann sicher, dass die Groß-/Kleinschreibung übereinstimmt, schließen Sie jedoch die einschließenden doppelten Anführungszeichen nicht ein. Sehen Sie sich die folgenden Beispiele an:
|
Ja |
Snowflake Account Identifier |
Snowflake-Kontoname im Format [organisation-name]-[account-name], in dem die Daten gespeichert werden |
Ja |
Snowflake Authentication Strategy |
Strategie zur Authentifizierung bei Snowflake. Mögliche Werte: SNOWFLAKE_SESSION_TOKEN – wenn wir den Ablauf auf SPCS ausführen, KEY_PAIR, wenn wir den Zugriff mit einem privaten Schlüssel einrichten wollen |
Ja |
Snowflake Private Key |
Der private RSA Schlüssel, der für die Authentifizierung verwendet wird. Der RSA-Schlüssel muss nach den PKCS8-Standards formatiert sein und den Standard-PEM-Header und -Footer enthalten. Beachten Sie, dass entweder Snowflake Private Key File oder Snowflake Private Key definiert sein muss |
Nein |
Snowflake Private Key File |
Die Datei, die den privaten RSA-Schlüssel enthält, der für die Authentifizierung bei Snowflake verwendet wird. Sie ist nach den PKCS8-Standards formatiert und hat die Standard-PEM-Header und -Footer. Die Header beginnt mit |
Nein |
Snowflake Private Key Password |
Das Kennwort, das mit der Snowflake Private Key-Datei verknüpft ist |
Nein |
Snowflake Role |
Snowflake-Rolle, die bei der Ausführung der Abfrage verwendet wird |
Ja |
Snowflake-Benutzername |
Benutzername für die Verbindung zur Snowflake-Instanz |
Ja |
Kafka-Quellsystemparameter (SASL-Authentifizierung)¶
Parameter |
Beschreibung |
Erforderlich |
---|---|---|
Kafka-Sicherheitsprotokoll |
Sicherheitsprotokoll, das für die Kommunikation mit Brokern verwendet wird. Entspricht der Eigenschaft „security.protocol“ des Kafka-Clients. Eine der folgenden Optionen: SASL_PLAINTEXT / SASL_SSL |
Ja |
Kafka SASL Mechanism |
SASL-Mechanismus, der für die Authentifizierung verwendet wird. Entspricht der Eigenschaft „sasl.mechanism“ des Kafka-Clients. Eine der folgenden Optionen PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512 |
Ja |
Kafka SASL Username |
Der Benutzername für die Authentifizierung bei Kafka |
Ja |
Kafka SASL Password |
Das Kennwort für die Authentifizierung bei Kafka |
Ja |
Kafka Bootstrap Servers |
Eine durch Kommas getrennte Liste des Kafka-Broker, von dem Daten abgerufen werden sollen. Sie sollte den Port enthalten, zum Beispiel „kafka-broker:9092“. Dieselbe Instanz wird für das DLQ-Thema verwendet. |
Ja |
Aufnahmeparameter für Kafka¶
Parameter |
Beschreibung |
Erforderlich |
---|---|---|
Kafka Topic Format |
Eine der folgenden Optionen: Namen / Muster. Gibt an, ob es sich bei den angegebenen „Kafka Topics“ um eine durch Kommas getrennte Liste von Namen oder um einen einzelnen regulären Ausdruck handelt. |
Ja |
Kafka Topics |
Eine durch Kommas getrennte Liste von Kafka-Themen oder ein regulärer Ausdruck. |
Ja |
Kafka Group Id |
Die ID einer Verbrauchergruppe, die vom Konnektor verwendet wird. Kann beliebig gewählt werden, muss jedoch eindeutig sein. |
Ja |
Kafka Auto Offset Reset |
Automatische Offset-Konfiguration, wenn kein vorheriger Verbraucher-Offset gefunden wird, der der Kafka-Eigenschaft |
Ja |
Topic To Table Map |
Mit diesem optionalen Parameter kann der Benutzer festlegen, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Die regulären Ausdrücke dürfen nicht mehrdeutig sein – jedes übereinstimmende Thema darf nur mit einer einzigen Zieltabelle übereinstimmen. Wenn leer oder keine Treffer gefunden wurden, wird der Name des Themas als Tabellenname verwendet. Hinweis: Die Zuordnung darf keine Leerzeichen nach Kommas enthalten. |
Nein |
Topic To Table Map
Beispielwerte:
topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range
topic[0-4]:low_range,topic[5-9]:high_range
.*:destination_table
- maps all topics to the destination_table
Konfiguration variantenspezifischer Einstellungen¶
Nachdem Sie die allgemeinen Parameter konfiguriert haben, müssen Sie die spezifischen Einstellungen für Ihre gewählte Konnektorvariante konfigurieren:
- Für die Konnektoren Apache Kafka für JSON-Datenformat und Apache Kafka für AVRO-Datenformat:
Siehe Apache Kafka für JSON/AVRO-Datenformat für JSON-/AVRO-spezifische Parameter.
- Für Apache Kafka mit DLQ und Metadaten-Konnektor:
Siehe Apache Kafka mit DLQ und Metadaten für erweiterte Parameter, einschließlich der DLQ-Konfiguration, Schematisierungseinstellungen, Unterstützung von Iceberg-Tabellen und Optionen für das Meldungsformat.
Authentifizierung¶
Alle Konnektorvarianten unterstützen die SASL-Authentifizierung, die über Parameterkontexte konfiguriert wird, wie in Kafka-Quellparameter (SASL-Authentifizierung) beschrieben.
Für andere Authentifizierungsmethoden, einschließlich mTLS und AWS MSK IAM, siehe Andere Authentifizierungsmethoden für Openflow Connector für Kafka konfigurieren.
Führen Sie den Ablauf aus¶
Klicken Sie mit der rechten Maustaste auf die Ebene, und klicken Sie dann auf Enable all Controller Services.
Klicken Sie mit der rechten Maustaste auf die Ebene und klicken Sie auf Start. Der Konnektor beginnt mit der Datenaufnahme.