Einrichten von Openflow Connector für Kafka

Bemerkung

Der Konnektor unterliegt den Bedingungen für Konnektoren.

Voraussetzungen

  1. Stellen Sie sicher, dass Sie Openflow Connector für Kafka gelesen haben.

  2. Vergewissern Sie sich, dass Sie Openflow eingerichtet haben.

Konnektortypen

Der Openflow Connector für Kafka ist in drei verschiedenen Konfigurationen erhältlich, die jeweils für bestimmte Anwendungsfälle optimiert sind. Sie können diese Konnektordefinitionen aus der Konnektorgalerie herunterladen:

Apache Kafka für das JSON-Datenformat

Vereinfachter Konnektor für die Aufnahme von JSON-Meldungen mit Schemaentwicklung und Zuordnung von Themen zu Tabellen

Apache Kafka für das AVRO-Datenformat

Vereinfachter Konnektor für die Aufnahme von AVRO-Meldungen mit Schemaentwicklung und Zuordnung von Themen zu Tabellen

Apache Kafka mit DLQ und Metadaten

Voller Funktionsumfang des Konnektors mit Unterstützung für Warteschlangen für nicht zustellbare Meldungen (DLQ), Metadatenbehandlung und Feature-Parität mit dem Snowflake-Konnektor für Kafka

Informationen zur detaillierte Konfiguration bestimmter Konnektortypen finden Sie unter:

Welchen Konnektor sollten Sie wählen?

Wählen Sie die Konnektorvariante aus, die Ihrem Datenformat, Ihren betrieblichen Anforderungen und Ihren Feature-Bedürfnissen am besten entspricht:

Wählen Sie Apache Kafka für das JSON- oder AVRO-Datenformat, wenn:

  • Ihre Kafka-Nachrichten sich im JSON- oder AVRO-Format befinden

  • Sie grundlegende Funktionen zur Schemaentwicklung benötigen

  • Ein einfaches Setup mit minimaler Konfiguration wollen

  • Keine erweiterte Fehlerbehandlung oder Warteschlangen für nicht zustellbare Meldungen benötigen

  • Eine neue Integration einrichten und schnell loslegen möchten

Formatspezifische Hinweise:

  • JSON-Format: Flexibler für unterschiedliche Datenstrukturen, einfacher zu debuggen und zu prüfen

  • AVRO-Format: Fest typisierte Daten mit eingebetteter Schema-Registry-Integration, besser für strukturierte Datenpipelines

Wählen Sie Apache Kafka mit DLQ und Metadaten, wenn:

  • Sie vom:doc:Snowflake-Konnektor für Kafka </user-guide/kafka-connector-overview> migrieren und Feature-Parität mit kompatiblen Funktionen benötigen

  • Sie eine robuste Fehlerbehandlung und Unterstützung für Warteschlangen für nicht zustellbare Meldungen für fehlgeschlagene Meldungen benötigen

  • Sie detaillierte Metadaten über die Aufnahme von Nachrichten (Zeitstempel, Offsets, Header) benötigen

Hinweise zur Migration

Wenn Sie derzeit den Snowflake-Konnektor für Kafka verwenden, wählen Sie die Option Apache Kafka mit DLQ und Metadaten für eine nahtlose Migration mit Feature-Kompatibilität.

Unterschiede bei der Behandlung von Feldnamen: Der Openflow-Konnektor für Kafka behandelt Sonderzeichen in Feldnamen anders als der Snowflake-Konnektor für Kafka. Nach der Migration kann der Openflow-Konnektor für Kafka aufgrund dieser Unterschiede in den Namenskonventionen neue Snowflake-Spalten mit unterschiedlichen Namen erstellen. Ausführliche Informationen darüber, wie Feldnamen umgewandelt werden, finden Sie unter Zuordnung von Feldnamen und Behandlung von Sonderzeichen.

Hinweise zur Performance

  • Konnektoren für das JSON- und AVRO-Format bieten aufgrund ihres optimierten Designs eine bessere Leistung für einfache Anwendungsfälle

  • Der Konnektor für DLQ und Metadaten bietet eine umfassendere Überwachung und Fehlerbehandlung zu Kosten einer etwas höheren Ressourcennutzung

Snowflake-Konto einrichten

Als Snowflake-Kontoadministrator führen Sie die folgenden Aufgaben aus:

  1. Erstellen Sie einen neuen Benutzer für den Snowflake-Dienst mit dem Typ SERVICE.

  2. Erstellen Sie eine neue Rolle oder verwenden Sie eine vorhandene Rolle und erteilen Sie die Berechtigungen von Datenbanken.

    Da der Konnektor die Möglichkeit hat, die Zieltabelle automatisch zu erstellen, wenn sie noch nicht existiert, stellen Sie sicher, dass der Benutzer über die erforderlichen Berechtigungen zum Erstellen und Verwalten von Snowflake-Objekten verfügt:

    Objekt

    Berechtigung

    Anmerkungen

    Datenbank

    USAGE

    Schema

    USAGE . CREATE TABLE .

    Nachdem die Objekte auf Schemaebene erstellt wurden, können die Berechtigungen für CREATE object widerrufen werden.

    Tabelle

    OWNERSHIP

    Nur erforderlich, wenn Sie den Kafka-Konnektor zum Aufnehmen von Daten in eine vorhandene Tabelle verwenden. . Wenn der Konnektor eine neue Zieltabelle für Datensätze aus dem Kafka-Topic erstellt, wird die in der Konfiguration angegebene Standardrolle des Benutzers zum Eigentümer der Tabelle.

    Snowflake empfiehlt, für jede Kafka-Instanz einen eigenen Benutzer und eine eigene Rolle zu erstellen, um die Zugriffssteuerung zu verbessern.

    Sie können das folgende Skript verwenden, um eine benutzerdefinierte Rolle zu erstellen und zu konfigurieren (erfordert SECURITYADMIN oder gleichwertig):

    USE ROLE securityadmin;
    
    CREATE ROLE kafka_connector_role_1;
    GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    
    -- Only for existing tables
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
    
    Copy

    Beachten Sie, dass Berechtigungen direkt der Konnektorrolle zugewiesen werden müssen und nicht vererbt werden können.

  3. Weisen Sie dem Benutzer des Snowflake-Dienstes die Rolle zu, die Sie in den vorherigen Schritten erstellt haben.

    Die Rolle sollte dem Benutzer als Standardrolle zugewiesen werden:

    GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
    ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
    
    Copy
  4. Verwenden Sie die Schlüsselpaar-Authentifizierung für die Konfiguration des Snowflake-SERVICE-Benutzers aus Schritt 1.

  5. Snowflake empfiehlt diesen Schritt dringend. Konfigurieren Sie einen von Openflow unterstützten Geheimnismanager, z. B. AWS, Azure und Hashicorp, und speichern Sie die öffentlichen und privaten Schlüssel im Geheimnisspeicher.

    Bemerkung

    Wenn Sie aus irgendeinem Grund keinen Geheimnismanager verwenden möchten, sind Sie dafür verantwortlich, die für die Schlüsselpaar-Authentifizierung verwendeten öffentlichen und privaten Schlüsseldateien gemäß den Sicherheitsrichtlinien Ihrer Organisation zu schützen.

    1. Nachem der Geheimnismanager konfiguriert ist, legen Sie fest, wie Sie sich bei ihm authentifizieren möchten. Auf AWS wird empfohlen, die mit Openflow verknüpfte EC2-Instanzrolle zu verwenden, da auf diese Weise keine weiteren Geheimnisse gespeichert werden müssen.

    2. Konfigurieren Sie in Openflow einen mit diesem Geheimnismanager verknüpften Parameter Provider über das Hamburger-Menü oben rechts. Navigieren Sie zu Controller Settings » Parameter Provider, und rufen Sie Ihre Parameterwerte ab.

    3. Zu diesem Zeitpunkt können alle Anmeldeinformationen mit den zugehörigen Parameterpfaden referenziert werden, und es müssen keine sensiblen Werte innerhalb von Openflow aufbewahrt werden.

  6. Wenn andere Snowflake-Benutzer Zugriff auf die vom Konnektor aufgenommenen Rohdokumente und -tabellen benötigen (z. B. für die benutzerdefinierte Verarbeitung in Snowflake), weisen Sie diesen Benutzern die in Schritt 1 erstellte Rolle zu.

Einrichten des Konnektors

Als Data Engineer führen Sie die folgenden Aufgaben aus, um den Konnektor zu installieren und zu konfigurieren:

Konnektor installieren

  1. Navigieren Sie zur Openflow-Übersichtsseite. Wählen Sie im Abschnitt Featured connectors die Option View more connectors aus.

  2. Suchen Sie auf der Seite Openflow-Konnektoren den Konnektor und wählen Sie Add to runtime.

  3. Wählen Sie im Dialog Select runtime Ihre Laufzeit aus der Dropdown-Liste Available runtimes aus.

  4. Wählen Sie Add aus.

    Bemerkung

    Bevor Sie den Konnektor installieren, stellen Sie sicher, dass Sie in Snowflake eine Datenbank und ein Schema für den Konnektor erstellt haben, in dem die aufgenommenen Daten gespeichert werden.

  5. Authentifizieren Sie sich bei der Bereitstellung mit den Anmeldedaten Ihres Snowflake-Kontos und wählen Sie Allow, wenn Sie dazu aufgefordert werden, damit die Laufzeitanwendung auf Ihr Snowflake-Konto zugreifen kann. Die Installation des Konnektors nimmt einige Minuten in Anspruch.

  6. Authentifizieren Sie sich bei der Laufzeit mit den Anmeldeinformationen Ihres Snowflake-Kontos.

Das Openflow-Canvas wird mit der hinzugefügten Prozessgruppe des Konnektors angezeigt.

Konnektor konfigurieren

  1. Befüllen Sie die Prozessgruppenparameter

    1. Klicken Sie mit der rechten Maustaste auf die importierte Prozessgruppe und wählen Sie Parameter aus.

    2. Füllen Sie die erforderlichen Parameterwerte aus, wie unter Allgemeine Parameter beschrieben.

Allgemeine Parameter

Alle Kafka-Konnektorvarianten verfügen über gemeinsame Parameterkontexte für die grundlegende Konnektivität und Authentifizierung.

Snowflake-Zielsystemparameter

Parameter

Beschreibung

Erforderlich

Destination Database

Die Datenbank, in der die Daten persistiert werden. Sie muss bereits in Snowflake vorhanden sein

Ja

Destination Schema

Das Schema, in dem die Daten als persistent gespeichert werden. Es muss bereits in Snowflake vorhanden sein. Bei diesem Parameter wird zwischen Groß- und Kleinschreibung unterschieden. Geben Sie diesen Parameter in Großbuchstaben an, es sei denn, das Schema wurde mit einem Namen in doppelten Anführungszeichen erstellt. Stellen Sie dann sicher, dass die Groß-/Kleinschreibung übereinstimmt, schließen Sie jedoch die einschließenden doppelten Anführungszeichen nicht ein. Sehen Sie sich die folgenden Beispiele an:

  • CREATE SCHEMA SCHEMA_NAME oder CREATE SCHEMA schema_name – SCHEMA_NAME verwenden

  • CREATE SCHEMA "schema_name" oder CREATE SCHEMA "SCHEMA_NAME" – schema_name bzw. ``SCHEMA_NAME``verwenden

Ja

Snowflake Account Identifier

Snowflake-Kontoname im Format [organisation-name]-[account-name], in dem die Daten gespeichert werden

Ja

Snowflake Authentication Strategy

Strategie zur Authentifizierung bei Snowflake. Mögliche Werte: SNOWFLAKE_SESSION_TOKEN – wenn wir den Ablauf auf SPCS ausführen, KEY_PAIR, wenn wir den Zugriff mit einem privaten Schlüssel einrichten wollen

Ja

Snowflake Private Key

Der private RSA Schlüssel, der für die Authentifizierung verwendet wird. Der RSA-Schlüssel muss nach den PKCS8-Standards formatiert sein und den Standard-PEM-Header und -Footer enthalten. Beachten Sie, dass entweder Snowflake Private Key File oder Snowflake Private Key definiert sein muss

Nein

Snowflake Private Key File

Die Datei, die den privaten RSA-Schlüssel enthält, der für die Authentifizierung bei Snowflake verwendet wird. Sie ist nach den PKCS8-Standards formatiert und hat die Standard-PEM-Header und -Footer. Die Header beginnt mit -----BEGIN PRIVATE. Aktivieren Sie das Kontrollkästchen Reference asset, um die private Schlüsseldatei hochzuladen.

Nein

Snowflake Private Key Password

Das Kennwort, das mit der Snowflake Private Key-Datei verknüpft ist

Nein

Snowflake Role

Snowflake-Rolle, die bei der Ausführung der Abfrage verwendet wird

Ja

Snowflake-Benutzername

Benutzername für die Verbindung zur Snowflake-Instanz

Ja

Kafka-Quellsystemparameter (SASL-Authentifizierung)

Parameter

Beschreibung

Erforderlich

Kafka-Sicherheitsprotokoll

Sicherheitsprotokoll, das für die Kommunikation mit Brokern verwendet wird. Entspricht der Eigenschaft „security.protocol“ des Kafka-Clients. Eine der folgenden Optionen: SASL_PLAINTEXT / SASL_SSL

Ja

Kafka SASL Mechanism

SASL-Mechanismus, der für die Authentifizierung verwendet wird. Entspricht der Eigenschaft „sasl.mechanism“ des Kafka-Clients. Eine der folgenden Optionen PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512

Ja

Kafka SASL Username

Der Benutzername für die Authentifizierung bei Kafka

Ja

Kafka SASL Password

Das Kennwort für die Authentifizierung bei Kafka

Ja

Kafka Bootstrap Servers

Eine durch Kommas getrennte Liste des Kafka-Broker, von dem Daten abgerufen werden sollen. Sie sollte den Port enthalten, zum Beispiel „kafka-broker:9092“. Dieselbe Instanz wird für das DLQ-Thema verwendet.

Ja

Aufnahmeparameter für Kafka

Parameter

Beschreibung

Erforderlich

Kafka Topic Format

Eine der folgenden Optionen: Namen / Muster. Gibt an, ob es sich bei den angegebenen „Kafka Topics“ um eine durch Kommas getrennte Liste von Namen oder um einen einzelnen regulären Ausdruck handelt.

Ja

Kafka Topics

Eine durch Kommas getrennte Liste von Kafka-Themen oder ein regulärer Ausdruck.

Ja

Kafka Group Id

Die ID einer Verbrauchergruppe, die vom Konnektor verwendet wird. Kann beliebig gewählt werden, muss jedoch eindeutig sein.

Ja

Kafka Auto Offset Reset

Automatische Offset-Konfiguration, wenn kein vorheriger Verbraucher-Offset gefunden wird, der der Kafka-Eigenschaft auto.offset.reset entspricht. Eine der folgenden Optionen: früheste / späteste. Voreinstellung: latest

Ja

Topic To Table Map

Mit diesem optionalen Parameter kann der Benutzer festlegen, welche Themen welchen Tabellen zugeordnet werden sollen. Jedes Thema und sein Tabellenname müssen durch einen Doppelpunkt getrennt werden. Dieser Tabellenname muss ein gültiger Snowflake-Bezeichner ohne Anführungszeichen sein. Die regulären Ausdrücke dürfen nicht mehrdeutig sein – jedes übereinstimmende Thema darf nur mit einer einzigen Zieltabelle übereinstimmen. Wenn leer oder keine Treffer gefunden wurden, wird der Name des Themas als Tabellenname verwendet. Hinweis: Die Zuordnung darf keine Leerzeichen nach Kommas enthalten.

Nein

Topic To Table Map Beispielwerte:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table - maps all topics to the destination_table

Konfiguration variantenspezifischer Einstellungen

Nachdem Sie die allgemeinen Parameter konfiguriert haben, müssen Sie die spezifischen Einstellungen für Ihre gewählte Konnektorvariante konfigurieren:

Für die Konnektoren Apache Kafka für JSON-Datenformat und Apache Kafka für AVRO-Datenformat:

Siehe Apache Kafka für JSON/AVRO-Datenformat für JSON-/AVRO-spezifische Parameter.

Für Apache Kafka mit DLQ und Metadaten-Konnektor:

Siehe Apache Kafka mit DLQ und Metadaten für erweiterte Parameter, einschließlich der DLQ-Konfiguration, Schematisierungseinstellungen, Unterstützung von Iceberg-Tabellen und Optionen für das Meldungsformat.

Authentifizierung

Alle Konnektorvarianten unterstützen die SASL-Authentifizierung, die über Parameterkontexte konfiguriert wird, wie in Kafka-Quellparameter (SASL-Authentifizierung) beschrieben.

Für andere Authentifizierungsmethoden, einschließlich mTLS und AWS MSK IAM, siehe Andere Authentifizierungsmethoden für Openflow Connector für Kafka konfigurieren.

Führen Sie den Ablauf aus

  1. Klicken Sie mit der rechten Maustaste auf die Ebene, und klicken Sie dann auf Enable all Controller Services.

  2. Klicken Sie mit der rechten Maustaste auf die Ebene und klicken Sie auf Start. Der Konnektor beginnt mit der Datenaufnahme.