Verwendung des Snowflake Connector für Kafka mit Apache Iceberg™-Tabellen

Ab Version 3.0.0 kann der Snowflake Connector für Kafka Daten in eine von Snowflake verwaltete Apache Iceberg™-Tabelle aufnehmen.

Anforderungen und Einschränkungen

Bevor Sie den Kafka-Konnektor für die Aufnahme von Iceberg-Tabellen konfigurieren, beachten Sie die folgenden Anforderungen und Einschränkungen:

  • Die Aufnahme von Iceberg-Tabellen erfordert Version 3.0.0 oder höher des Kafka-Konnektors.

  • Die Aufnahme von Iceberg-Tabellen wird durch den Kafka-Konnektor mit Snowpipe Streaming unterstützt. Sie wird vom Kafka-Konnektor mit Snowpipe nicht unterstützt.

  • Das Einlesen von Iceberg-Tabellen wird nicht unterstützt, wenn snowflake.streaming.enable.single.buffer auf false gesetzt ist.

  • Sie müssen eine Iceberg-Tabelle erstellen, bevor Sie den Konnektor ausführen. Weitere Informationen finden Sie unter Konfiguration und Einrichtung in diesem Thema.

Einschränkungen der Schema-Entwicklung

Die Schemaentwicklung für Iceberg wird für schematisierte Datenformate wie AVRO oder Protobuf vollständig unterstützt.

Bei einfachem JSON ohne Schema betrachtet der Konnektor die folgenden Meldungstypen als ungültig und sendet sie an Dead-Letter-Queues (DLQ):

  • Meldungen mit einer neuen Spalte, wenn der entsprechende Wert null oder ist []

  • Meldungen mit einem neuen Feld in einem strukturierten Objekt, wenn der entsprechende Wert null oder ist []

Um das Tabellenschema manuell zu ändern, damit der Konnektor diese Meldungstypen aufnehmen kann, verwenden Sie eine ALTER TABLE-Anweisung.

Konfiguration und Einrichtung

Um den Kafka-Konnektor für die Aufnahme von Iceberg-Tabellen zu konfigurieren, folgen Sie den regulären Einrichtungsschritten für einen auf Snowpipe Streaming basierenden Konnektor mit ein paar Unterschieden, die in den folgenden Abschnitten beschrieben werden.

Verwendung auf einem externen Volume gewähren

Sie müssen Ihrer Rolle für den Kafka-Konnektor die Berechtigung USAGE auf dem externen Volume gewähren, das mit Ihrer Iceberg-Tabelle verbunden ist.

Wenn Ihre Iceberg-Tabelle beispielsweise das externe Volume kafka_external_volume verwendet und der Konnektor die Rolle kafka_connector_role benutzt, führen Sie die folgende Anweisung aus:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

Erstellen Sie eine Iceberg-Tabelle für die Datenaufnahme

Bevor Sie den Konnektorr ausführen, müssen Sie eine Iceberg-Tabelle erstellen. Das anfängliche Tabellenschema hängt von den Einstellungen snowflake.enable.schematization Ihres Konnektors ab.

Wenn Sie die Schematisierung aktivieren, können Sie eine Tabelle mit einer Spalte mit dem Namen record_metadata erstellen:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Der Konnektor erstellt automatisch die Spalte record_content und ändert das Spaltenschema record_metadata.

Wenn Sie die Schematisierung nicht aktivieren, können Sie eine Tabelle mit einer Spalte mit dem Namen record_content eines Typs erstellen, der dem tatsächlichen Inhalt der Kafka-Meldung entspricht. Der Konnektor erstellt automatisch die Spalte record_metadata.

Wenn Sie eine Iceberg-Tabelle erstellen, können Sie Iceberg-Datentypen oder kompatible Snowflake-Typen verwenden. Der halbstrukturierte VARIANT-Typ wird nicht unterstützt. Verwenden Sie stattdessen eine strukturierte OBJECT oder MAP.

Betrachten Sie zum Beispiel die folgende Meldung:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Um eine Iceberg-Tabelle für die Beispielmeldung zu erstellen, verwenden Sie die folgende Anweisung:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_content OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Bemerkung

Bei Feldnamen innerhalb verschachtelter Strukturen wie dogs oder cats wird zwischen Groß- und Kleinschreibung unterschieden.

Konfigurationseigenschaften

snowflake.streaming.iceberg.enabled

Gibt an, ob der Konnektor Daten in eine Iceberg-Tabelle aufnimmt. Der Konnektor schlägt fehl, wenn diese Eigenschaft nicht mit dem tatsächlichen Tabellentyp übereinstimmt.

Werte:

  • true

  • false

Standard:

false