Verwendung des Snowflake Connector für Kafka mit Apache Iceberg™-Tabellen¶
Ab Version 3.0.0 kann der Snowflake Connector für Kafka Daten in eine von Snowflake verwaltete Apache Iceberg™-Tabelle aufnehmen.
Anforderungen und Einschränkungen¶
Bevor Sie den Kafka-Konnektor für die Aufnahme von Iceberg-Tabellen konfigurieren, beachten Sie die folgenden Anforderungen und Einschränkungen:
Die Aufnahme von Iceberg-Tabellen erfordert Version 3.0.0 oder höher des Kafka-Konnektors.
Die Aufnahme von Iceberg-Tabellen wird durch den Kafka-Konnektor mit Snowpipe Streaming unterstützt. Sie wird vom Kafka-Konnektor mit Snowpipe nicht unterstützt.
Das Einlesen von Iceberg-Tabellen wird nicht unterstützt, wenn
snowflake.streaming.enable.single.buffer
auffalse
gesetzt ist.Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen. Weitere Informationen finden Sie unter Konfiguration und Einrichtung in diesem Thema.
Einschränkungen der Schema-Entwicklung¶
Die Schemaentwicklung für Iceberg wird für schematisierte Datenformate wie AVRO oder Protobuf vollständig unterstützt.
Bei einfachem JSON ohne Schema betrachtet der Konnektor die folgenden Meldungstypen als ungültig und sendet sie an Dead-Letter-Queues (DLQ):
Meldungen mit einer neuen Spalte, wenn der entsprechende Wert
null
oderist []
Meldungen mit einem neuen Feld in einem strukturierten Objekt, wenn der entsprechende Wert
null
oderist []
Um das Tabellenschema manuell zu ändern, damit der Konnektor diese Meldungstypen aufnehmen kann, verwenden Sie eine ALTER TABLE-Anweisung.
Konfiguration und Einrichtung¶
Um den Kafka-Konnektor für die Aufnahme von Iceberg-Tabellen zu konfigurieren, folgen Sie den regulären Einrichtungsschritten für einen auf Snowpipe Streaming basierenden Konnektor mit ein paar Unterschieden, die in den folgenden Abschnitten beschrieben werden.
Verwendung auf einem externen Volume gewähren¶
Sie müssen Ihrer Rolle für den Kafka-Konnektor die Berechtigung USAGE auf dem externen Volume gewähren, das mit Ihrer Iceberg-Tabelle verbunden ist.
Wenn Ihre Iceberg-Tabelle beispielsweise das externe Volume kafka_external_volume
verwendet und der Konnektor die Rolle kafka_connector_role
benutzt, führen Sie die folgende Anweisung aus:
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme¶
Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von den Einstellungen snowflake.enable.schematization
Ihres Konnektors ab.
Wenn Sie die Schematisierung aktivieren, können Sie eine Tabelle mit einer Spalte mit dem Namen record_metadata
erstellen:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
record_metadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Der Konnektor erstellt automatisch die Spalte record_content
und ändert das Spaltenschema record_metadata
.
Wenn Sie die Schematisierung nicht aktivieren, können Sie eine Tabelle mit einer Spalte mit dem Namen record_content
eines Typs erstellen, der dem tatsächlichen Inhalt der Kafka-Meldung entspricht. Der Konnektor erstellt automatisch die Spalte record_metadata
.
Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen oder kompatible Snowflake-Typen verwenden. Der halbstrukturierte VARIANT-Typ wird nicht unterstützt. Verwenden Sie stattdessen eine strukturierte OBJECT oder MAP.
Betrachten Sie zum Beispiel die folgende Meldung:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Um eine Iceberg-Tabelle für die Beispielmeldung zu erstellen, verwenden Sie die folgende Anweisung:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( record_content OBJECT( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
Bemerkung
Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs
oder cats
wird zwischen Groß- und Kleinschreibung unterschieden.
Konfigurationseigenschaften¶
snowflake.streaming.iceberg.enabled
Gibt an, ob der Konnektor Daten in eine Iceberg-Tabelle aufnimmt. Der Konnektor schlägt fehl, wenn diese Eigenschaft nicht mit dem tatsächlichen Tabellentyp übereinstimmt.
- Werte:
true
false
- Standard:
false