Apache Kafka mit DLQ und Metadaten

Bemerkung

Der Konnektor unterliegt den Bedingungen für Konnektoren.

Unter diesem Thema wird Apache Kafka mit DLQ und Metadaten-Konnektor beschrieben. Dies ist der voll funktionsfähige Konnektor, der die gleiche Funktionalität wie der ältere Snowflake-Konnektor für Kafka bietet und erweiterte Funktionen für Produktionsanwendungen enthält.

Wichtige Funktionen

Der Apache Kafka mit DLQ und Metadaten-Konnektor bietet umfassende Funktionen:

  • Dead Letter Queue (DLQ)-Unterstützung für die Behandlung fehlgeschlagener Meldungen

  • RECORD_METADATA-Spalte mit den Metadaten der Kafka-Meldung

  • Konfigurierbare Schematisierung – Schemaerkennung aktivieren oder deaktivieren

  • Unterstützung von Iceberg-Tabellen mit Schemaentwicklung

  • Mehrere Meldungsformate – Unterstützung von JSON und AVRO

  • Integration der Schema-Registry für AVRO-Meldungen

  • Thema-zu-Tabelle-Zuordnung mit erweiterten Mustern

  • Unterstützung von SASL-Authentifizierung

Spezifische Parameter

Zusätzlich zu den allgemeinen Parametern, die unter Einrichten von Openflow Connector für Kafka beschrieben sind, enthält dieser Konnektor zusätzliche Parameterkontexte für erweiterte Features.

Meldungsformat und Schemaparameter

Parameter

Beschreibung

Erforderlich

Message Format

Das Format der Meldungen in Kafka. Eine der folgenden Optionen: JSON / AVRO. Standard: JSON

Ja

Schema AVRO

Avro-Schema für den Fall, dass schema-text-property in „AVRO Schema Access Strategy“ mit dem AVRO-Meldungsformat verwendet wird. Hinweis: Dies sollte nur verwendet werden, wenn alle Meldungen, die von dem/den konfigurierten Kafka-Thema/Themen konsumiert werden, dasselbe Schema haben.

Nein

AVRO Schema Access Strategy

Die Methode für den Zugriff auf das AVRO-Schema einer Meldung. Erforderlich für AVRO. Eine der folgenden Optionen: embedded-avro-schema / schema-reference-reader / schema-text-property. Standard: embedded-avro-schema

Nein

Parameter für die Schema Registry

Parameter

Beschreibung

Erforderlich

Schema Registry Authentication Type

Die Methode zur Authentifizierung bei der Schema-Registry, falls verwendet. Andernfalls verwenden Sie NONE. Eine der folgenden Optionen: NONE / BASIC. Standard: NONE

Ja

Schema Registry URL

Die URL der Schema Registry. Erforderlich für das AVRO-Meldungsformat.

Nein

Schema Registry Username

Der Benutzername für die Schema Registry. Erforderlich für das AVRO-Meldungsformat.

Nein

Schema Registry Password

Das Kennwort für die Schema Registry. Erforderlich für das AVRO-Meldungsformat.

Nein

Parameter für DLQ und erweiterte Features

Parameter

Beschreibung

Erforderlich

Kafka-DLQThema

DLQ-Thema zum Senden von Meldungen mit Parsing-Fehlern an

Ja

Schematization Enabled

Legt fest, ob die Daten in einzelne Spalten oder in ein einzelnes RECORD_CONTENT-Feld eingefügt werden. Eine der folgenden Optionen: true / false. Standard: true

Ja

Iceberg Enabled

Gibt an, ob der Prozessor Daten in eine Iceberg-Tabelle aufnimmt. Der Prozessor schlägt fehl, wenn diese Eigenschaft nicht mit dem tatsächlichen Tabellentyp übereinstimmt. Standard: false

Ja

Schematization behavior

Das Verhalten des Konnektors ändert sich gemäß dem Parameter Schematization Enabled:

Schematization enabled

Wenn Schematisierung aktiviert ist, wird der Konnektor:

  • Erzeugt einzelne Spalten für jedes Feld in der Meldung

  • Enthält eine RECORD_METADATA-Spalte mit Kafka-Metadaten.

  • Entwickelt das Tabellenschema automatisch weiter, wenn neue Felder entdeckt werden.

  • Vereinfacht verschachtelte JSON/AVRO-Strukturen in separate Spalten.

Beispiel einer Tabellenstruktur:

Zeile

RECORD_METADATA

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

{„timestamp“:1669074170090, „headers“: {„current.iter…

ABC123

ZTEST

BUY

3572

2

{„timestamp“:1669074170400, „headers“: {„current.iter…

XYZ789

ZABX

SELL

3024

Schematization disabled

Wenn die Schematisierung deaktiviert ist, wird der Konnektor:

  • nur zwei Spalten erzeugen: RECORD_CONTENT und RECORD_METADATA

  • den gesamten Inhalt der Meldung als OBJECT in RECORD_CONTENT speichern

  • keine automatische Schemaentwicklung durchführen

  • maximale Flexibilität für die nachgelagerte Verarbeitung bieten

Beispiel einer Tabellenstruktur:

Zeile

RECORD_METADATA

RECORD_CONTENT

1

{„timestamp“:1669074170090, „headers“: {„current.iter…

{„account“: „ABC123“, „symbol“: „ZTEST“, „side“:…

2

{„timestamp“:1669074170400, „headers“: {„current.iter…

{„account“: „XYZ789“, „symbol“: „ZABX“, „side“:…

Verwenden Sie die Eigenschaft Schematization Enabled in den Konfigurationseigenschaften des Konnektors, um die Schemaerkennung zu aktivieren oder zu deaktivieren.

Schemaerkennung und -entwicklung

Der Konnektor unterstützt die Schemaerkennung und -entwicklung. Die Struktur der Tabellen in Snowflake kann automatisch definiert und weiterentwickelt werden, um die Struktur der vom Konnektor geladenen neuen Daten zu unterstützen.

Ohne Schemaerkennung und -entwicklung besteht die vom Konnektor geladene Snowflake-Tabelle nur aus zwei OBJECT Spalten: RECORD_CONTENT und RECORD_METADATA.

Wenn Schemaerkennung und Schemaentwicklung aktiviert sind, kann Snowflake das Schema der Streaming-Daten erkennen und Daten in Tabellen laden, die automatisch mit jedem benutzerdefinierten Schema übereinstimmen. Snowflake ermöglicht auch das Hinzufügen von neuen Spalten oder das Löschen der NOT NULL-Einschränkung von Spalten, die in den neuen Datendateien fehlen.

Die Schemaerkennung mit dem Konnektor wird mit oder ohne eine bereitgestellte Schema-Registry unterstützt. Bei Verwendung einer Schema-Registrierung (Avro) wird die Spalte mit den in der bereitgestellten Schema-Registrierung definierten Datentypen erstellt. Wenn es kein Schemaregister (JSON) gibt, wird der Datentyp von den bereitgestellten Daten abgeleitet.

JSON ARRAY wird für weitere Schematisierungen nicht unterstützt.

Aktivieren der Schema-Entwicklung

Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert.

Wenn Sie die Schemaentwicklung für eine bestehende Tabelle aktivieren oder deaktivieren möchten, verwenden Sie den Befehl ALTER TABLE, um den Parameter ENABLE_SCHEMA_EVOLUTION festzulegen. Sie müssen außerdem eine Rolle verwenden, die über die OWNERSHIP-Berechtigung für die Tabelle verfügt. Weitere Informationen dazu finden Sie unter Tabellenschemaentwicklung.

Wenn jedoch die Schemaentwicklung für eine vorhandene Tabelle deaktiviert ist, versucht der Konnektor, die Zeilen mit nicht übereinstimmenden Schemata an die konfigurierten Dead-Letter-Queues (DLQ) zu senden.

RECORD_METADATA-Struktur

Die Spalte RECORD_METADATA enthält wichtige Metadaten zu Kafka-Meldungen:

Feld

Beschreibung

offset

Das Meldungs-Offset innerhalb der Kafka-Partition

topic

Der Name des Kafka-Themas

partition

Die Kafka-Partitionsnummer

key

Der Meldungsschlüssel (falls vorhanden)

Zeitstempel

Der Zeitstempel der Meldung

SnowflakeConnectorPushTime

Zeitstempel für den Zeitpunkt, an dem der Konnektor die Meldung von Kafka abgerufen hat

headers

Zuordnung der Meldungsheader (falls vorhanden)

Dead Letter Queue (DLQ)

Die DLQ-Funktion behandelt Meldungen, die nicht erfolgreich verarbeitet werden können:

DLQ-Verhalten

  • Parse-Fehler – Meldungen mit ungültigem JSON/AVRO-Format werden an die DLQ gesendet.

  • Schemaabweichungen – Meldungen, die nicht mit dem erwarteten Schema übereinstimmen, wenn die Schemaentwicklung deaktiviert ist

  • Verarbeitungsfehler – Andere Verarbeitungsfehler während der Datenaufnahme

Unterstützung von Iceberg-Tabelle

Openflow Connector für Kafka kann Daten in eine von Snowflake verwaltete Apache Iceberg™-Tabelle aufnehmen, wenn Iceberg Enabled auf true gesetzt ist.

Anforderungen und Einschränkungen

Bevor Sie den Openflow Kafka-Konnektor für die Aufnahme von Iceberg-Tabellen konfigurieren, beachten Sie die folgenden Anforderungen und Beschränkungen:

  • Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.

  • Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.

Konfiguration und Einrichtung

Um den Openflow Connector für Kafka für die Aufnahme von Iceberg-Tabellen zu konfigurieren, folgen Sie den Schritten unter Einrichten von Openflow Connector für Kafka mit einigen Abweichungen, die in den folgenden Abschnitten erläutert werden.

Aufnahme in die Iceberg-Tabellen aktivieren

Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled auf true setzen.

Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme

Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von den Einstellungen der Eigenschaft Schematization Enabled Ihres Konnektors ab.

Wenn Sie die Schematisierung aktivieren, müssen Sie eine Tabelle mit einer Spalte namens record_metadata erstellen:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Der Konnektor erstellt automatisch die Spalten für Meldungsfelder und ändert das Spaltenschema record_metadata.

Wenn Sie die Schematisierung nicht aktivieren, müssen Sie eine Tabelle mit einer Spalte namens record_content eines Typs erstellen, der dem tatsächlichen Inhalt der Kafka-Meldung entspricht. Der Konnektor erstellt automatisch die Spalte record_metadata.

Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen oder kompatible Snowflake-Typen verwenden. Der halbstrukturierte VARIANT-Typ wird nicht unterstützt. Verwenden Sie stattdessen eine strukturierte OBJECT oder MAP.

Betrachten Sie zum Beispiel die folgende Meldung:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Beispiele für die Erstellung von Iceberg-Tabellen

Mit aktivierter Schematisierung:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Mit deaktivierter Schematisierung:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Bemerkung

RECORD_METADATA muss immer erstellt werden. Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs oder cats wird zwischen Groß- und Kleinschreibung unterschieden.

Anwendungsfälle

Dieser Konnektor ist ideal für:

  • Produktionsumgebungen, die DLQ erfordern

  • Datenherkunft und -prüfung, wo Kafka-Metadaten wichtig sind

  • Komplexe Meldungsverarbeitung mit Anforderungen an die Schemaentwicklung

  • Integration von Iceberg-Tabellen

Wenn Sie eine einfachere Datenaufnahme ohne Metadaten oder DLQ-Features benötigen, sollten Sie stattdessen die Apache Kafka für JSON/AVRO-Datenformat-Konnektoren in Betracht ziehen.