Apache Kafka mit DLQ und Metadaten¶
Bemerkung
Der Konnektor unterliegt den Bedingungen für Konnektoren.
Unter diesem Thema wird Apache Kafka mit DLQ und Metadaten-Konnektor beschrieben. Dies ist der voll funktionsfähige Konnektor, der die gleiche Funktionalität wie der ältere Snowflake-Konnektor für Kafka bietet und erweiterte Funktionen für Produktionsanwendungen enthält.
Wichtige Funktionen¶
Der Apache Kafka mit DLQ und Metadaten-Konnektor bietet umfassende Funktionen:
Dead Letter Queue (DLQ)-Unterstützung für die Behandlung fehlgeschlagener Meldungen
RECORD_METADATA-Spalte mit den Metadaten der Kafka-Meldung
Konfigurierbare Schematisierung – Schemaerkennung aktivieren oder deaktivieren
Unterstützung von Iceberg-Tabellen mit Schemaentwicklung
Mehrere Meldungsformate – Unterstützung von JSON und AVRO
Integration der Schema-Registry für AVRO-Meldungen
Thema-zu-Tabelle-Zuordnung mit erweiterten Mustern
Unterstützung von SASL-Authentifizierung
Spezifische Parameter¶
Zusätzlich zu den allgemeinen Parametern, die unter Einrichten von Openflow Connector für Kafka beschrieben sind, enthält dieser Konnektor zusätzliche Parameterkontexte für erweiterte Features.
Meldungsformat und Schemaparameter¶
Parameter |
Beschreibung |
Erforderlich |
---|---|---|
Message Format |
Das Format der Meldungen in Kafka. Eine der folgenden Optionen: JSON / AVRO. Standard: JSON |
Ja |
Schema AVRO |
Avro-Schema für den Fall, dass schema-text-property in „AVRO Schema Access Strategy“ mit dem AVRO-Meldungsformat verwendet wird. Hinweis: Dies sollte nur verwendet werden, wenn alle Meldungen, die von dem/den konfigurierten Kafka-Thema/Themen konsumiert werden, dasselbe Schema haben. |
Nein |
AVRO Schema Access Strategy |
Die Methode für den Zugriff auf das AVRO-Schema einer Meldung. Erforderlich für AVRO. Eine der folgenden Optionen: embedded-avro-schema / schema-reference-reader / schema-text-property. Standard: embedded-avro-schema |
Nein |
Parameter für die Schema Registry¶
Parameter |
Beschreibung |
Erforderlich |
---|---|---|
Schema Registry Authentication Type |
Die Methode zur Authentifizierung bei der Schema-Registry, falls verwendet. Andernfalls verwenden Sie NONE. Eine der folgenden Optionen: NONE / BASIC. Standard: NONE |
Ja |
Schema Registry URL |
Die URL der Schema Registry. Erforderlich für das AVRO-Meldungsformat. |
Nein |
Schema Registry Username |
Der Benutzername für die Schema Registry. Erforderlich für das AVRO-Meldungsformat. |
Nein |
Schema Registry Password |
Das Kennwort für die Schema Registry. Erforderlich für das AVRO-Meldungsformat. |
Nein |
Parameter für DLQ und erweiterte Features¶
Parameter |
Beschreibung |
Erforderlich |
---|---|---|
Kafka-DLQThema |
DLQ-Thema zum Senden von Meldungen mit Parsing-Fehlern an |
Ja |
Schematization Enabled |
Legt fest, ob die Daten in einzelne Spalten oder in ein einzelnes RECORD_CONTENT-Feld eingefügt werden. Eine der folgenden Optionen: true / false. Standard: true |
Ja |
Iceberg Enabled |
Gibt an, ob der Prozessor Daten in eine Iceberg-Tabelle aufnimmt. Der Prozessor schlägt fehl, wenn diese Eigenschaft nicht mit dem tatsächlichen Tabellentyp übereinstimmt. Standard: false |
Ja |
Schematization behavior¶
Das Verhalten des Konnektors ändert sich gemäß dem Parameter Schematization Enabled:
Schematization enabled¶
Wenn Schematisierung aktiviert ist, wird der Konnektor:
Erzeugt einzelne Spalten für jedes Feld in der Meldung
Enthält eine RECORD_METADATA-Spalte mit Kafka-Metadaten.
Entwickelt das Tabellenschema automatisch weiter, wenn neue Felder entdeckt werden.
Vereinfacht verschachtelte JSON/AVRO-Strukturen in separate Spalten.
Beispiel einer Tabellenstruktur:
Zeile |
RECORD_METADATA |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|---|
1 |
{„timestamp“:1669074170090, „headers“: {„current.iter… |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
{„timestamp“:1669074170400, „headers“: {„current.iter… |
XYZ789 |
ZABX |
SELL |
3024 |
Schematization disabled¶
Wenn die Schematisierung deaktiviert ist, wird der Konnektor:
nur zwei Spalten erzeugen: RECORD_CONTENT und RECORD_METADATA
den gesamten Inhalt der Meldung als OBJECT in RECORD_CONTENT speichern
keine automatische Schemaentwicklung durchführen
maximale Flexibilität für die nachgelagerte Verarbeitung bieten
Beispiel einer Tabellenstruktur:
Zeile |
RECORD_METADATA |
RECORD_CONTENT |
---|---|---|
1 |
{„timestamp“:1669074170090, „headers“: {„current.iter… |
{„account“: „ABC123“, „symbol“: „ZTEST“, „side“:… |
2 |
{„timestamp“:1669074170400, „headers“: {„current.iter… |
{„account“: „XYZ789“, „symbol“: „ZABX“, „side“:… |
Verwenden Sie die Eigenschaft Schematization Enabled
in den Konfigurationseigenschaften des Konnektors, um die Schemaerkennung zu aktivieren oder zu deaktivieren.
Schemaerkennung und -entwicklung¶
Der Konnektor unterstützt die Schemaerkennung und -entwicklung. Die Struktur der Tabellen in Snowflake kann automatisch definiert und weiterentwickelt werden, um die Struktur der vom Konnektor geladenen neuen Daten zu unterstützen.
Ohne Schemaerkennung und -entwicklung besteht die vom Konnektor geladene Snowflake-Tabelle nur aus zwei OBJECT
Spalten: RECORD_CONTENT
und RECORD_METADATA
.
Wenn Schemaerkennung und Schemaentwicklung aktiviert sind, kann Snowflake das Schema der Streaming-Daten erkennen und Daten in Tabellen laden, die automatisch mit jedem benutzerdefinierten Schema übereinstimmen. Snowflake ermöglicht auch das Hinzufügen von neuen Spalten oder das Löschen der NOT NULL
-Einschränkung von Spalten, die in den neuen Datendateien fehlen.
Die Schemaerkennung mit dem Konnektor wird mit oder ohne eine bereitgestellte Schema-Registry unterstützt. Bei Verwendung einer Schema-Registrierung (Avro) wird die Spalte mit den in der bereitgestellten Schema-Registrierung definierten Datentypen erstellt. Wenn es kein Schemaregister (JSON) gibt, wird der Datentyp von den bereitgestellten Daten abgeleitet.
JSON ARRAY wird für weitere Schematisierungen nicht unterstützt.
Aktivieren der Schema-Entwicklung¶
Wenn der Konnektor die Zieltabelle erstellt, ist die Schemaentwicklung standardmäßig aktiviert.
Wenn Sie die Schemaentwicklung für eine bestehende Tabelle aktivieren oder deaktivieren möchten, verwenden Sie den Befehl ALTER TABLE, um den Parameter ENABLE_SCHEMA_EVOLUTION
festzulegen. Sie müssen außerdem eine Rolle verwenden, die über die OWNERSHIP
-Berechtigung für die Tabelle verfügt. Weitere Informationen dazu finden Sie unter Tabellenschemaentwicklung.
Wenn jedoch die Schemaentwicklung für eine vorhandene Tabelle deaktiviert ist, versucht der Konnektor, die Zeilen mit nicht übereinstimmenden Schemata an die konfigurierten Dead-Letter-Queues (DLQ) zu senden.
RECORD_METADATA-Struktur¶
Die Spalte RECORD_METADATA enthält wichtige Metadaten zu Kafka-Meldungen:
Feld |
Beschreibung |
---|---|
offset |
Das Meldungs-Offset innerhalb der Kafka-Partition |
topic |
Der Name des Kafka-Themas |
partition |
Die Kafka-Partitionsnummer |
key |
Der Meldungsschlüssel (falls vorhanden) |
Zeitstempel |
Der Zeitstempel der Meldung |
SnowflakeConnectorPushTime |
Zeitstempel für den Zeitpunkt, an dem der Konnektor die Meldung von Kafka abgerufen hat |
headers |
Zuordnung der Meldungsheader (falls vorhanden) |
Dead Letter Queue (DLQ)¶
Die DLQ-Funktion behandelt Meldungen, die nicht erfolgreich verarbeitet werden können:
DLQ-Verhalten¶
Parse-Fehler – Meldungen mit ungültigem JSON/AVRO-Format werden an die DLQ gesendet.
Schemaabweichungen – Meldungen, die nicht mit dem erwarteten Schema übereinstimmen, wenn die Schemaentwicklung deaktiviert ist
Verarbeitungsfehler – Andere Verarbeitungsfehler während der Datenaufnahme
Unterstützung von Iceberg-Tabelle¶
Openflow Connector für Kafka kann Daten in eine von Snowflake verwaltete Apache Iceberg™-Tabelle aufnehmen, wenn Iceberg Enabled auf true gesetzt ist.
Anforderungen und Einschränkungen¶
Bevor Sie den Openflow Kafka-Konnektor für die Aufnahme von Iceberg-Tabellen konfigurieren, beachten Sie die folgenden Anforderungen und Beschränkungen:
Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen.
Stellen Sie sicher, dass der Benutzer Daten in die erstellten Tabellen eingeben kann.
Konfiguration und Einrichtung¶
Um den Openflow Connector für Kafka für die Aufnahme von Iceberg-Tabellen zu konfigurieren, folgen Sie den Schritten unter Einrichten von Openflow Connector für Kafka mit einigen Abweichungen, die in den folgenden Abschnitten erläutert werden.
Aufnahme in die Iceberg-Tabellen aktivieren¶
Um die Aufnahme in eine Iceberg-Tabelle zu ermöglichen, müssen Sie den Parameter Iceberg Enabled
auf true
setzen.
Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme¶
Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von den Einstellungen der Eigenschaft Schematization Enabled
Ihres Konnektors ab.
Wenn Sie die Schematisierung aktivieren, müssen Sie eine Tabelle mit einer Spalte namens record_metadata
erstellen:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Der Konnektor erstellt automatisch die Spalten für Meldungsfelder und ändert das Spaltenschema record_metadata
.
Wenn Sie die Schematisierung nicht aktivieren, müssen Sie eine Tabelle mit einer Spalte namens record_content
eines Typs erstellen, der dem tatsächlichen Inhalt der Kafka-Meldung entspricht. Der Konnektor erstellt automatisch die Spalte record_metadata
.
Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen oder kompatible Snowflake-Typen verwenden. Der halbstrukturierte VARIANT-Typ wird nicht unterstützt. Verwenden Sie stattdessen eine strukturierte OBJECT oder MAP.
Betrachten Sie zum Beispiel die folgende Meldung:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Beispiele für die Erstellung von Iceberg-Tabellen¶
Mit aktivierter Schematisierung:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Mit deaktivierter Schematisierung:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
RECORD_METADATA OBJECT(
offset INTEGER,
topic STRING,
partition INTEGER,
key STRING,
timestamp TIMESTAMP,
SnowflakeConnectorPushTime BIGINT,
headers MAP(VARCHAR, VARCHAR)
),
RECORD_CONTENT OBJECT(
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Bemerkung
RECORD_METADATA muss immer erstellt werden. Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs
oder cats
wird zwischen Groß- und Kleinschreibung unterschieden.
Anwendungsfälle¶
Dieser Konnektor ist ideal für:
Produktionsumgebungen, die DLQ erfordern
Datenherkunft und -prüfung, wo Kafka-Metadaten wichtig sind
Komplexe Meldungsverarbeitung mit Anforderungen an die Schemaentwicklung
Integration von Iceberg-Tabellen
Wenn Sie eine einfachere Datenaufnahme ohne Metadaten oder DLQ-Features benötigen, sollten Sie stattdessen die Apache Kafka für JSON/AVRO-Datenformat-Konnektoren in Betracht ziehen.